2 个不稳定版本
0.2.0 | 2020 年 5 月 15 日 |
---|---|
0.1.3 | 2020 年 5 月 18 日 |
#974 in 异步
145KB
2.5K SLoC
tokio-zookeeper
此 crate 提供了一个与 Apache ZooKeeper 交互的客户端,ZooKeeper 是一个用于维护配置信息、命名、提供分布式同步和提供组服务的非常可靠的服务。
关于 ZooKeeper
ZooKeeper 概述 提供了 ZooKeeper 的全面介绍,但我们在这里会重复最重要的几点。在本质上,ZooKeeper 是一个 层次化键值存储(即键可以有“子键”),它具有保证客户端和服务器故障时一致操作的附加机制。ZooKeeper 中的键看起来像路径(例如,/key/subkey
),路径上的每个项目都称为一个 "Znode"。每个 Znode(包括具有子节点的 Znode)还可以关联数据,这些数据可以像在其他键值存储中一样查询和更新。除了其数据和子节点外,每个 Znode 还存储元信息,例如 访问控制列表、修改时间戳 和版本号,允许客户端在访问值时避免相互干扰(稍后会详细介绍)。
操作
ZooKeeper的API包含了你期望在文件系统中找到的基本操作:用于创建新的Z节点,create
;用于删除节点,delete
;用于检查节点是否存在,exists
;用于获取节点数据,get_data
和用于设置节点数据,set_data
;用于获取指定节点的子节点(即其子键),get_children
。对于所有这些操作,ZooKeeper都提供了强保证,包括当有多个客户端与系统交互时会发生什么,以及系统和网络故障时的响应。
临时节点
当你创建一个Z节点时,你还会指定一个CreateMode
。使用CreateMode::Persistent
创建的节点是迄今为止我们讨论过的节点。它们会保留在服务器上,直到你删除它们。另一方面,使用CreateMode::Ephemeral
创建的节点是特殊的。这些临时节点在创建它们的客户端断开连接时会被服务器自动删除。这可以用于实现类似租约的机制,并用于检测故障。由于它们会自动删除,并且带有子节点的节点不能直接删除,因此临时节点不允许有子节点。
监视器
除了上述方法外,ZooKeeper::exists
、ZooKeeper::get_data
和 ZooKeeper::get_children
也支持在节点上设置 "监视器"。监视器是一次性触发器,当监视器所设置的节点状态发生变化时,会将WatchedEvent
发送到设置监视器的客户端。例如,对于监视的get_data
,当目标节点数据在处理原始get_data
调用响应后首次发生变化时,将发送一次通知。有关详细信息,请参阅程序员指南中的“监视器”部分。
入门指南
要启动 ZooKeeper,请遵循官方的入门指南。在大多数 Linux 环境中,使基本设置正常工作的步骤通常是仅安装 zookeeper
软件包,然后运行 systemctl start zookeeper
。ZooKeeper 将在 127.0.0.1:2181
上运行。
本实现
这个库与官方 Java 实现提供的异步 API 类似,对于大多数操作,Java 文档应适用于 Rust 实现。如果不是这样,则被视为 一个错误,我们非常欢迎您提供尽可能多的相关信息。
请注意,由于本实现是异步的,客户端的用户必须在其代码中注意不要重新排序操作。官方文档的Java 绑定部分对此有一些讨论。
有关 ZooKeeper 的更多信息,请参阅ZooKeeper 编程指南和Confluence ZooKeeper 维基。还有一个基本的教程(使用 Java 客户端),请参阅这里。
与 Tokio 的交互
本库中的 futures 期望在 tokio::Runtime
下运行。在常见情况下,您不能仅使用 .wait()
来解决它们,而应使用 tokio::run
或显式创建一个 tokio::Runtime
,然后使用 Runtime::block_on
。
一个有点愚蠢的例子
extern crate tokio;
#[macro_use]
extern crate failure;
extern crate tokio_zookeeper;
use tokio_zookeeper::*;
use tokio::prelude::*;
tokio::run(
ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
.and_then(|(zk, default_watcher)| {
// let's first check if /example exists. the .watch() causes us to be notified
// the next time the "exists" status of /example changes after the call.
zk.watch()
.exists("/example")
.inspect(|(_, stat)| {
// initially, /example does not exist
assert_eq!(stat, &None)
})
.and_then(|(zk, _)| {
// so let's make it!
zk.create(
"/example",
&b"Hello world"[..],
Acl::open_unsafe(),
CreateMode::Persistent,
)
})
.inspect(|(_, ref path)| {
assert_eq!(path.as_ref().map(String::as_str), Ok("/example"))
})
.and_then(|(zk, _)| {
// does it exist now?
zk.watch().exists("/example")
})
.inspect(|(_, stat)| {
// looks like it!
// note that the creation above also triggered our "exists" watch!
assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len())
})
.and_then(|(zk, _)| {
// did the data get set correctly?
zk.get_data("/example")
})
.inspect(|(_, res)| {
let data = b"Hello world";
let res = res.as_ref().unwrap();
assert_eq!(res.0, data);
assert_eq!(res.1.data_length as usize, data.len());
})
.and_then(|(zk, res)| {
// let's update the data.
zk.set_data("/example", Some(res.unwrap().1.version), &b"Bye world"[..])
})
.inspect(|(_, stat)| {
assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
})
.and_then(|(zk, _)| {
// create a child of /example
zk.create(
"/example/more",
&b"Hello more"[..],
Acl::open_unsafe(),
CreateMode::Persistent,
)
})
.inspect(|(_, ref path)| {
assert_eq!(path.as_ref().map(String::as_str), Ok("/example/more"))
})
.and_then(|(zk, _)| {
// it should be visible as a child of /example
zk.get_children("/example")
})
.inspect(|(_, children)| {
assert_eq!(children, &Some(vec!["more".to_string()]));
})
.and_then(|(zk, _)| {
// it is not legal to delete a node that has children directly
zk.delete("/example", None)
})
.inspect(|(_, res)| assert_eq!(res, &Err(error::Delete::NotEmpty)))
.and_then(|(zk, _)| {
// instead we must delete the children first
zk.delete("/example/more", None)
})
.inspect(|(_, res)| assert_eq!(res, &Ok(())))
.and_then(|(zk, _)| zk.delete("/example", None))
.inspect(|(_, res)| assert_eq!(res, &Ok(())))
.and_then(|(zk, _)| {
// no /example should no longer exist!
zk.exists("/example")
})
.inspect(|(_, stat)| assert_eq!(stat, &None))
.and_then(move |(zk, _)| {
// now let's check that the .watch().exists we did in the very
// beginning actually triggered!
default_watcher
.into_future()
.map(move |x| (zk, x))
.map_err(|e| format_err!("stream error: {:?}", e.0))
})
.inspect(|(_, (event, _))| {
assert_eq!(
event,
&Some(WatchedEvent {
event_type: WatchedEventType::NodeCreated,
keeper_state: KeeperState::SyncConnected,
path: String::from("/example"),
})
);
})
})
.map(|_| ())
.map_err(|e| panic!("{:?}", e)),
);
现场编码
该库是作为现场编码系列的一部分进行开发的,旨在为已经对 Rust 有一定了解的用户提供,他们想看到更大、更复杂的东西被构建。对于与 futures 相关的内容,我也强烈推荐 @aturon 的Rust 中的异步编程书籍。
您可以在这个 YouTube 播单中找到过去会议的录音。这个库最初在这个视频中出现,然后在这个后续视频中更加完善,最后在第 3 部分中基本完成。如果您想跟上,我还建议您查看ZooKeeper 编程指南。要了解未来的流媒体更新,请关注我的Patreon或Twitter。
谢谢
对于我建设的每一个项目,我都想感谢那些愿意并且能够额外一步支持我在 Patreon 或 Liberapay 上制作这些视频的人。我由衷地感谢你们,我非常激动,你们觉得我做的事情足够有趣,以至于愿意给一个陌生人钱去做他们热爱的事情!
- 罗德里戈·瓦林
- 鸽子F
- 帕特里克·艾伦
- 马修·奈特
依赖项
~5MB
~88K SLoC