12个版本
0.3.5 | 2023年12月2日 |
---|---|
0.3.4 | 2023年10月12日 |
0.3.3 | 2023年9月3日 |
0.3.2 | 2023年7月31日 |
0.2.2 | 2022年11月28日 |
#370 in 算法
每月375次下载
89KB
2K SLoC
RmqttRaft - 专为普通人设计的raft框架
这是在 tikv/raft-rs 之上创建一个层的尝试,使其更容易使用和实现。这并不是最功能丰富的raft,而是一个方便的接口,可以快速开始,并在短时间内拥有一个工作的raft。
接口强烈受到 canonical/raft 中使用的接口的启发。
用法
将以下内容添加到您的 Cargo.toml
[dependencies]
rmqtt-raft = "0.3"
入门指南
为了“raft”存储,我们需要为它实现 Storage
特性。以下是一个使用 HashStore
的示例,它是一个围绕 HashMap
的线程安全包装器
/// convienient data structure to pass Message in the raft
#[derive(Serialize, Deserialize)]
pub enum Message {
Insert { key: String, value: String },
Get { key: String },
}
#[derive(Clone)]
struct HashStore(Arc<RwLock<HashMap<String, String>>>);
impl HashStore {
fn new() -> Self {
Self(Arc::new(RwLock::new(HashMap::new())))
}
fn get(&self, key: &str) -> Option<String> {
self.0.read().unwrap().get(key).cloned()
}
}
#[async_trait]
impl Store for HashStore {
async fn apply(&mut self, message: &[u8]) -> RaftResult<Vec<u8>> {
let message: Message = deserialize(message).unwrap();
let message: Vec<u8> = match message {
Message::Insert { key, value } => {
let mut db = self.0.write().unwrap();
let v = serialize(&value).unwrap();
db.insert(key, value);
v
}
_ => Vec::new(),
};
Ok(message)
}
async fn query(&self, query: &[u8]) -> RaftResult<Vec<u8>> {
let query: Message = deserialize(query).unwrap();
let data: Vec<u8> = match query {
Message::Get { key } => {
if let Some(val) = self.get(&key) {
serialize(&val).unwrap()
} else {
Vec::new()
}
}
_ => Vec::new(),
};
Ok(data)
}
async fn snapshot(&self) -> RaftResult<Vec<u8>> {
Ok(serialize(&self.0.read().unwrap().clone())?)
}
async fn restore(&mut self, snapshot: &[u8]) -> RaftResult<()> {
let new: HashMap<String, String> = deserialize(snapshot).unwrap();
let mut db = self.0.write().unwrap();
let _ = std::mem::replace(&mut *db, new);
Ok(())
}
}
Store只需要实现4个方法
Store::apply
:将已提交的条目应用到存储中。Store::query
:从存储中查询条目;Store::snapshot
:返回存储的快照数据。Store::restore
:应用作为参数传递的快照。
运行raft
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let store = HashStore::new();
let cfg = Config {
..Default::default()
};
let raft = Raft::new(options.raft_laddr, store.clone(), logger.clone(), cfg)?;
let leader_info = raft.find_leader_info(options.peer_addrs).await?;
let mailbox = Arc::new(raft.mailbox());
let (raft_handle, mailbox) = match leader_info {
Some((leader_id, leader_addr)) => {
info!(logger, "running in follower mode");
let handle = tokio::spawn(raft.join(options.id, Some(leader_id), leader_addr));
(handle, mailbox)
}
None => {
info!(logger, "running in leader mode");
let handle = tokio::spawn(raft.lead(options.id));
(handle, mailbox)
}
};
tokio::try_join!(raft_handle)?.0?;
Ok(())
}
mailbox
为您提供了与raft交互的方式,例如发送消息或离开集群等。
致谢
这项工作基于 riteraft,但对代码进行了更多调整和改进。
许可证
此库受以下其中之一许可:
- MIT许可证 LICENSE-MIT 或 http://opensource.org/licenses/MIT
- Apache许可证2.0 LICENSE-APACHE 或 https://opensource.org/licenses/Apache-2.0
任选其一。
依赖关系
~11–23MB
~313K SLoC