#kafka #持久化 #yrs #crdt #rocksdb

yrs-kafka

使用RocksDB和Kafka进行Yrs同步和持久化

1 个不稳定版本

0.1.0 2024年3月31日

数据库接口 中排名第1496

WTFPL许可协议

340KB
523 代码行

yrs-kafka

Yrs 使用Kafka和一个由压缩主题喂食的临时RocksDB实例进行同步和持久化。

用法

主题配置

使用yrs-kafka需要两个主题,一个压缩主题和一个变更日志主题。要使用Redpanda配置这两个主题,请运行以下命令

rpk topic create y-compacted -c cleanup.policy=compact
rpk topic create y-changelog 

在应用中

use yrs::{updates::decoder::Decode, Text, Transact, Update};

async fn run() {
    let yrs_kafka = yrs_kafka::start(yrs_kafka::config::Config {
        db_path: "/tmp/yrs-kafka".into(),
        kafka: yrs_kafka::config::KafkaConfig {
            brokers: vec!["localhost:9092".to_string()],
            group_id: "yrs-kafka".to_string(),
            changelog_topic: "y-changelog".to_string(),
            compacted_topic: "y-compacted".to_string(),
        },
    })
    .unwrap();

    let doc = yrs::Doc::new();

    let update = yrs_kafka.load_document("my-document-id").await.unwrap();
    let update = update.get().as_deref().unwrap();
    doc.transact_mut().apply_update(Update::decode_v1(update).unwrap());

    let update = {
        let text = doc.get_or_insert_text("article");
        let mut txn = doc.transact_mut();
        text.insert(&mut txn, 0, "hello");
        text.insert(&mut txn, 5, " world");
        txn.encode_update_v1()
    };

    yrs_kafka.update(b"my-document-id", update).await.unwrap();
}

设计

更新

通过变更日志主题的分区所有权,为文档分配了一个事实上的“所有者”。对给定文档的所有更改都由所有者处理,并将其合并到压缩主题中,该主题被服务的其他实例消费。

发布到压缩主题的更改将持久化到本地实例的临时RocksDB中,除非文档所有者,它们在从变更日志读取时将其合并到自己的RocksDB实例中。

Update diagram

读取

读取文档状态完全使用实例的本地临时RocksDB执行。

Read diagram

依赖项

~44MB
~802K SLoC