46 个版本
0.1.67 | 2024 年 8 月 4 日 |
---|---|
0.1.66 | 2024 年 7 月 29 日 |
0.1.65 | 2024 年 3 月 31 日 |
0.1.57 | 2024 年 2 月 26 日 |
0.1.42 | 2024 年 1 月 23 日 |
#346 在 数据库接口
每月 281 次下载
175KB
4.5K SLoC
raftify
⚠️ 警告:此库处于非常实验阶段。API 可能会损坏。
raftify 是 Raft 的高级实现,旨在简化 Raft 算法的集成。
它使用 tikv/raft-rs 和 gRPC 作为网络层,以及 heed (LMDB 包装器) 作为存储层。
快速指南
强烈建议阅读 基本 memstore 示例代码 以了解如何使用此库,但以下是一个快速指南。
定义自己的日志条目
定义要存储在 LogEntry
中的数据以及如何 序列化 和 反序列化 它。
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LogEntry {
Insert { key: u64, value: String },
}
impl AbstractLogEntry for LogEntry {
fn encode(&self) -> Result<Vec<u8>> {
serialize(self).map_err(|e| e.into())
}
fn decode(bytes: &[u8]) -> Result<LogEntry> {
let log_entry: LogEntry = deserialize(bytes)?;
Ok(log_entry)
}
}
定义您的应用程序 Raft FSM
对于 Store
,需要实现以下三个方法。
apply
:将已提交的条目应用到存储中。snapshot
:返回存储的快照数据。restore
:应用作为参数传递的快照。
同样地,您还需要实现 encode
和 decode
。
#[derive(Clone, Debug)]
pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
impl HashStore {
pub fn new() -> Self {
Self(Arc::new(RwLock::new(HashMap::new())))
}
pub fn get(&self, id: u64) -> Option<String> {
self.0.read().unwrap().get(&id).cloned()
}
}
#[async_trait]
impl AbstractStateMachine for HashStore {
async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
let log_entry: LogEntry = LogEntry::decode(&data)?;
match log_entry {
LogEntry::Insert { ref key, ref value } => {
let mut db = self.0.write().unwrap();
log::info!("Inserted: ({}, {})", key, value);
db.insert(*key, value.clone());
}
};
Ok(data)
}
async fn snapshot(&self) -> Result<Vec<u8>> {
Ok(serialize(&self.0.read().unwrap().clone())?)
}
async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
let mut db = self.0.write().unwrap();
let _ = std::mem::replace(&mut *db, new);
Ok(())
}
fn encode(&self) -> Result<Vec<u8>> {
serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
}
fn decode(bytes: &[u8]) -> Result<Self> {
let db: HashMap<u64, String> = deserialize(bytes)?;
Ok(Self(Arc::new(RwLock::new(db))))
}
}
引导 raft 集群
首先引导包含领导节点的集群。
let raft_addr = "127.0.0.1:60061".to_owned();
let node_id = 1;
let raft = Raft::bootstrap(
node_id,
raft_addr,
store.clone(),
raft_config,
logger.clone(),
)?;
tokio::spawn(raft.clone().run());
// ...
tokio::try_join!(raft_handle)?;
将跟随节点加入集群
然后加入跟随节点。
如果对等方指定了初始成员的配置,则所有成员节点引导后集群将开始运行。
let raft_addr = "127.0.0.1:60062".to_owned();
let peer_addr = "127.0.0.1:60061".to_owned();
let join_ticket = Raft::request_id(raft_addr, peer_addr).await;
let raft = Raft::bootstrap(
join_ticket.reserved_id,
raft_addr,
store.clone(),
raft_config,
logger.clone(),
)?;
let raft_handle = tokio::spawn(raft.clone().run());
raft.join_cluster(vec![join_ticket]).await;
// ...
tokio::try_join!(raft_handle)?;
通过 RaftServiceClient 操作 FSM
如果您想远程操作 FSM,可以使用 RaftServiceClient。
let mut leader_client = create_client(&"127.0.0.1:60061").await.unwrap();
leader_client
.propose(raft_service::ProposeArgs {
msg: LogEntry::Insert {
key: 1,
value: "test".to_string(),
}
.encode()
.unwrap(),
})
.await
.unwrap();
通过 RaftNode 操作 FSM
如果您想本地操作 FSM,使用 RaftNode 类型的 Raft 对象。
raft.propose(LogEntry::Insert {
key: 123,
value: "test".to_string(),
}.encode().unwrap()).await;
调试
您可以使用一组 CLI 命令来检查持久存储中保留的数据以及 Raft 服务器状态。
❯ raftify-cli debug persisted ./logs/node-1
---- Persisted entries ----
Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 1 }"
Key: 2, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 2, context: [127.0.0.1:60062], id: 0 }, entry_type: EntryConfChange, index: 2, sync_log: false, term: 1 }"
Key: 3, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 3, context: [127.0.0.1:60063], id: 0 }, entry_type: EntryConfChange, index: 3, sync_log: false, term: 1 }"
---- Metadata ----
HardState { term: 1, vote: 1, commit: 3 }
ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
Snapshot { data: HashStore(RwLock { data: {}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 1, term: 1 }) }
Last index: 3
从 WAL 引导
您可以从 WAL(写入前日志)及其快照引导集群。
此功能在超过法定节点数发生故障时很有用,需要重启集群,或者在需要对集群成员进行批量更改后重启集群时。
在 RaftConfig
中使用 restore_wal_from
和 restore_wal_snapshot_from
选项。
有关更多详细信息,请参阅此示例。
其他语言支持
raftify 为以下语言提供了绑定。
参考文献
raftify 受到众多先前 Raft 实现的启发。
感谢所有相关开发者。
- tikv/raft-rs - 在此库底层使用 Rust 实现的 Raft 分布式一致性算法。
- ritelabs/riteraft - 一个面向普通人的 raft 框架。raftify 从此库分叉而来。
依赖关系
~18–31MB
~470K SLoC