26 个重大版本发布
0.45.0 | 2024 年 6 月 23 日 |
---|---|
0.43.0 | 2023 年 8 月 28 日 |
0.41.0 | 2023 年 4 月 8 日 |
0.35.0 | 2023 年 3 月 6 日 |
0.0.0 | 2021 年 3 月 28 日 |
#129 在 算法 中
每月 92 次下载
55KB
1.5K SLoC
mergable
简单、高效、实用的 CRDT。
正在进行中:不要使用。
目标
按优先级大致排序。
- 易于使用。
- 全网格 P2P 支持(真正的 CRDT)。
- 支持由常见组件构建的任意强类型数据结构。
- 支持清理旧数据(当然,只要每个人都已看到即可)。
- 支持撤销操作(生成“撤销”操作)。
- 原子更新。 (不是与读取事务性,但井然有序。)
- 相对高效。
设计
关键是具有以下接口的 Mergable
trait。该包的其余部分是实现该接口的数据类型,未来将提供帮助您管理结构和同步的工具。
基本同步
基本同步非常简单。要将您的更改发送给其他人,只需发送您的结构即可。然后他们可以调用 local.merge(remote)
并现在他们有了您的更改。他们可以将他们的更改发送给您,您以相同的方式合并它们,现在您有了相同的结构。您甚至可以并行执行此操作!
let mut alice_state = unimplemented!();
let mut bob_state = unimplemented!();
// Send states to the other party.
let for_bob = serialize(&alice_state);
let for_alice = serialize(&bob_state);
// Update local states with remote.
alice_state.merge(deserialize(for_alice));
bob_state.merge(deserialize(for_bob));
assert_eq!(alice_state, bob_state);
1:1 Delta 同步
对于 1:1 同步,您维护数据结构的两个副本。一个表示远程状态,另一个表示本地状态。根据需要执行对本地状态的编辑。偶尔会生成一个 delta 并将其添加到同步队列中。
let mut remote_state = unimplemented!();
let mut local_state = remote_state.clone(); // Or whatever you had lying around.
let mut sync_queue = Vec::new();
for _ in 0..10 {
make_changes(&mut local_state);
let delta = local_state.diff(&remote_state);
sync_queue.push(serialize(&delta));
remote_state.apply(delta.clone());
}
一旦建立了网络连接,远程就可以应用您的更改。
let mut remote_state = unimplemented!();
for delta in fetch_sync_queue() {
remote_state.apply(delta);
}
此时,两个节点都有相同的数据结构。(假设远程没有并发更改。)
1:N Delta 同步
这是一个常见的模式,其中中央服务器作为“真相之源”。这可以用于允许离线编辑(如 Google Docs)或与 P2P 备份以实现最大可靠性。这种模式可以为客户端提供最大效率,但会带来一些延迟(因为编辑需要通过服务器)。
对于客户端,这看起来完全像是 1:1
同步,一个简单的服务器看起来大致如下。
struct Server<T: Mergable> {
state: T,
deltas: T::Diff,
}
impl<T: Mergable> Server<T> {
/// Get the current stae and version.
fn get(&self) -> (Mergable, usize) {
(self.state.clone(), self.deltas.len())
}
fn update(&mut self, diff: T::Diff) {
}
}
let mut state = unimplemented!();
let mut deltas = Vec::new();
let mut clients = Vec::new();
for event in unimplemented!("recieve events from clients") {
match event {
NewClient{client} => {
client.send(New{
state: serialize(&state),
version: delta.len(),
});
clients.push(client);
}
ResumeClient{client, version} => {
for (version, delta) in deltas.iter().enumerate().skip(version) {
client.send(Delta{
delta: &delta,
version: data.len(),
})
}
clients.push(client);
}
Delta{client_id, delta} => {
state.apply(deserialize(&delta));
deltas.push(delta);
for client in &mut clients {
client.send(Delta{
delta: &delta,
version: data.len(),
})
}
}
}
}
这是一个简单的实现,一些事情可能可以改进。
- 从客户端重新生成 diff 以确保它们是最小的。长时间断开连接的客户端在收到最新数据后不重新生成他们的 diff 可能会推送比所需更多的数据。对于大多数情况下实时场景中的始终连接的客户端,这可能不是一个主要问题。
- 为了重新连接的客户端,基于他们拥有的最新版本生成一个优化的差异,而不是仅仅回放所有版本。
- 一旦所有客户端都通过了它们,清理旧的差异数据(或者无条件地使旧差异数据过期,并将一个
New
状态发送给旧客户端)。
将来,我希望在这个库中提供一个服务器核心,以便轻松实现高质量的功能。
N:M 差分同步
目前这很复杂。现在最好的选择可能是让所有客户端都充当服务器,但必须注意避免在客户端中保留过多的状态。
另一个选择是保存所有对等方的最后看到的状态副本(一段时间内),并在重新连接时根据这些状态生成差分。如果数据不是很大,这将可行。
将来,我们可能会添加对共同祖先发现的支持,这将允许进行更高效的初始同步。(这类似于Git的推送和拉取方式。)
依赖项
~0.5–1MB
~23K SLoC