#distributed-systems #maelstrom #fly-io

maelstrom-common

运行 Maelstrom 节点的简单样板代码抽象

2 个版本

0.1.1 2023 年 3 月 4 日
0.1.0 2023 年 3 月 4 日

#233性能分析

自定义许可

14KB
87

Maelstrom

jepsen-io/maelstrom 是一个测试分布式系统玩具实现的平台。

此 crate 抽象了分布式系统中节点设置 stdin/stdout 的样板代码,并提供了一些有用的实用工具来编写处理器。

此 crate 受 Fly.io 分布式系统挑战 的启发,主要为此编写。

使用方法

要使用此 crate,您将创建一个能够处理一些 rpc 的节点。使用可序列化的 Message 枚举定义 rpc 消息,并定义任何有意义错误类型,以便在节点出现严重问题时尽早停止 maelstrom 测试。

节点必须实现 [HandleMessage] 特性,该特性需要一个 handle_message 函数,它接受一个 [Envelope] 和一个用于可选发送消息的 [Sender]。

示例

让我们创建一个简单的回声节点,该节点对 initecho 消息做出响应。这也对应于 Echo 挑战,该挑战在 Fly.io 分布式系统挑战集 中。

use maelstrom_common::{run, HandleMessage, Envelope};
use serde::{Deserialize, Serialize};
use core::panic;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Message {
    #[serde(rename = "init")]
    Init {
        #[serde(skip_serializing_if = "Option::is_none")]
        msg_id: Option<usize>,
        node_id: String
    },
    #[serde(rename = "echo")]
    Echo {
        echo: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        msg_id: Option<usize>
    },
    #[serde(rename = "init_ok")]
    InitOk {
        #[serde(skip_serializing_if = "Option::is_none")]
        in_reply_to: Option<usize>
    },
    #[serde(rename = "echo_ok")]
    EchoOk {
        echo: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        in_reply_to: Option<usize>
    },
}
#[derive(Debug, Default)]
pub struct Echo {
    // Store our ID when a client initializes us.
    node_id: Option<String>,
}
impl HandleMessage for Echo {
    type Message = Message;
    type Error = std::io::Error;
    fn handle_message(
        &mut self,
        msg: Envelope<Self::Message>,
        outbound_msg_tx: std::sync::mpsc::Sender<Envelope<Self::Message>>,
    ) -> Result<(), Self::Error> {
        match msg.body {
            Message::Init { msg_id, ref node_id } => {
                self.node_id = Some(node_id.clone());
                outbound_msg_tx.send(
                    msg.reply(Message::InitOk { in_reply_to: msg_id })
                ).unwrap();
                Ok(())
            },
            Message::Echo { ref echo, msg_id } => {
                outbound_msg_tx.send(
                    msg.reply(
                    Message::EchoOk { echo: echo.to_owned(), in_reply_to: msg_id }
                    )
                ).unwrap();
                Ok(())
            },
            _ => panic!("{}", format!("Unexpected message: {:#?}", serde_json::to_string_pretty(&msg)))
        }
    }
}
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
    run(Echo::default())?;
   Ok(())
}

贡献

问题拉取请求Github 星标 总是受到欢迎。

依赖

~0.7–1.4MB
~33K SLoC