12 个版本
0.2.3 | 2023年8月11日 |
---|---|
0.2.2 | 2023年8月11日 |
0.2.1 | 2023年3月13日 |
0.1.15 | 2022年6月21日 |
在 数据结构 中排名第 255
每月下载量 45
用于 libmelda-tools
260KB
4K SLoC
什么是 Melda?
Melda 是一个 Delta-State JSON CRDT。CRDT(无冲突副本数据类型)是一种可以在网络中的多台计算机之间复制的(复制)的数据结构。每个副本都可以独立和并发地更新,无需中央协调或同步。每个副本上的更新可以在任何时候合并。
存在不同类型的 CRDT:基于操作 CRDT(在副本之间生成和交换更新操作),基于状态 CRDT(交换和合并每个副本的完整状态),以及类似 Melda 的 Delta-State CRDT(仅交换数据类型版本或状态的差异)。
Melda 原生支持 JSON 数据格式,并提供了一种同步任意 JSON 文档更改的方法。您可以使用此 Rust 库或使用 命令行工具 来处理 Melda CRDT。在 Kibi w/Melda 仓库中,您可以找到原始 Kibi 文本编辑器的分支,该编辑器使用 Melda 实现了协作功能。
如何使用 Melda?
以下指南也可在 示例 中找到。您可以使用 cargo run --examples simple
运行它。
首先,在 Cargo.toml 中添加以下依赖项
melda = { git = "https://github.com/slashdotted/libmelda" }
或者
melda = "0.2.3"
如果使用来自 crates.io 的 crate,根据需要调整版本字符串。然后导入所需的模块。对于此示例,您需要
use melda::{filesystemadapter::FilesystemAdapter, melda::Melda};
use serde_json::json;
use std::sync::{Arc, RwLock};
为了了解如何使用 Melda,我们考虑以下情况,其中由虚构的活动规划软件(即待办事项管理软件)使用的共享 JSON 文档由多个方面并发更新。提供的 JSON 由应用程序生成(通过序列化其数据模型)。我们假设用户 Alice 创建了共享 JSON 文档的第一个版本,该版本将命名为 v1_alice.json。此第一个版本包含以下数据
{
"software" : "MeldaDo",
"version" : "1.0.0",
"items♭" : []
}
根对象包含三个字段:一个定义应用程序名称的 软件 字段,一个设置软件版本的 版本 字段,以及一个 items♭ 字段,它映射到一个 JSON 对象数组(每个待办事项一个)。由于这是第一个版本,items 数组是空的。使用 ♭ 后缀是要求 Melda 将数组的内容 展开,通过顺序提取包含的 JSON 对象,以便单独跟踪它们的更改。
为了更好地理解展开过程的目的,考虑 Melda 如何处理以下两个 JSON 文件。第一个文件名为 v2_alice_noflat.json 包含
{
"software" : "MeldaDo",
"version" : "1.0.0",
"items" : [
{"_id" : "alice_todo_01", "title" : "Buy milk", "description" : "Go to the grocery store"}
]
}
在这种情况下,Melda 将保持根对象不变,一个用户对 items 数组所做的更改将不会与其他用户所做的更改合并。例如,如果有两个用户在自己的副本中向数组添加元素,然后合并这些副本,则只会看到其中一个元素。相反,现在考虑文档的另一个版本,名为 v2_alice.json,它包含
{
"software" : "MeldaDo",
"version" : "1.0.0",
"items♭" : [
{"_id" : "alice_todo_01", "title" : "Buy milk", "description" : "Go to the grocery store"}
]
}
在这种情况下,items♭ 数组中的对象将被提取并单独跟踪。特别是,上述文档产生了以下两个 JSON 对象
{
"_id" : "√",
"software" : "MeldaDo",
"version" : "1.0.0",
"items♭" : [
"alice_todo_01"
]
}
以及待办事项本身
{
"_id" : "alice_todo_01",
"title" : "Buy milk",
"description" : "Go to the grocery store"
}
请注意,每个对象都有自己的唯一标识符存储在 _id 字段中。如果客户端应用程序没有提供标识符,Melda 将自动生成一个。根对象始终由 √ (此标识符不能由客户端应用程序更改)标识。由于 items♭ 数组中的每个对象都是单独跟踪的,因此如果用户向数组添加元素,然后在与其他用户的副本合并后,所有更改都将被保留。
如果项目集合太大,我们可以要求 Melda 仅存储文档最新版本与上一个版本之间的差异数组。为此,我们只需将 items 字段的关键字前缀为 Δ(希腊大写字母 delta)。因此,版本 delta_alice.json 可能成为
{
"software" : "MeldaDo",
"version" : "1.0.0",
"Δitems♭" : [
{"_id" : "alice_todo_01", "title" : "Buy milk", "description" : "Go to the grocery store"}
]
}
为了简化问题,在以下内容中我们将不使用差异数组。让我们回到我们的示例情况... 到目前为止,我们只考虑了一些 JSON 数据,但我们还没有看到如何与 Melda 交互以更新数据结构。
适配器
Melda 实现了一种模块化设计,其中 CRDT 的逻辑与数据存储分离。通过 适配器 来实现数据存储(在我们的情况下,delta 状态)。Melda 已经提供了不同类型的适配器,支持内存存储(MemoryAdapter)、文件系统中的一个文件夹(FilesystemAdapter)、SQLite 数据库(SQLiteAdapter)和 Solid Pod(SolidAdapter)。此外,还可以使用元适配器使用 Flate2 算法压缩数据(Flate2Adapter):其他适配器可以与 Flate2Adapter 结合使用,以便在所选后端存储压缩数据。
我们可以初始化一个适配器,它将在文件系统上的 todolist 目录中存储数据如下(FilesystemAdapter)
let adapter = Box::new(FilesystemAdapter::new("todolist").expect("Cannot initialize adapter"));
如果我们想使用压缩,我们可以添加 Flate2Adapter 如下
let adapter = Box::new(Flate2Adapter::new(Arc::new(RwLock::new(Box::new(
FilesystemAdapter::new("todolist").expect("Cannot initialize adapter"))))));
或者,您可以使用 get_adapter 函数从 URL 初始化适配器
let adapter = get_adapter("file+flate://todolist").unwrap();
get_adapter 函数的有效方案
存储类型 | 示例路径 | 描述 |
---|---|---|
内存(memory://) | memory:// | |
内存+Deflate 压缩(memory+flate://) | memory+flate:// | |
内存+Brotli 压缩(memory+brotli://) | memory+brotli:// | |
文件夹(file://) | file://mycrdtdocument | 文件夹的绝对路径(可以是网络共享) |
使用Deflate压缩的文件夹(file+flate://) | file+flate://mycrdtdocument | 文件夹的绝对路径(可以是网络共享) |
使用Brotli压缩的文件夹(file+brotli://) | file+brotli://mycrdtdocument | 文件夹的绝对路径(可以是网络共享) |
Solid Pod(solid://) | solid://anuser.solidcommunity.net/mycrdtdocument | Solid Pod的URL |
Solid Pod使用Deflate压缩(solid+flate://) | solid+flate://anuser.solidcommunity.net/mycrdtdocument | Solid Pod的URL |
Solid Pod使用Brotli压缩(solid+brotli://) | solid+brotli://anuser.solidcommunity.net/mycrdtdocument | Solid Pod的URL |
SQLite(sqlite://) | sqlite://mycrdtdocument | 需要数据库名称(使用 :memory: 进行内存存储) |
使用Deflate压缩的SQLite(sqlite+flate://) | sqlite+flate://mycrdtdocument | 需要数据库名称(使用 :memory: 进行内存存储) |
使用Brotli压缩的SQLite(sqlite+brotli://) | sqlite+brotli://mycrdtdocument | 需要数据库名称(使用 :memory: 进行内存存储) |
对于 Solid Pod的访问,需要用户名和密码。
初始化Melda
要初始化Melda,我们使用new
方法,传递选择的适配器
let mut m = Melda::new(Arc::new(RwLock::new(adapter))).expect("Failed to inizialize Melda");
或者您可以使用URL
let mut m = Melda::new_from_url("file+flate://todolist").expect("Failed to inizialize Melda");
请注意,如果我们只想读取CRDT,则可以移除mut
修饰符
更新CRDT
为了更新CRDT的状态,我们使用update
方法。首先,我们需要将JSON数据解析为JSON值:因为我们使用serde_json
,所以我们调用serde_json::from_str
或json!
宏。随后,我们在生成的对象上调用update
方法
let v = json!({ "software" : "MeldaDo", "version" : "1.0.0", "items♭" : []})
.as_object()
.expect("Not an object")
.clone();
m.update(v).expect("Failed to update");
对CRDT所做的更新现在处于暂存状态。为了持久化它们,我们需要对数据存储后端进行提交。我们使用commit
方法进行提交:我们可以传递一个包含一些附加信息的可选JSON对象,这些信息将随更新一起存储。请注意,在提交之前,我们可以执行所需的所有更新,但是如果没有对CRDT进行任何更新,则无法提交。
let info = json!({ "author" : "Alice", "description" : "First commit" })
.as_object()
.expect("Not an object")
.clone();
let commit_result = m.commit(Some(info));
commit
的结果可能是错误,如果没有要提交的更改则是None
,如果是提交了更改则是Some(String)
:字符串是提交标识符。成功后,在磁盘上的(todolist目录中)应该创建了以下内容
todolist/
├── 49
│ └── 49ccea4d5797250208edf9bc5d0b89edf23c30a61f5cb3fafb87069f07276a62.delta
└── b4
└── b4e50e445542c4737f4cfd7a9193ffd3be3794049d361d114a44f36434257cb3.pack
.delta
文件称为delta block
,包含CRDT中每个对象的版本信息,而.pack
文件是data pack
,存储每个对象的实际JSON内容。每次提交都会产生一个新的delta block(具有不同的名称,对应于其内容的哈希摘要)和可能的数据pack(如果生成了新的JSON值)。todolist目录的目录结构根据其前缀将文件组织到子目录中。
我们可以使用(再次)update
方法进行另一个更新,并提交生成的更改
let v = json!({ "software" : "MeldaDo", "version" : "1.0.0", "items♭" : [
{"_id" : "alice_todo_01", "title" : "Buy milk", "description" : "Go to the grocery store"}
]
})
.as_object()
.expect("Not an object")
.clone();
m.update(v).expect("Failed to update");
let info = json!({ "author" : "Alice", "description" : "Add buy milk" })
.as_object()
.expect("Not an object")
.clone();
let commit_result = m.commit(Some(info));
更改将反映在磁盘上(在相应的目录中创建新的pack和block)
todolist/
├── 2b
│ └── 2b0a463fcba92d5cf7dae531a5c40b67aaa0f45ab351c15613534fb5bba28564.pack
├── 49
│ └── 49ccea4d5797250208edf9bc5d0b89edf23c30a61f5cb3fafb87069f07276a62.delta
├── b4
│ └── b4e50e445542c4737f4cfd7a9193ffd3be3794049d361d114a44f36434257cb3.pack
└── b6
└── b6297035f06f13186160577099759dea843addcd1fbd05d24da87d9ac071da3b.delta
读取数据
在任何时候,都可以使用read
方法将CRDT的状态读回到一个JSON文档中
let data = m.read().expect("Failed to read");
let content = serde_json::to_string(&data).unwrap();
println!("{}", content);
以下附加代码将在终端打印以下内容
{"_id":"√","items♭":
[
{"_id":"alice_todo_01","description":"Go to the grocery store","title":"Buy milk"}
],
"software":"MeldaDo","version":"1.0.0"
}
Melda管理的每个对象都将包含具有对应唯一标识符的_id
字段。
共享数据
现在假设Alice将当前状态下的todolist
目录与Bob共享(她可以简单地压缩内容,并通过电子邮件将压缩文件发送给Bob)。我们假设Bob将内容保存在todolist_bob
目录中。Bob初始化Melda并可以执行一些更新
let adapter_bob =
Box::new(FilesystemAdapter::new("todolist_bob").expect("Cannot initialize adapter"));
let mut m_bob =
Melda::new(Arc::new(RwLock::new(adapter_bob))).expect("Failed to inizialize Melda");
let v = json!({ "software" : "MeldaDo", "version" : "1.0.0", "items♭" : [
{"_id" : "alice_todo_01", "title" : "Buy milk", "description" : "Go to the grocery store"},
{"_id" : "bob_todo_01", "title" : "Pay bills", "description" : "Withdraw 500 to pay bill"},
{"_id" : "bob_todo_02", "title" : "Call mom", "description" : "Call mom to schedule dinner"},
]
})
.as_object()
.expect("Not an object")
.clone();
m_bob.update(v).expect("Failed to update");
let info = json!({ "author" : "Bob", "description" : "Add some todos" })
.as_object()
.expect("Not an object")
.clone();
let commit_result = m_bob.commit(Some(info));
如您所注意到的,Bob添加了两个新项目。与此同时,Alice继续在其副本上工作,通过删除一个项目(alice_todo_01
)和添加一个新项目(alice_todo_02
)
let v = json!({ "software" : "MeldaDo", "version" : "1.0.0", "items♭" : [
{"_id" : "alice_todo_02", "title" : "Take picture of our dog", "description" : "It must be a nice one"}
]
})
.as_object()
.expect("Not an object")
.clone();
m.update(v).expect("Failed to update");
let info = json!({ "author" : "Alice", "description" : "Some more stuff to do" })
.as_object()
.expect("Not an object")
.clone();
let commit_result = m.commit(Some(info));
最后,Bob将自己的副本与Alice共享:现在Alice只需要将Bob发送过来的目录内容与本地目录合并。或者,假设Bob修改的数据位于Alice电脑上的todolist_bob目录。要将更改合并回todolist目录,Alice可以使用meld方法
let adapter_bob = Box::new(FilesystemAdapter::new("todolist_bob").expect("Cannot initialize adapter"));
let m_bob = Melda::new(Arc::new(RwLock::new(adapter_bob))).expect("Failed to inizialize Melda");
m.meld(&m_bob).expect("Failed to meld");
m.refresh();
refresh方法用于在合并操作后从存储后端加载更新。然后Alice可以使用以下方式读取CRDT的新状态:
let data = m.read().expect("Failed to read");
let content = serde_json::to_string(&data).unwrap();
println!("{}", content);
在终端上打印的结果应该看起来像:
{
"_id": "√",
"items♭": [
{
"_id": "bob_todo_01",
"description": "Withdraw 500 to pay bill",
"title": "Pay bills"
},
{
"_id": "bob_todo_02",
"description": "Call mom to schedule dinner",
"title": "Call mom"
},
{
"_id": "alice_todo_02",
"description": "It must be a nice one",
"title": "Take picture of our dog"
}
],
"software": "MeldaDo",
"version": "1.0.0"
}
如你所见,只有一个来自Alice的待办事项,以及Bob添加的两个待办事项。
获取提交历史
当我们向CRDT提交时,会创建一个新的delta块。每个块都与某些父块相链接,以创建一个链。提交时,Melda会寻找anchor块,即当前不被任何其他块引用为父块的块。我们可以使用get_anchors来获取当前锚点集合,因此如果Alice想要获取她的CRDT的锚点,她可以使用以下方式:
let anchors = m.get_anchors();
当我们提交时,锚点列表将仅包含已提交块的标识符,而当我们从另一个副本合并时,可能会出现多个锚点(对应提交树中的不同分支)。我们可以使用get_block方法获取每个块的其他信息。如果Alice想要获取锚点的额外信息,她可以运行以下代码:
let anchors = m.get_anchors();
for block_id in anchors {
if let Some(block) = m.get_block(&block_id).expect("Failed to get block") {
let parents = block.parents;
let info = block.info;
let packs = block.packs;
println!("Block {}", block_id);
println!("\t Information: {:?}", info);
println!("\t Parents: {:?}", parents);
println!("\t Packs: {:?}", packs);
}
}
parents字段包含父块的标识符(如果没有父块,则为None,即我们位于一个原始块),而可选的info字段对应于提交信息。最后,packs字段包含在提交过程中生成的数据包的标识符(如果没有为新提交生成新数据,则设置为None)。
回到过去
我们可以通过使用reload_until方法在提交之间导航。例如,假设Alice想要回到原始版本:
let mut anchors: Vec<String> = m.get_anchors().into_iter().collect();
let mut block_id = anchors.get(0).cloned();
while block_id.is_some() {
let block = m
.get_block(block_id.as_ref().unwrap())
.expect("Failed to get block")
.unwrap();
if let Some(parents) = block.parents {
let parents: Vec<String> = parents.into_iter().collect();
block_id = parents.get(0).cloned();
} else {
// We reached the origin
break;
}
}
m.reload_until(block_id.as_ref().unwrap())
.expect("Failed to reload until origin");
不出所料,现在读取的JSON文档显示了第一个版本。
{"_id":"√","items♭":[],"software":"MeldaDo","version":"1.0.0"}
Alice可以通过重新加载CRDT回到最后一个版本。
m.reload();
冲突和解决
当我们合并两个具有一些并发更新的副本时,很可能会出现冲突。在我们的场景中,根文档(标识符为√)有冲突,因为Alice和Bob都修改了他们在各自副本上的待办事项(后来被合并)。我们可以通过使用in_conflict方法检查冲突,该方法返回所有有冲突的对象集合。如果一个对象有冲突,我们可以使用get_conflicting方法检索冲突版本。Melda使用一个确定性算法选择一个胜利版本。我们可以使用get_winner方法获取当前胜利版本。最后,我们可以使用get_value方法查看与胜利或冲突版本关联的值。因此,Alice可以使用类似以下的方式来获取有冲突的对象及其对应值的摘要:
for uuid in m.in_conflict() {
let winner = m.get_winner(&uuid).unwrap();
let conflicting = m.get_conflicting(&uuid).unwrap();
println!("Winner: {:?} -> {:?}", winner, m.get_value(&uuid, &winner));
for c in conflicting {
println!("Conflict {:?}", m.get_value(&uuid, &c));
}
}
我们可以使用resolve_as方法来解决冲突。例如,如果Alice想要接受当前胜利者并解决所有冲突,她可以使用以下代码:
for uuid in m.in_conflict() {
let winner = m.get_winner(&uuid).unwrap();
m.resolve_as(&uuid, &winner).expect("Failed to resolve");
}
assert!(m.in_conflict().is_empty());
基准测试
在libmelda-benchmarks存储库中,你可以找到一个比较Melda和Automerge的基准测试。
示例集成
在Kibi存储库中,你可以找到一个将Melda集成到文本编辑器中的示例。还有一个名为libmelda-tools的项目,它实现了一个简单的命令行工具,用于更新、读取和合并Melda结构。
出版物
2022
Amos Brocco "Melda: 一个通用的Delta State JSON CRDT"。第9届分布式数据一致性原理与实践研讨会(PaPoC'22)。2022年4月。 PDF BibTex
2021
Amos Brocco "Delta-State JSON CRDT:将协作建立在稳固的基础上"。第23届国际分布式系统稳定性、安全性和安全性研讨会(SSS 2021)。2021年11月。 PDF BibTex
联系方式
amos dot brocco at supsi dot ch
许可证
(c)2021-2022 Amos Brocco,GPL v3(目前如此...但我会评估更改许可证——到类似BSD3/MIT/...的许可证,在不久的将来)
致谢
非常感谢@ngortheone(文档更新,示例)
依赖关系
~9–49MB
~868K SLoC