#raft #consensus-algorithm #distributed-systems #ha #distributed-consensus

raft-jopemachine

Raft算法的Rust语言实现

4个版本

0.7.7 2024年3月12日
0.7.6 2024年3月12日
0.7.5 2024年1月12日

#541算法


用于 raftify

Apache-2.0

415KB
8K SLoC

Raft

tikv/raft-rs 分支,为 一个小补丁


lib.rs:

创建Raft节点

您可以使用 RawNode::new 创建Raft节点。要创建Raft节点,您需要向 Storage 组件和一个 Config 提供给 RawNode::new 函数。

use jopemachine_raft::{
logger::Slogger,
Config,
storage::MemStorage,
raw_node::RawNode,
};
use std::sync::Arc;
use slog::{Drain, o};

// Select some defaults, then change what we need.
let config = Config {
id: 1,
..Default::default()
};
// Initialize logger.
let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
let logger = Arc::new(Slogger { slog: logger.clone() });
// ... Make any configuration changes.
// After, make sure it's valid!
config.validate().unwrap();
// We'll use the built-in `MemStorage`, but you will likely want your own.
// Finally, create our Raft node!
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let mut node = RawNode::new(&config, storage, logger).unwrap();

定时Raft节点

使用定时器以固定时间间隔定时Raft节点。以下示例使用Rust通道 recv_timeout 以至少每100ms调用一次的方式驱动Raft节点,每次调用时调用 tick()

use std::{sync::mpsc::{channel, RecvTimeoutError}, time::{Instant, Duration}};

// We're using a channel, but this could be any stream of events.
let (tx, rx) = channel();
let timeout = Duration::from_millis(100);
let mut remaining_timeout = timeout;

// Send the `tx` somewhere else...

loop {
let now = Instant::now();

match rx.recv_timeout(remaining_timeout) {
Ok(()) => {
// Let's save this for later.
unimplemented!()
},
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => unimplemented!(),
}

let elapsed = now.elapsed();
if elapsed >= remaining_timeout {
remaining_timeout = timeout;
// We drive Raft every 100ms.
node.tick();
} else {
remaining_timeout -= elapsed;
}
}

向Raft节点提出建议并执行步骤

使用 propose 函数,您可以在客户端向Raft服务器发送请求时驱动Raft节点。您可以调用 propose 显式地将请求添加到Raft日志中。

在大多数情况下,客户端需要等待请求的响应。例如,如果客户端将值写入键并想知道写入是否成功,但由于Raft中的写入流是异步的,因此写入日志条目必须复制到其他跟随者,然后提交并最终应用于状态机,因此我们需要一种在写入完成后通知客户端的方法。

一种简单的方法是使用唯一的客户端请求ID,并将关联的回调函数保存到哈希表中。当日志条目应用时,我们可以从解码条目中获取ID,调用相应的回调并通知客户端。

当您从其他节点接收到Raft消息时,可以调用 step 函数。

以下是一个使用 proposestep 的简单示例。

#
#
#
enum Msg {
Propose {
id: u8,
callback: Box<dyn Fn() + Send>,
},
Raft(Message),
}

// Simulate a message coming down the stream.
tx.send(Msg::Propose { id: 1, callback: Box::new(|| ()) });

let mut cbs = HashMap::new();
loop {
let now = Instant::now();

match rx.recv_timeout(remaining_timeout) {
Ok(Msg::Propose { id, callback }) => {
cbs.insert(id, callback);
node.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Raft(m)) => node.step(m).unwrap(),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => unimplemented!(),
}

let elapsed = now.elapsed();
if elapsed >= remaining_timeout {
remaining_timeout = timeout;
// We drive Raft every 100ms.
node.tick();
} else {
remaining_timeout -= elapsed;
}
break;
}

在上面的示例中,我们使用一个通道来接收 proposestep 消息。我们只将请求数据ID提交到 Raft 日志。在您的实践中,您可以将ID嵌入到您的请求中,并提交编码后的二进制请求数据。

处理 Ready 状态

当您的 Raft 节点被触发并运行时,Raft 应该进入 Ready 状态。您需要首先使用 has_ready 来检查 Raft 是否已准备好。如果是,则使用 ready 函数获取一个 Ready 状态。

#
#
if !node.has_ready() {
return;
}

// The Raft is ready, we can do something now.
let mut ready = node.ready();

Ready 状态包含大量信息,您需要逐个检查和处理它们。

  1. 检查 messages 是否为空。如果不为空,则表示节点将向其他节点发送消息。
#
#
#
if !ready.messages().is_empty() {
for msg in ready.take_messages() {
// Send messages to other peers.
}
}
  1. 检查 snapshot 是否为空。如果不为空,则表示 Raft 节点已从领导者接收 Raft 快照,我们必须应用该快照。
#
#
#
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
node.mut_store()
.wl()
.apply_snapshot(ready.snapshot().clone())
.unwrap();
}

  1. 检查 committed_entries 是否为空。如果不为空,则表示有一些新的已提交日志条目,您必须将这些条目应用到状态机。当然,在应用后,您需要更新已应用索引,并在稍后恢复 apply
#
#
#
#
#
#
let mut _last_apply_index = 0;
for entry in ready.take_committed_entries() {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;

if entry.data.is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
continue;
}

match entry.get_entry_type() {
EntryType::EntryNormal => handle_normal(entry),
// It's recommended to always use `EntryType::EntryConfChangeV2.
EntryType::EntryConfChange => handle_conf_change(entry),
EntryType::EntryConfChangeV2 => handle_conf_change_v2(entry),
}
}

注意,尽管 Raft 保证只有已持久化的已提交条目才会被应用,但它不保证在应用之前提交索引已被持久化。例如,如果应用在将提交索引持久化之前应用了已提交条目后重新启动,则应用索引可能大于提交索引,从而导致恐慌。为了解决这个问题,在应用条目之前或同时持久化提交索引。您也可以在重启后始终将提交索引分配给 max(commit_index, applied_index)这可能有效,但可能忽略潜在日志丢失的风险

  1. 检查 entries 是否为空。如果不为空,则表示有一些新添加的条目但尚未提交,我们必须将这些条目附加到 Raft 日志中。
#
#
#
if !ready.entries().is_empty() {
// Append entries to the Raft log
node.mut_store().wl().append(ready.entries()).unwrap();
}

  1. 检查 hs 是否为空。如果不为空,则表示节点的 HardState 已更改。例如,节点可能会为新领导者投票,或者提交索引已增加。我们必须持久化更改后的 HardState
#
#
#
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
node.mut_store().wl().set_hardstate(hs.clone());
}
  1. 检查 persisted_messages 是否为空。如果不为空,则表示节点将在持久化硬状态、条目和快照后向其他节点发送消息。
#
#
#
if !ready.persisted_messages().is_empty() {
for msg in ready.take_persisted_messages() {
// Send persisted messages to other peers.
}
}
  1. 调用 advance 来通知先前工作已完成。获取返回值 LightReady 并像步骤 1 和步骤 3 一样处理其 messagescommitted_entries。然后调用 advance_apply 来推进内部的应用索引。
#
#
#
#
let mut light_rd = node.advance(ready);
// Like step 1 and 3, you can use functions to make them behave the same.
handle_messages(light_rd.take_messages());
handle_committed_entries(light_rd.take_committed_entries());
node.advance_apply();

有关更多信息,请参阅示例

有时在 IO 操作中不阻塞 raft 机器会更好,这样读写延迟就可以更可预测,fsync 频率也可以得到控制。该包支持异步就绪,可以将 IO 操作卸载到其他线程。用法与上面相同,除了

  1. 所有写操作都不需要立即持久化,它们可以写入内存缓存;
  2. 在所有相应的写入都持久化之后,应该发送持久化消息;
  3. 当所有写入都完成时,使用 advance_append_async 而不是 advance/advance_append
  4. 只有已持久化的条目才能提交和应用,因此为了取得进展,所有写入都应该在某一点被持久化。

任意成员变更

在构建一个健壮、可扩展的分布式系统时,强烈需要能够动态地更改对等组(peer group)的成员,而无需停机。这个Raft crate通过联合共识Raft论文,第6节)支持这一点。

它允许健壮的任意动态成员变更。成员变更可以执行以下任一或所有操作

  • 将Peer(学习者或投票者)n添加到组中。
  • 从组中移除学习者n
  • 将学习者提升为投票者。
  • 将投票者降级为学习者。
  • 用一个节点n替换另一个节点m

例如,提升学习者4并降级现有的投票者3

#
let steps = vec![
raft_proto::new_conf_change_single(4, ConfChangeType::AddNode),
raft_proto::new_conf_change_single(3, ConfChangeType::RemoveNode),
];
let mut cc = ConfChangeV2::default();
cc.set_changes(steps.into());
node.propose_conf_change(vec![], cc).unwrap();
// After the log is committed and applied
// node.apply_conf_change(&cc).unwrap();

这个过程是一个两阶段过程,在这个过程中,对等组的领导者正在管理两个独立、可能重叠的对等集。

注意:为了保持弹性保证(在两个对等集的大部分处于活动状态时取得进展),建议在将旧、已删除的对等机下线之前,等待整个对等组都退出转换阶段。

为使用raft crate的crate编写的前言。

这个前言与标准库的前言类似,因为你几乎总是想导入其全部内容,但与标准库的前言不同,你必须手动这样做。

use jopemachine_raft::prelude::*;

随着更多项的使用变得普遍,前言可能会随着时间的推移而增长。

依赖项

~3–13MB
~141K SLoC