1个不稳定版本
0.1.0 | 2023年12月31日 |
---|
#14 in #maelstrom
35KB
656 行
- 概述
涡旋是一个提供构建分布式系统模型并使用Rust和Maelstrom测试的基础设施的库。
它已通过[[https://fly.io/dist-sys/][Fly.io分布式系统挑战]]进行测试。
功能包括
- 消息传递所需的基础设施(包括通过Serde进行序列化和反序列化)。
- 事件循环处理和将服务实现为特质的能力。
- 例如计时器和运行时多态发送器的实用函数。
- 内置通过[[https://raft.github.io/raft.pdf][Raft]]实现的容错共识。
- 快速开始
首先,[[https://github.com/jepsen-io/maelstrom/releases/][安装Maelstrom]]以测试您的分布式系统。将二进制文件放置在您的PATH或测试期间可以访问的安全位置。
预期消息格式与Maelstrom提供的一致,因此您只需指定有效载荷(对于传入和传出消息)
#+begin_src rust use vortex_raft::*;
#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] enum MsgPayload { Msg { msg: String }, MsgOk { msg: String }, } #+end_src
注意#[serde(tag = "type")]。这告诉Serde根据JSON中的type字段将有效载荷序列化为或反序列化为类型Msg或MsgOk。
#[serde(rename_all = "snake_case")]将JSON中的msg_ok转换为我们的枚举中的MsgOk。
现在,来定义我们的服务
#+begin_src rust // 定义我们需要我们的服务跟踪的所有状态 struct MsgService { msg_id: IdCounter, }
// 实现必要的特质以创建服务 impl Service for MsgService { fn create( // 在创建时不需要网络 _network: &mut Network, // 这个发送器用于本地消息 _sender: std::sync::mpsc::Sender<Event>, ) -> Self { Self { msg_id: IdCounter::new(), } }
fn step(&mut self, event: Event<MsgPayload>, network: &mut Network) -> anyhow::Result<()> {
let Event::Message(input) = event else {
panic!("MsgService should only recieve messages");
};
let MsgPayload::Msg { msg } = &input.body.payload else {
return Ok(()) // Ignore other messages
};
network
.reply(
input.src,
// increments the msg_id counter
self.msg_id.next(),
input.body.msg_id,
MsgPayload::MsgOk { msg: msg.clone() },
)
.context("Msg reply")?;
Ok(())
}
{} #+end_src
最后,启动服务的事件循环!
#+begin_src rust fn main() -> anyhow::Result<()> { MsgService::run().context("运行消息服务") } #+end_src
运行cargo build然后使用Maelstrom进行测试。例如,如果您实现了Fly.io挑战中的Echo服务,则执行
#+begin_src sh path/to/maelstrom test -w echo --bin target/debug/ --node-count 1 --time-limit 10 #+end_src
- 使用Raft
Raft实现是涡旋的一个主要特性。
要使用它,请将RaftService嵌入到您的服务中
#+begin_src rust #[derive(..)] enum MyPayload { ... }
// Raft需要复制的类型 RaftEntry = (String, u64);
type E = Event<MyPayload, (), RaftEntry>;
struct MyService { msg_id: IdCounter, raft: RaftService, }
为 LogService 实现 impl Service
// Raft needs access to the network but also a sender
// to route messages back up to this Service
let raft = RaftService::create(network, sender.map_input(E::Raft));
// Raft will ignore any type of topology you set
// So this only affects messages you send from MyService
network.set_mesh_topology();
Self {
msg_id: IdCounter::new(),
raft,
}
}
fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> {
...
}
{} #+end_src
Raft 服务依赖于其父服务来路由事件,因此请注意 Event::Raft(..) 并确保在 step 方法中将该事件发送到 Raft
#+begin_src rust fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> { match event { Event::Raft(e) => { match e { // 将 RaftMessage 和 RaftSignals 简单路由到 Raft 的 step 函数 RaftEvent::RaftMessage(message) => { self.raft.step(RaftEvent::RaftMessage(message), network)?; } RaftEvent::RaftSignal(signal) => { self.raft.step(RaftEvent::RaftSignal(signal), network)?; } // 这里你可以获取到已提交的条目
// You're gauranteed a majority of other nodes have
// replicated this data.
RaftEvent::CommitedEntry((data, client_id)) => {
network
.reply(
client_id,
self.msg_id.next(),
None,
MyPayload::CommittedOk {
data,
},
)
.context("Send reply")?;
}
}
}
...
};
Ok(())
{} #+end_src
为了在 Raft 中开始复制一个条目,只需使用 RaftService::Request
#+begin_src rust self.raft .request( ("Hi client 123, this has been replicated".to_string(), 123), network, ) .context("Requesting raft")?; #+end_src
请注意,如果发送此请求的节点不是 Raft 领导者,它不会请求,以保持一致性。
将消息转发到领导者的责任在你。
依赖项
~1.1–2.4MB
~47K SLoC