#raft #distributed-systems #ha #networking

raftify

实验性高级 Raft 框架

46 个版本

0.1.67 2024 年 8 月 4 日
0.1.66 2024 年 7 月 29 日
0.1.65 2024 年 3 月 31 日
0.1.57 2024 年 2 月 26 日
0.1.42 2024 年 1 月 23 日

#346数据库接口

Download history 27/week @ 2024-04-27 15/week @ 2024-05-04 1/week @ 2024-05-18 2/week @ 2024-05-25 293/week @ 2024-07-06 12/week @ 2024-07-13 133/week @ 2024-07-27 142/week @ 2024-08-03 6/week @ 2024-08-10

每月 281 次下载

MIT/Apache

175KB
4.5K SLoC

raftify

⚠️ 警告:此库处于非常实验阶段。API 可能会损坏。

raftify 是 Raft 的高级实现,旨在简化 Raft 算法的集成。

它使用 tikv/raft-rs 和 gRPC 作为网络层,以及 heed (LMDB 包装器) 作为存储层。

快速指南

强烈建议阅读 基本 memstore 示例代码 以了解如何使用此库,但以下是一个快速指南。

定义自己的日志条目

定义要存储在 LogEntry 中的数据以及如何 序列化反序列化 它。

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LogEntry {
    Insert { key: u64, value: String },
}

impl AbstractLogEntry for LogEntry {
    fn encode(&self) -> Result<Vec<u8>> {
        serialize(self).map_err(|e| e.into())
    }

    fn decode(bytes: &[u8]) -> Result<LogEntry> {
        let log_entry: LogEntry = deserialize(bytes)?;
        Ok(log_entry)
    }
}

定义您的应用程序 Raft FSM

对于 Store,需要实现以下三个方法。

  • apply:将已提交的条目应用到存储中。
  • snapshot:返回存储的快照数据。
  • restore:应用作为参数传递的快照。

同样地,您还需要实现 encodedecode

#[derive(Clone, Debug)]
pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);

impl HashStore {
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(HashMap::new())))
    }

    pub fn get(&self, id: u64) -> Option<String> {
        self.0.read().unwrap().get(&id).cloned()
    }
}

#[async_trait]
impl AbstractStateMachine for HashStore {
    async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
        let log_entry: LogEntry = LogEntry::decode(&data)?;
        match log_entry {
            LogEntry::Insert { ref key, ref value } => {
                let mut db = self.0.write().unwrap();
                log::info!("Inserted: ({}, {})", key, value);
                db.insert(*key, value.clone());
            }
        };
        Ok(data)
    }

    async fn snapshot(&self) -> Result<Vec<u8>> {
        Ok(serialize(&self.0.read().unwrap().clone())?)
    }

    async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
        let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
        let mut db = self.0.write().unwrap();
        let _ = std::mem::replace(&mut *db, new);
        Ok(())
    }

    fn encode(&self) -> Result<Vec<u8>> {
        serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
    }

    fn decode(bytes: &[u8]) -> Result<Self> {
        let db: HashMap<u64, String> = deserialize(bytes)?;
        Ok(Self(Arc::new(RwLock::new(db))))
    }
}

引导 raft 集群

首先引导包含领导节点的集群。

let raft_addr = "127.0.0.1:60061".to_owned();
let node_id = 1;

let raft = Raft::bootstrap(
    node_id,
    raft_addr,
    store.clone(),
    raft_config,
    logger.clone(),
)?;

tokio::spawn(raft.clone().run());

// ...
tokio::try_join!(raft_handle)?;

将跟随节点加入集群

然后加入跟随节点。

如果对等方指定了初始成员的配置,则所有成员节点引导后集群将开始运行。

let raft_addr = "127.0.0.1:60062".to_owned();
let peer_addr = "127.0.0.1:60061".to_owned();
let join_ticket = Raft::request_id(raft_addr, peer_addr).await;

let raft = Raft::bootstrap(
    join_ticket.reserved_id,
    raft_addr,
    store.clone(),
    raft_config,
    logger.clone(),
)?;

let raft_handle = tokio::spawn(raft.clone().run());
raft.join_cluster(vec![join_ticket]).await;

// ...
tokio::try_join!(raft_handle)?;

通过 RaftServiceClient 操作 FSM

如果您想远程操作 FSM,可以使用 RaftServiceClient

let mut leader_client = create_client(&"127.0.0.1:60061").await.unwrap();

leader_client
    .propose(raft_service::ProposeArgs {
        msg: LogEntry::Insert {
            key: 1,
            value: "test".to_string(),
        }
        .encode()
        .unwrap(),
    })
    .await
    .unwrap();

通过 RaftNode 操作 FSM

如果您想本地操作 FSM,使用 RaftNode 类型的 Raft 对象。

raft.propose(LogEntry::Insert {
    key: 123,
    value: "test".to_string(),
}.encode().unwrap()).await;

调试

您可以使用一组 CLI 命令来检查持久存储中保留的数据以及 Raft 服务器状态。

❯ raftify-cli debug persisted ./logs/node-1
---- Persisted entries ----
Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 1 }"
Key: 2, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 2, context: [127.0.0.1:60062], id: 0 }, entry_type: EntryConfChange, index: 2, sync_log: false, term: 1 }"
Key: 3, "Entry { context: [], data: ConfChange { change_type: AddNode, node_id: 3, context: [127.0.0.1:60063], id: 0 }, entry_type: EntryConfChange, index: 3, sync_log: false, term: 1 }"

---- Metadata ----
HardState { term: 1, vote: 1, commit: 3 }
ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
Snapshot { data: HashStore(RwLock { data: {}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2, 3], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 1, term: 1 }) }
Last index: 3

从 WAL 引导

您可以从 WAL(写入前日志)及其快照引导集群。

此功能在超过法定节点数发生故障时很有用,需要重启集群,或者在需要对集群成员进行批量更改后重启集群时。

RaftConfig 中使用 restore_wal_fromrestore_wal_snapshot_from 选项。

有关更多详细信息,请参阅此示例

其他语言支持

raftify 为以下语言提供了绑定。

参考文献

raftify 受到众多先前 Raft 实现的启发。

感谢所有相关开发者。

  • tikv/raft-rs - 在此库底层使用 Rust 实现的 Raft 分布式一致性算法。
  • ritelabs/riteraft - 一个面向普通人的 raft 框架。raftify 从此库分叉而来。

依赖关系

~18–31MB
~470K SLoC