4个版本
0.7.7 | 2024年3月12日 |
---|---|
0.7.6 | 2024年3月12日 |
0.7.5 | 2024年1月12日 |
#541 在 算法
用于 raftify
415KB
8K SLoC
Raft
从 tikv/raft-rs 分支,为 一个小补丁
lib.rs
:
创建Raft节点
您可以使用 RawNode::new
创建Raft节点。要创建Raft节点,您需要向 Storage
组件和一个 Config
提供给 RawNode::new
函数。
use jopemachine_raft::{
logger::Slogger,
Config,
storage::MemStorage,
raw_node::RawNode,
};
use std::sync::Arc;
use slog::{Drain, o};
// Select some defaults, then change what we need.
let config = Config {
id: 1,
..Default::default()
};
// Initialize logger.
let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!());
let logger = Arc::new(Slogger { slog: logger.clone() });
// ... Make any configuration changes.
// After, make sure it's valid!
config.validate().unwrap();
// We'll use the built-in `MemStorage`, but you will likely want your own.
// Finally, create our Raft node!
let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
let mut node = RawNode::new(&config, storage, logger).unwrap();
定时Raft节点
使用定时器以固定时间间隔定时Raft节点。以下示例使用Rust通道 recv_timeout
以至少每100ms调用一次的方式驱动Raft节点,每次调用时调用 tick()
。
use std::{sync::mpsc::{channel, RecvTimeoutError}, time::{Instant, Duration}};
// We're using a channel, but this could be any stream of events.
let (tx, rx) = channel();
let timeout = Duration::from_millis(100);
let mut remaining_timeout = timeout;
// Send the `tx` somewhere else...
loop {
let now = Instant::now();
match rx.recv_timeout(remaining_timeout) {
Ok(()) => {
// Let's save this for later.
unimplemented!()
},
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => unimplemented!(),
}
let elapsed = now.elapsed();
if elapsed >= remaining_timeout {
remaining_timeout = timeout;
// We drive Raft every 100ms.
node.tick();
} else {
remaining_timeout -= elapsed;
}
}
向Raft节点提出建议并执行步骤
使用 propose
函数,您可以在客户端向Raft服务器发送请求时驱动Raft节点。您可以调用 propose
显式地将请求添加到Raft日志中。
在大多数情况下,客户端需要等待请求的响应。例如,如果客户端将值写入键并想知道写入是否成功,但由于Raft中的写入流是异步的,因此写入日志条目必须复制到其他跟随者,然后提交并最终应用于状态机,因此我们需要一种在写入完成后通知客户端的方法。
一种简单的方法是使用唯一的客户端请求ID,并将关联的回调函数保存到哈希表中。当日志条目应用时,我们可以从解码条目中获取ID,调用相应的回调并通知客户端。
当您从其他节点接收到Raft消息时,可以调用 step
函数。
以下是一个使用 propose
和 step
的简单示例。
#
#
#
enum Msg {
Propose {
id: u8,
callback: Box<dyn Fn() + Send>,
},
Raft(Message),
}
// Simulate a message coming down the stream.
tx.send(Msg::Propose { id: 1, callback: Box::new(|| ()) });
let mut cbs = HashMap::new();
loop {
let now = Instant::now();
match rx.recv_timeout(remaining_timeout) {
Ok(Msg::Propose { id, callback }) => {
cbs.insert(id, callback);
node.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Raft(m)) => node.step(m).unwrap(),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => unimplemented!(),
}
let elapsed = now.elapsed();
if elapsed >= remaining_timeout {
remaining_timeout = timeout;
// We drive Raft every 100ms.
node.tick();
} else {
remaining_timeout -= elapsed;
}
break;
}
在上面的示例中,我们使用一个通道来接收 propose
和 step
消息。我们只将请求数据ID提交到 Raft 日志。在您的实践中,您可以将ID嵌入到您的请求中,并提交编码后的二进制请求数据。
处理 Ready 状态
当您的 Raft 节点被触发并运行时,Raft 应该进入 Ready
状态。您需要首先使用 has_ready
来检查 Raft 是否已准备好。如果是,则使用 ready
函数获取一个 Ready
状态。
#
#
if !node.has_ready() {
return;
}
// The Raft is ready, we can do something now.
let mut ready = node.ready();
Ready
状态包含大量信息,您需要逐个检查和处理它们。
- 检查
messages
是否为空。如果不为空,则表示节点将向其他节点发送消息。
#
#
#
if !ready.messages().is_empty() {
for msg in ready.take_messages() {
// Send messages to other peers.
}
}
- 检查
snapshot
是否为空。如果不为空,则表示 Raft 节点已从领导者接收 Raft 快照,我们必须应用该快照。
#
#
#
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
node.mut_store()
.wl()
.apply_snapshot(ready.snapshot().clone())
.unwrap();
}
- 检查
committed_entries
是否为空。如果不为空,则表示有一些新的已提交日志条目,您必须将这些条目应用到状态机。当然,在应用后,您需要更新已应用索引,并在稍后恢复apply
。
#
#
#
#
#
#
let mut _last_apply_index = 0;
for entry in ready.take_committed_entries() {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;
if entry.data.is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
continue;
}
match entry.get_entry_type() {
EntryType::EntryNormal => handle_normal(entry),
// It's recommended to always use `EntryType::EntryConfChangeV2.
EntryType::EntryConfChange => handle_conf_change(entry),
EntryType::EntryConfChangeV2 => handle_conf_change_v2(entry),
}
}
注意,尽管 Raft 保证只有已持久化的已提交条目才会被应用,但它不保证在应用之前提交索引已被持久化。例如,如果应用在将提交索引持久化之前应用了已提交条目后重新启动,则应用索引可能大于提交索引,从而导致恐慌。为了解决这个问题,在应用条目之前或同时持久化提交索引。您也可以在重启后始终将提交索引分配给 max(commit_index, applied_index)
,这可能有效,但可能忽略潜在日志丢失的风险。
- 检查
entries
是否为空。如果不为空,则表示有一些新添加的条目但尚未提交,我们必须将这些条目附加到 Raft 日志中。
#
#
#
if !ready.entries().is_empty() {
// Append entries to the Raft log
node.mut_store().wl().append(ready.entries()).unwrap();
}
- 检查
hs
是否为空。如果不为空,则表示节点的HardState
已更改。例如,节点可能会为新领导者投票,或者提交索引已增加。我们必须持久化更改后的HardState
。
#
#
#
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
node.mut_store().wl().set_hardstate(hs.clone());
}
- 检查
persisted_messages
是否为空。如果不为空,则表示节点将在持久化硬状态、条目和快照后向其他节点发送消息。
#
#
#
if !ready.persisted_messages().is_empty() {
for msg in ready.take_persisted_messages() {
// Send persisted messages to other peers.
}
}
- 调用
advance
来通知先前工作已完成。获取返回值LightReady
并像步骤 1 和步骤 3 一样处理其messages
和committed_entries
。然后调用advance_apply
来推进内部的应用索引。
#
#
#
#
let mut light_rd = node.advance(ready);
// Like step 1 and 3, you can use functions to make them behave the same.
handle_messages(light_rd.take_messages());
handle_committed_entries(light_rd.take_committed_entries());
node.advance_apply();
有关更多信息,请参阅示例。
有时在 IO 操作中不阻塞 raft 机器会更好,这样读写延迟就可以更可预测,fsync 频率也可以得到控制。该包支持异步就绪,可以将 IO 操作卸载到其他线程。用法与上面相同,除了
- 所有写操作都不需要立即持久化,它们可以写入内存缓存;
- 在所有相应的写入都持久化之后,应该发送持久化消息;
- 当所有写入都完成时,使用
advance_append_async
而不是advance/advance_append
。 - 只有已持久化的条目才能提交和应用,因此为了取得进展,所有写入都应该在某一点被持久化。
任意成员变更
在构建一个健壮、可扩展的分布式系统时,强烈需要能够动态地更改对等组(peer group)的成员,而无需停机。这个Raft crate通过联合共识(Raft论文,第6节)支持这一点。
它允许健壮的任意动态成员变更。成员变更可以执行以下任一或所有操作
- 将Peer(学习者或投票者)n添加到组中。
- 从组中移除学习者n。
- 将学习者提升为投票者。
- 将投票者降级为学习者。
- 用一个节点n替换另一个节点m。
例如,提升学习者4并降级现有的投票者3
#
let steps = vec![
raft_proto::new_conf_change_single(4, ConfChangeType::AddNode),
raft_proto::new_conf_change_single(3, ConfChangeType::RemoveNode),
];
let mut cc = ConfChangeV2::default();
cc.set_changes(steps.into());
node.propose_conf_change(vec![], cc).unwrap();
// After the log is committed and applied
// node.apply_conf_change(&cc).unwrap();
这个过程是一个两阶段过程,在这个过程中,对等组的领导者正在管理两个独立、可能重叠的对等集。
注意:为了保持弹性保证(在两个对等集的大部分处于活动状态时取得进展),建议在将旧、已删除的对等机下线之前,等待整个对等组都退出转换阶段。
为使用raft
crate的crate编写的前言。
这个前言与标准库的前言类似,因为你几乎总是想导入其全部内容,但与标准库的前言不同,你必须手动这样做。
use jopemachine_raft::prelude::*;
随着更多项的使用变得普遍,前言可能会随着时间的推移而增长。
依赖项
~3–13MB
~141K SLoC