7个版本
0.1.6 | 2023年4月17日 |
---|---|
0.1.5 | 2023年4月16日 |
#490 in 异步
85 每月下载量
61KB
1K SLoC
maelstrom-rust-node
又一个用于实现https://github.com/jepsen-io/maelstrom节点并解决https://fly.io/dist-sys/挑战的Rust crate。
什么是Maelstrom?
Maelstrom是一个用于学习分布式系统的平台。它基于Jepsen和Elle构建,以确保不会违反任何属性。使用Maelstrom,您可以构建节点,形成可以处理不同工作负载的分布式系统。
特性
- 异步(tokio)
- 多线程
- 简单的API - 单个trait fn实现
- 自动推导响应类型,通过Value()获取额外数据
- 处理未知消息类型
- 支持a/sync RPC() + 超时/上下文
- lin/seq/lww kv存储
- 透明错误处理
- 待办事项:thiserror + 错误解析/序列化原因
示例
回声工作负载
$ cargo build --examples
$ maelstrom test -w echo --bin ./target/debug/examples/echo --node-count 1 --time-limit 10 --log-stderr
实现
use async_trait::async_trait;
use maelstrom::protocol::Message;
use maelstrom::{Node, Result, Runtime};
use std::sync::Arc;
pub(crate) fn main() -> Result<()> {
Runtime::init(try_main())
}
async fn try_main() -> Result<()> {
let handler = Arc::new(Handler::default());
Runtime::new().with_handler(handler).run().await
}
#[derive(Clone, Default)]
struct Handler {}
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
if req.get_type() == "echo" {
let echo = req.body.clone().with_type("echo_ok");
return runtime.reply(req, echo).await;
}
done(runtime, message)
}
}
规范
接收
{
"src": "c1",
"dest": "n1",
"body": {
"type": "echo",
"msg_id": 1,
"echo": "Please echo 35"
}
}
发送相同的消息,body.type == echo_ok。
{
"src": "n1",
"dest": "c1",
"body": {
"type": "echo_ok",
"msg_id": 1,
"in_reply_to": 1,
"echo": "Please echo 35"
}
}
广播工作负载
$ cargo build --examples
$ RUST_LOG=debug maelstrom test -w broadcast --bin ./target/debug/examples/broadcast --node-count 2 --time-limit 20 --rate 10 --log-stderr
实现
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let msg: Result<Request> = req.body.as_obj();
match msg {
Ok(Request::Read {}) => {
let data = self.snapshot();
let msg = Request::ReadOk { messages: data };
return runtime.reply(req, msg).await;
}
Ok(Request::Broadcast { message: element }) => {
if self.try_add(element) {
info!("messages now {}", element);
for node in runtime.neighbours() {
runtime.call_async(node, Request::Broadcast { message: element });
}
}
return runtime.reply_ok(req).await;
}
Ok(Request::Topology { topology }) => {
let neighbours = topology.get(runtime.node_id()).unwrap();
self.inner.lock().unwrap().t = neighbours.clone();
info!("My neighbors are {:?}", neighbours);
return runtime.reply_ok(req).await;
}
_ => done(runtime, req),
}
}
}
lin-kv工作负载
$ cargo build --examples
$ RUST_LOG=debug ~/Projects/maelstrom/maelstrom test -w lin-kv --bin ./target/debug/examples/lin_kv --node-count 4 --concurrency 2n --time-limit 10 --rate 100 --log-stderr
实现
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let (ctx, _handler) = Context::new();
let msg: Result<Request> = req.body.as_obj();
match msg {
Ok(Request::Read { key }) => {
let value = self.s.get(ctx, key.to_string()).await?;
return runtime.reply(req, Request::ReadOk { value }).await;
}
Ok(Request::Write { key, value }) => {
self.s.put(ctx, key.to_string(), value).await?;
return runtime.reply(req, Request::WriteOk {}).await;
}
Ok(Request::Cas { key, from, to, put }) => {
self.s.cas(ctx, key.to_string(), from, to, put).await?;
return runtime.reply(req, Request::CasOk {}).await;
}
_ => done(runtime, req),
}
}
}
fn handler(runtime: Runtime) -> Handler {
Handler { s: lin_kv(runtime) }
}
g-set工作负载
$ cargo build --examples
$ RUST_LOG=debug ~/Projects/maelstrom/maelstrom test -w g-set --bin ./target/debug/examples/g_set --node-count 2 --concurrency 2n --time-limit 20 --rate 10 --log-stderr
实现
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let msg: Result<Request> = req.body.as_obj();
match msg {
Ok(Request::Read {}) => {
let data = to_seq(&self.s.lock().unwrap());
return runtime.reply(req, Request::ReadOk { value: data }).await;
}
Ok(Request::Add { element }) => {
self.s.lock().unwrap().insert(element);
return runtime.reply(req, Request::AddOk {}).await;
}
Ok(Request::ReplicateOne { element }) => {
self.s.lock().unwrap().insert(element);
return Ok(());
}
Ok(Request::ReplicateFull { value }) => {
let mut s = self.s.lock().unwrap();
for v in value {
s.insert(v);
}
return Ok(());
}
Ok(Request::Init {}) => {
// spawn into tokio (instead of runtime) to not to wait
// until it is completed, as it will never be.
let (r0, h0) = (runtime.clone(), self.clone());
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
debug!("emit replication signal");
let s = h0.s.lock().unwrap();
for n in r0.neighbours() {
let msg = Request::ReplicateFull { value: to_seq(&s) };
drop(r0.send_async(n.to_string(), msg));
}
}
});
return Ok(());
}
_ => done(runtime, req),
}
}
}
API
键值存储
use async_trait::async_trait;
use maelstrom::kv::{lin_kv, Storage, KV};
use maelstrom::protocol::Message;
use maelstrom::{done, Node, Result, Runtime};
use tokio_context::context::Context;
#[derive(Clone)]
struct Handler {
s: Storage,
}
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let (ctx, _handler) = Context::new();
let msg: Result<Request> = req.body.as_obj();
match msg {
Ok(Request::Read { key }) => {
let value = self.s.get(ctx, key.to_string()).await?;
return runtime.reply(req, Request::ReadOk { value }).await;
}
Ok(Request::Write { key, value }) => {
self.s.put(ctx, key.to_string(), value).await?;
return runtime.reply(req, Request::WriteOk {}).await;
}
Ok(Request::Cas { key, from, to, put }) => {
self.s.cas(ctx, key.to_string(), from, to, put).await?;
return runtime.reply(req, Request::CasOk {}).await;
}
_ => done(runtime, req),
}
}
}
fn handler(runtime: Runtime) -> Handler {
Handler { s: lin_kv(runtime) }
}
RPC
use async_trait::async_trait;
use maelstrom::protocol::Message;
use maelstrom::{Node, Result, Runtime};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Clone, Default)]
struct Handler {}
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
let (mut ctx, _handler) = Context::with_timeout(Duration::from_secs(1));
// 1.
runtime.call_async(node, msg.clone());
// 2. put it into runtime.spawn(async move { ... }) if needed
let res: RPCResult = runtime.rpc(node, msg.clone()).await?;
let msg: Result<Message> = res.await;
// 3. put it into runtime.spawn(async move { ... }) if needed
let mut res: RPCResult = runtime.rpc(node, msg.clone()).await?;
let msg: Message = res.done_with(ctx).await?;
// 4. put it into runtime.spawn(async move { ... }) if needed
let msg = runtime.call(ctx, node, msg.clone()).await?;
// 5. async send variant
// spawn into tokio (instead of runtime) to not to wait
// until it is completed, as it will never be.
let (r0, h0) = (runtime.clone(), self.clone());
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
debug!("emit replication signal");
let s = h0.s.lock().unwrap();
for n in r0.neighbours() {
let msg = Request::ReplicateFull { value: to_seq(&s) };
drop(r0.send_async(n, msg));
}
}
});
return runtime.reply_ok(req).await;
}
}
请求
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
struct TopologyRequest {
topology: HashMap<String, Vec<String>>,
}
// or
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum Message {
Topology {
topology: HashMap<String, Vec<String>>,
},
Broadcast {
message: u64,
},
ReadOk {
messages: Vec<u64>,
},
}
响应
use async_trait::async_trait;
use log::info;
use maelstrom::protocol::Message;
use maelstrom::{done, Node, Result, Runtime};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::{Arc, Mutex};
#[derive(Clone, Default)]
struct Handler { /* ... */ }
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
if req.get_type() == "echo" {
let echo = req.body.clone().with_type("echo_ok");
return runtime.reply(req, echo).await;
}
if req.get_type() == "echo" {
let echo = format!("Another echo {}", message.body.msg_id);
let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
return runtime.reply(message, msg).await;
}
if req.get_type() == "echo" {
let err = maelstrom::Error::TemporarilyUnavailable {};
let body = ErrorMessageBody::from_error(err);
return runtime.reply(message, body).await;
}
if req.get_type() == "echo" {
let body = MessageBody::default().with_type("echo_ok").with_reply_to(req.body.msg_id);
// send: no response type auto-deduction and no reply_to
return runtime.send(message, body).await;
}
if req.get_type() == "echo" {
return runtime.reply(message, EchoResponse { echo: "blah".to_string() }).await;
}
if req.get_type() == "read" {
let data = self.inner.lock().unwrap().clone();
let msg = ReadResponse { messages: data };
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let raw = Value::Object(req.body.extra.clone());
let mut msg = serde_json::from_value::<BroadcastRequest>(raw)?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let mut msg = serde_json::from_value::<BroadcastRequest>(req.body.raw())?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "broadcast" {
let mut msg = req.body.as_obj::<BroadcastRequest>()?;
msg.typ = req.body.typ.clone();
return runtime.reply(req, msg).await;
}
if req.get_type() == "topology" {
info!("new topology {:?}", req.body.extra.get("topology").unwrap());
return runtime.reply_ok(req).await;
}
done(runtime, message)
}
}
为什么
因为我正在学习Rust,并且非常喜欢Maelstrom和fly.io。我想玩转语言和生态系统的不同方面,并构建一个既方便又简洁的API。对于我的自负,我表示歉意。
感谢Aphyr和各位。
在哪里
依赖
~7–15MB
~183K SLoC