#raft-consensus #distributed-systems #raft #maelstrom #distributed-consensus #jepsen

涡旋raft

用于Maelstrom的Raft共识分布式系统库

1个不稳定版本

0.1.0 2023年12月31日

#14 in #maelstrom

MIT/Apache

35KB
656

  • 概述

涡旋是一个提供构建分布式系统模型并使用Rust和Maelstrom测试的基础设施的库。

它已通过[[https://fly.io/dist-sys/][Fly.io分布式系统挑战]]进行测试。

功能包括

  • 消息传递所需的基础设施(包括通过Serde进行序列化和反序列化)。
  • 事件循环处理和将服务实现为特质的能力。
  • 例如计时器和运行时多态发送器的实用函数。
  • 内置通过[[https://raft.github.io/raft.pdf][Raft]]实现的容错共识。
  • 快速开始

首先,[[https://github.com/jepsen-io/maelstrom/releases/][安装Maelstrom]]以测试您的分布式系统。将二进制文件放置在您的PATH或测试期间可以访问的安全位置。

预期消息格式与Maelstrom提供的一致,因此您只需指定有效载荷(对于传入和传出消息)

#+begin_src rust use vortex_raft::*;

#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] enum MsgPayload { Msg { msg: String }, MsgOk { msg: String }, } #+end_src

注意#[serde(tag = "type")]。这告诉Serde根据JSON中的type字段将有效载荷序列化为或反序列化为类型MsgMsgOk

#[serde(rename_all = "snake_case")]将JSON中的msg_ok转换为我们的枚举中的MsgOk

现在,来定义我们的服务

#+begin_src rust // 定义我们需要我们的服务跟踪的所有状态 struct MsgService { msg_id: IdCounter, }

// 实现必要的特质以创建服务 impl Service for MsgService { fn create( // 在创建时不需要网络 _network: &mut Network, // 这个发送器用于本地消息 _sender: std::sync::mpsc::Sender<Event>, ) -> Self { Self { msg_id: IdCounter::new(), } }

  fn step(&mut self, event: Event<MsgPayload>, network: &mut Network) -> anyhow::Result<()> {
      let Event::Message(input) = event else {
          panic!("MsgService should only recieve messages");
      };


      let MsgPayload::Msg { msg } = &input.body.payload else {
          return Ok(())  // Ignore other messages
      };

      network
          .reply(
              input.src,
              // increments the msg_id counter
              self.msg_id.next(), 
              input.body.msg_id,
              MsgPayload::MsgOk { msg: msg.clone() },
          )
          .context("Msg reply")?;
      Ok(())
  }

{} #+end_src

最后,启动服务的事件循环!

#+begin_src rust fn main() -> anyhow::Result<()> { MsgService::run().context("运行消息服务") } #+end_src

运行cargo build然后使用Maelstrom进行测试。例如,如果您实现了Fly.io挑战中的Echo服务,则执行

#+begin_src sh path/to/maelstrom test -w echo --bin target/debug/ --node-count 1 --time-limit 10 #+end_src

  • 使用Raft

Raft实现是涡旋的一个主要特性。

要使用它,请将RaftService嵌入到您的服务中

#+begin_src rust #[derive(..)] enum MyPayload { ... }

// Raft需要复制的类型 RaftEntry = (String, u64);

type E = Event<MyPayload, (), RaftEntry>;

struct MyService { msg_id: IdCounter, raft: RaftService, }

为 LogService 实现 impl Service { fn create(network: &mut Network, sender: mpsc::Sender) -> Self {

      // Raft needs access to the network but also a sender
      // to route messages back up to this Service
      let raft = RaftService::create(network, sender.map_input(E::Raft));

      // Raft will ignore any type of topology you set
      // So this only affects messages you send from MyService 
      network.set_mesh_topology();
      Self {
          msg_id: IdCounter::new(),
          raft,
      }
  }
  fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> {
      ...
  }

{} #+end_src

Raft 服务依赖于其父服务来路由事件,因此请注意 Event::Raft(..) 并确保在 step 方法中将该事件发送到 Raft

#+begin_src rust fn step(&mut self, event: E, network: &mut Network) -> anyhow::Result<()> { match event { Event::Raft(e) => { match e { // 将 RaftMessage 和 RaftSignals 简单路由到 Raft 的 step 函数 RaftEvent::RaftMessage(message) => { self.raft.step(RaftEvent::RaftMessage(message), network)?; } RaftEvent::RaftSignal(signal) => { self.raft.step(RaftEvent::RaftSignal(signal), network)?; } // 这里你可以获取到已提交的条目

              // You're gauranteed a majority of other nodes have
              // replicated this data.
              RaftEvent::CommitedEntry((data, client_id)) => {
                    network
                        .reply(
                            client_id,
                            self.msg_id.next(),
                            None,
                            MyPayload::CommittedOk {
                                data,
                            },
                        )
                        .context("Send reply")?;
              }
          }
      }
      ...
  };

  Ok(())

{} #+end_src

为了在 Raft 中开始复制一个条目,只需使用 RaftService::Request

#+begin_src rust self.raft .request( ("Hi client 123, this has been replicated".to_string(), 123), network, ) .context("Requesting raft")?; #+end_src

请注意,如果发送此请求的节点不是 Raft 领导者,它不会请求,以保持一致性。

将消息转发到领导者的责任在你。

依赖项

~1.1–2.4MB
~47K SLoC