8 个不稳定版本 (3 个破坏性版本)

0.4.0 2024年5月8日
0.3.0 2024年5月7日
0.2.1 2023年2月13日
0.1.3 2018年12月4日
0.1.1 2018年7月30日

#61 in 异步

Download history 86/week @ 2024-04-28 621/week @ 2024-05-05 113/week @ 2024-05-12 142/week @ 2024-05-19 48/week @ 2024-05-26 108/week @ 2024-06-02 87/week @ 2024-06-09 124/week @ 2024-06-16 92/week @ 2024-06-23 82/week @ 2024-06-30 185/week @ 2024-07-07 78/week @ 2024-07-14 112/week @ 2024-07-21 204/week @ 2024-07-28 64/week @ 2024-08-04 88/week @ 2024-08-11

每月477次下载
3 个crates中使用(直接使用2个)

MIT/Apache

135KB
2.5K SLoC

tokio-zookeeper

Crates.io Documentation Build

此crate提供与Apache ZooKeeper交互的客户端,ZooKeeper是一种高度可靠的分布式服务,用于维护配置信息、命名、提供分布式同步以及提供组服务。

关于ZooKeeper

ZooKeeper概述提供了对ZooKeeper的全面介绍,但我们在此处重复最重要的几点。在它的核心,ZooKeeper是一个分层键值存储(即键可以有“子键”),并具有额外的机制,以保证客户端和服务器故障时的一致性操作。ZooKeeper中的键看起来像路径(例如,/key/subkey),路径上的每个项目都称为一个"Znode"。每个Znode(包括有子节点的Znode)还可以关联数据,可以像在其他键值存储中一样查询和更新。除了数据和子节点外,每个Znode还存储元信息,例如访问控制列表修改时间戳和一个版本号,允许客户端在访问值时避免互相干扰(稍后会有更多介绍)。

操作

ZooKeeper 的 API 包括您在文件系统中期望找到的基本操作:用于创建新 Znodes 的 create,用于删除它们的 delete,用于检查节点是否存在 的 exists,用于获取和设置节点关联数据 分别为 get_dataset_data,以及用于检索给定节点(即其子键)的子节点的 get_children。对于所有这些操作,ZooKeeper 提供了关于当有多个客户端与系统交互时会发生什么,甚至在系统和网络故障时的响应的 强烈保证

短暂节点

当您创建一个 Znode 时,您还可以指定一个 CreateMode。使用 CreateMode::Persistent 创建的节点是我们到目前为止讨论过的节点。它们在服务器上保留,直到您删除它们。另一方面,使用 CreateMode::Ephemeral 创建的节点是特殊的。这些 短暂节点 在创建它们的客户端断开连接时将被服务器自动删除。这对于实现类似租约的机制和检测故障很有用。由于它们会自动删除,并且带有子节点的节点不能直接删除,因此不允许短暂节点有子节点。

监视

除了上述方法之外,ZooKeeper::existsZooKeeper::get_dataZooKeeper::get_children 也支持在节点上设置 "监视"。监视是一个一次性触发器,当监视设置的状态改变时,将导致一个 WatchedEvent 被发送到设置监视的客户端。例如,对于监视的 get_data,当目标节点的数据在原始 get_data 调用的响应处理之后第一次更改时,将发送一次性通知。您应该查看 程序员指南中的“监视”部分 以获取详细信息。

入门指南

要启动ZooKeeper,请遵循官方的入门指南。在大多数Linux环境中,使基本设置运行的工作流程通常是安装zookeeper包,然后运行systemctl start zookeeper。之后,ZooKeeper将在127.0.0.1:2181上运行。

此实现

此库类似于官方Java实现提供的异步API,对于大多数操作,Java文档应适用于Rust实现。如果不是这种情况,则被认为是一个错误,我们非常欢迎您提供尽可能多的相关信息。

请注意,由于此实现是异步的,客户端用户必须注意不要在其自己的代码中重新排序操作。官方文档中对此有一些讨论(Java绑定)

有关ZooKeeper的更多信息,请参阅ZooKeeper程序员指南Confluence ZooKeeper维基。还有一个基本教程(使用Java客户端)在此

与Tokio的交互

此crate中的futures期望在tokio::runtime::Runtime下运行。在常见情况下,它们将通过在通过#[tokio::main]#[tokio::test]执行的环境中通过.await它们来执行。您也可以显式创建一个tokio::runtime::Runtime,然后使用Runtime::block_onRuntime::spawn

一个有点愚蠢的例子

use tokio_zookeeper::*;
use futures::prelude::*;

let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
    .await
    .unwrap();

// let's first check if /example exists. the .watch() causes us to be notified
// the next time the "exists" status of /example changes after the call.
let stat = zk.watch().exists("/example").await.unwrap();
// initially, /example does not exist
assert_eq!(stat, None);
// so let's make it!
let path = zk
    .create(
        "/example",
        &b"Hello world"[..],
        Acl::open_unsafe(),
        CreateMode::Persistent,
    )
    .await
    .unwrap();
assert_eq!(path.as_deref(), Ok("/example"));

// does it exist now?
let stat = zk.watch().exists("/example").await.unwrap();
// looks like it!
// note that the creation above also triggered our "exists" watch!
assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());

// did the data get set correctly?
let res = zk.get_data("/example").await.unwrap();
let data = b"Hello world";
let res = res.unwrap();
assert_eq!(res.0, data);
assert_eq!(res.1.data_length as usize, data.len());

// let's update the data.
let stat = zk
    .set_data("/example", Some(res.1.version), &b"Bye world"[..])
    .await
    .unwrap();
assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());

// create a child of /example
let path = zk
    .create(
        "/example/more",
        &b"Hello more"[..],
        Acl::open_unsafe(),
        CreateMode::Persistent,
    )
    .await
    .unwrap();
assert_eq!(path.as_deref(), Ok("/example/more"));

// it should be visible as a child of /example
let children = zk.get_children("/example").await.unwrap();
assert_eq!(children, Some(vec!["more".to_string()]));

// it is not legal to delete a node that has children directly
let res = zk.delete("/example", None).await.unwrap();
assert_eq!(res, Err(error::Delete::NotEmpty));
// instead we must delete the children first
let res = zk.delete("/example/more", None).await.unwrap();
assert_eq!(res, Ok(()));
let res = zk.delete("/example", None).await.unwrap();
assert_eq!(res, Ok(()));
// no /example should no longer exist!
let stat = zk.exists("/example").await.unwrap();
assert_eq!(stat, None);

// now let's check that the .watch().exists we did in the very
// beginning actually triggered!
let (event, _w) = default_watcher.into_future().await;
assert_eq!(
    event,
    Some(WatchedEvent {
        event_type: WatchedEventType::NodeCreated,
        keeper_state: KeeperState::SyncConnected,
        path: String::from("/example"),
    })
);

谢谢

此crate最初由Jon Gjengset (@jonhoo)作为其长期系列的一部分开发,从此视频开始。

依赖关系

~4–13MB
~134K SLoC