#crdt #user-friendly #update #structures #data-structures #undo #p2p

mergable

一个用于用户友好和高效的 CRDT 库

26 个重大版本发布

0.45.0 2024 年 6 月 23 日
0.43.0 2023 年 8 月 28 日
0.41.0 2023 年 4 月 8 日
0.35.0 2023 年 3 月 6 日
0.0.0 2021 年 3 月 28 日

#129算法

Download history 163/week @ 2024-06-20 5/week @ 2024-06-27 23/week @ 2024-07-04 80/week @ 2024-07-25 12/week @ 2024-08-01

每月 92 次下载

Apache-2.0

55KB
1.5K SLoC

mergable

简单、高效、实用的 CRDT。

正在进行中:不要使用。

目标

按优先级大致排序。

  • 易于使用。
  • 全网格 P2P 支持(真正的 CRDT)。
  • 支持由常见组件构建的任意强类型数据结构。
  • 支持清理旧数据(当然,只要每个人都已看到即可)。
  • 支持撤销操作(生成“撤销”操作)。
  • 原子更新。 (不是与读取事务性,但井然有序。)
  • 相对高效。

设计

关键是具有以下接口的 Mergable trait。该包的其余部分是实现该接口的数据类型,未来将提供帮助您管理结构和同步的工具。

基本同步

基本同步非常简单。要将您的更改发送给其他人,只需发送您的结构即可。然后他们可以调用 local.merge(remote) 并现在他们有了您的更改。他们可以将他们的更改发送给您,您以相同的方式合并它们,现在您有了相同的结构。您甚至可以并行执行此操作!

let mut alice_state = unimplemented!();
let mut bob_state = unimplemented!();

// Send states to the other party.
let for_bob = serialize(&alice_state);
let for_alice = serialize(&bob_state);

// Update local states with remote.
alice_state.merge(deserialize(for_alice));
bob_state.merge(deserialize(for_bob));

assert_eq!(alice_state, bob_state);

1:1 Delta 同步

对于 1:1 同步,您维护数据结构的两个副本。一个表示远程状态,另一个表示本地状态。根据需要执行对本地状态的编辑。偶尔会生成一个 delta 并将其添加到同步队列中。

let mut remote_state = unimplemented!();
let mut local_state = remote_state.clone(); // Or whatever you had lying around.
let mut sync_queue = Vec::new();

for _ in 0..10 {
	make_changes(&mut local_state);

	let delta = local_state.diff(&remote_state);
	sync_queue.push(serialize(&delta));
	remote_state.apply(delta.clone());
}

一旦建立了网络连接,远程就可以应用您的更改。

let mut remote_state = unimplemented!();

for delta in fetch_sync_queue() {
	remote_state.apply(delta);
}

此时,两个节点都有相同的数据结构。(假设远程没有并发更改。)

1:N Delta 同步

这是一个常见的模式,其中中央服务器作为“真相之源”。这可以用于允许离线编辑(如 Google Docs)或与 P2P 备份以实现最大可靠性。这种模式可以为客户端提供最大效率,但会带来一些延迟(因为编辑需要通过服务器)。

对于客户端,这看起来完全像是 1:1 同步,一个简单的服务器看起来大致如下。

struct Server<T: Mergable> {
	state: T,
	deltas: T::Diff,
}

impl<T: Mergable> Server<T> {
	/// Get the current stae and version.
	fn get(&self) -> (Mergable, usize) {
		(self.state.clone(), self.deltas.len())
	}

	fn update(&mut self, diff: T::Diff) {
		
	}
}

let mut state = unimplemented!();
let mut deltas = Vec::new();
let mut clients = Vec::new();
for event in unimplemented!("recieve events from clients") {
	match event {
		NewClient{client} => {
			client.send(New{
				state: serialize(&state),
				version: delta.len(),
			});
			clients.push(client);
		}
		ResumeClient{client, version} => {
			for (version, delta) in deltas.iter().enumerate().skip(version) {
				client.send(Delta{
					delta: &delta,
					version: data.len(),
				})
			}
			clients.push(client);
		}
		Delta{client_id, delta} => {
			state.apply(deserialize(&delta));
			deltas.push(delta);

			for client in &mut clients {
				client.send(Delta{
					delta: &delta,
					version: data.len(),
				})
			}
		}
	}
}

这是一个简单的实现,一些事情可能可以改进。

  • 从客户端重新生成 diff 以确保它们是最小的。长时间断开连接的客户端在收到最新数据后不重新生成他们的 diff 可能会推送比所需更多的数据。对于大多数情况下实时场景中的始终连接的客户端,这可能不是一个主要问题。
  • 为了重新连接的客户端,基于他们拥有的最新版本生成一个优化的差异,而不是仅仅回放所有版本。
  • 一旦所有客户端都通过了它们,清理旧的差异数据(或者无条件地使旧差异数据过期,并将一个New状态发送给旧客户端)。

将来,我希望在这个库中提供一个服务器核心,以便轻松实现高质量的功能。

N:M 差分同步

目前这很复杂。现在最好的选择可能是让所有客户端都充当服务器,但必须注意避免在客户端中保留过多的状态。

另一个选择是保存所有对等方的最后看到的状态副本(一段时间内),并在重新连接时根据这些状态生成差分。如果数据不是很大,这将可行。

将来,我们可能会添加对共同祖先发现的支持,这将允许进行更高效的初始同步。(这类似于Git的推送和拉取方式。)

依赖项

~0.5–1MB
~23K SLoC