8 个不稳定版本 (3 个破坏性版本)
0.4.0 | 2024年5月8日 |
---|---|
0.3.0 | 2024年5月7日 |
0.2.1 | 2023年2月13日 |
0.1.3 | 2018年12月4日 |
0.1.1 | 2018年7月30日 |
#61 in 异步
每月477次下载
在3 个crates中使用(直接使用2个)
135KB
2.5K SLoC
tokio-zookeeper
此crate提供与Apache ZooKeeper交互的客户端,ZooKeeper是一种高度可靠的分布式服务,用于维护配置信息、命名、提供分布式同步以及提供组服务。
关于ZooKeeper
ZooKeeper概述提供了对ZooKeeper的全面介绍,但我们在此处重复最重要的几点。在它的核心,ZooKeeper是一个分层键值存储(即键可以有“子键”),并具有额外的机制,以保证客户端和服务器故障时的一致性操作。ZooKeeper中的键看起来像路径(例如,/key/subkey
),路径上的每个项目都称为一个"Znode"。每个Znode(包括有子节点的Znode)还可以关联数据,可以像在其他键值存储中一样查询和更新。除了数据和子节点外,每个Znode还存储元信息,例如访问控制列表、修改时间戳和一个版本号,允许客户端在访问值时避免互相干扰(稍后会有更多介绍)。
操作
ZooKeeper 的 API 包括您在文件系统中期望找到的基本操作:用于创建新 Znodes 的 create
,用于删除它们的 delete
,用于检查节点是否存在 的 exists
,用于获取和设置节点关联数据 分别为 get_data
和 set_data
,以及用于检索给定节点(即其子键)的子节点的 get_children
。对于所有这些操作,ZooKeeper 提供了关于当有多个客户端与系统交互时会发生什么,甚至在系统和网络故障时的响应的 强烈保证。
短暂节点
当您创建一个 Znode 时,您还可以指定一个 CreateMode
。使用 CreateMode::Persistent
创建的节点是我们到目前为止讨论过的节点。它们在服务器上保留,直到您删除它们。另一方面,使用 CreateMode::Ephemeral
创建的节点是特殊的。这些 短暂节点 在创建它们的客户端断开连接时将被服务器自动删除。这对于实现类似租约的机制和检测故障很有用。由于它们会自动删除,并且带有子节点的节点不能直接删除,因此不允许短暂节点有子节点。
监视
除了上述方法之外,ZooKeeper::exists
、ZooKeeper::get_data
和 ZooKeeper::get_children
也支持在节点上设置 "监视"。监视是一个一次性触发器,当监视设置的状态改变时,将导致一个 WatchedEvent
被发送到设置监视的客户端。例如,对于监视的 get_data
,当目标节点的数据在原始 get_data
调用的响应处理之后第一次更改时,将发送一次性通知。您应该查看 程序员指南中的“监视”部分 以获取详细信息。
入门指南
要启动ZooKeeper,请遵循官方的入门指南。在大多数Linux环境中,使基本设置运行的工作流程通常是安装zookeeper
包,然后运行systemctl start zookeeper
。之后,ZooKeeper将在127.0.0.1:2181
上运行。
此实现
此库类似于官方Java实现提供的异步API,对于大多数操作,Java文档应适用于Rust实现。如果不是这种情况,则被认为是一个错误,我们非常欢迎您提供尽可能多的相关信息。
请注意,由于此实现是异步的,客户端用户必须注意不要在其自己的代码中重新排序操作。官方文档中对此有一些讨论(Java绑定)。
有关ZooKeeper的更多信息,请参阅ZooKeeper程序员指南和Confluence ZooKeeper维基。还有一个基本教程(使用Java客户端)在此。
与Tokio的交互
此crate中的futures期望在tokio::runtime::Runtime
下运行。在常见情况下,它们将通过在通过#[tokio::main]
或#[tokio::test]
执行的环境中通过.await
它们来执行。您也可以显式创建一个tokio::runtime::Runtime
,然后使用Runtime::block_on
或Runtime::spawn
。
一个有点愚蠢的例子
use tokio_zookeeper::*;
use futures::prelude::*;
let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
.await
.unwrap();
// let's first check if /example exists. the .watch() causes us to be notified
// the next time the "exists" status of /example changes after the call.
let stat = zk.watch().exists("/example").await.unwrap();
// initially, /example does not exist
assert_eq!(stat, None);
// so let's make it!
let path = zk
.create(
"/example",
&b"Hello world"[..],
Acl::open_unsafe(),
CreateMode::Persistent,
)
.await
.unwrap();
assert_eq!(path.as_deref(), Ok("/example"));
// does it exist now?
let stat = zk.watch().exists("/example").await.unwrap();
// looks like it!
// note that the creation above also triggered our "exists" watch!
assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len());
// did the data get set correctly?
let res = zk.get_data("/example").await.unwrap();
let data = b"Hello world";
let res = res.unwrap();
assert_eq!(res.0, data);
assert_eq!(res.1.data_length as usize, data.len());
// let's update the data.
let stat = zk
.set_data("/example", Some(res.1.version), &b"Bye world"[..])
.await
.unwrap();
assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
// create a child of /example
let path = zk
.create(
"/example/more",
&b"Hello more"[..],
Acl::open_unsafe(),
CreateMode::Persistent,
)
.await
.unwrap();
assert_eq!(path.as_deref(), Ok("/example/more"));
// it should be visible as a child of /example
let children = zk.get_children("/example").await.unwrap();
assert_eq!(children, Some(vec!["more".to_string()]));
// it is not legal to delete a node that has children directly
let res = zk.delete("/example", None).await.unwrap();
assert_eq!(res, Err(error::Delete::NotEmpty));
// instead we must delete the children first
let res = zk.delete("/example/more", None).await.unwrap();
assert_eq!(res, Ok(()));
let res = zk.delete("/example", None).await.unwrap();
assert_eq!(res, Ok(()));
// no /example should no longer exist!
let stat = zk.exists("/example").await.unwrap();
assert_eq!(stat, None);
// now let's check that the .watch().exists we did in the very
// beginning actually triggered!
let (event, _w) = default_watcher.into_future().await;
assert_eq!(
event,
Some(WatchedEvent {
event_type: WatchedEventType::NodeCreated,
keeper_state: KeeperState::SyncConnected,
path: String::from("/example"),
})
);
谢谢
此crate最初由Jon Gjengset (@jonhoo)作为其长期系列流的一部分开发,从此视频开始。
依赖关系
~4–13MB
~134K SLoC