#zookeeper #tokio #async #client-server #api-bindings

joyent-tokio-zookeeper

用于与 Apache ZooKeeper 交互的异步客户端库

2 个不稳定版本

0.2.0 2020 年 5 月 15 日
0.1.3 2020 年 5 月 18 日

#974 in 异步


用于 cueball-manatee-primary-r…

MIT/Apache

145KB
2.5K SLoC

tokio-zookeeper

Crates.io Documentation Build Status

此 crate 提供了一个与 Apache ZooKeeper 交互的客户端,ZooKeeper 是一个用于维护配置信息、命名、提供分布式同步和提供组服务的非常可靠的服务。

关于 ZooKeeper

ZooKeeper 概述 提供了 ZooKeeper 的全面介绍,但我们在这里会重复最重要的几点。在本质上,ZooKeeper 是一个 层次化键值存储(即键可以有“子键”),它具有保证客户端和服务器故障时一致操作的附加机制。ZooKeeper 中的键看起来像路径(例如,/key/subkey),路径上的每个项目都称为一个 "Znode"。每个 Znode(包括具有子节点的 Znode)还可以关联数据,这些数据可以像在其他键值存储中一样查询和更新。除了其数据和子节点外,每个 Znode 还存储元信息,例如 访问控制列表修改时间戳 和版本号,允许客户端在访问值时避免相互干扰(稍后会详细介绍)。

操作

ZooKeeper的API包含了你期望在文件系统中找到的基本操作:用于创建新的Z节点,create;用于删除节点,delete;用于检查节点是否存在,exists;用于获取节点数据,get_data 和用于设置节点数据,set_data;用于获取指定节点的子节点(即其子键),get_children。对于所有这些操作,ZooKeeper都提供了强保证,包括当有多个客户端与系统交互时会发生什么,以及系统和网络故障时的响应。

临时节点

当你创建一个Z节点时,你还会指定一个CreateMode。使用CreateMode::Persistent创建的节点是迄今为止我们讨论过的节点。它们会保留在服务器上,直到你删除它们。另一方面,使用CreateMode::Ephemeral创建的节点是特殊的。这些临时节点在创建它们的客户端断开连接时会被服务器自动删除。这可以用于实现类似租约的机制,并用于检测故障。由于它们会自动删除,并且带有子节点的节点不能直接删除,因此临时节点不允许有子节点。

监视器

除了上述方法外,ZooKeeper::existsZooKeeper::get_dataZooKeeper::get_children 也支持在节点上设置 "监视器"。监视器是一次性触发器,当监视器所设置的节点状态发生变化时,会将WatchedEvent发送到设置监视器的客户端。例如,对于监视的get_data,当目标节点数据在处理原始get_data调用响应后首次发生变化时,将发送一次通知。有关详细信息,请参阅程序员指南中的“监视器”部分

入门指南

要启动 ZooKeeper,请遵循官方的入门指南。在大多数 Linux 环境中,使基本设置正常工作的步骤通常是仅安装 zookeeper 软件包,然后运行 systemctl start zookeeper。ZooKeeper 将在 127.0.0.1:2181 上运行。

本实现

这个库与官方 Java 实现提供的异步 API 类似,对于大多数操作,Java 文档应适用于 Rust 实现。如果不是这样,则被视为 一个错误,我们非常欢迎您提供尽可能多的相关信息。

请注意,由于本实现是异步的,客户端的用户必须在其代码中注意不要重新排序操作。官方文档的Java 绑定部分对此有一些讨论。

有关 ZooKeeper 的更多信息,请参阅ZooKeeper 编程指南Confluence ZooKeeper 维基。还有一个基本的教程(使用 Java 客户端),请参阅这里

与 Tokio 的交互

本库中的 futures 期望在 tokio::Runtime 下运行。在常见情况下,您不能仅使用 .wait() 来解决它们,而应使用 tokio::run 或显式创建一个 tokio::Runtime,然后使用 Runtime::block_on

一个有点愚蠢的例子

extern crate tokio;
#[macro_use]
extern crate failure;
extern crate tokio_zookeeper;

use tokio_zookeeper::*;
use tokio::prelude::*;

tokio::run(
    ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
        .and_then(|(zk, default_watcher)| {
            // let's first check if /example exists. the .watch() causes us to be notified
            // the next time the "exists" status of /example changes after the call.
            zk.watch()
                .exists("/example")
                .inspect(|(_, stat)| {
                    // initially, /example does not exist
                    assert_eq!(stat, &None)
                })
                .and_then(|(zk, _)| {
                    // so let's make it!
                    zk.create(
                        "/example",
                        &b"Hello world"[..],
                        Acl::open_unsafe(),
                        CreateMode::Persistent,
                    )
                })
                .inspect(|(_, ref path)| {
                    assert_eq!(path.as_ref().map(String::as_str), Ok("/example"))
                })
                .and_then(|(zk, _)| {
                    // does it exist now?
                    zk.watch().exists("/example")
                })
                .inspect(|(_, stat)| {
                    // looks like it!
                    // note that the creation above also triggered our "exists" watch!
                    assert_eq!(stat.unwrap().data_length as usize, b"Hello world".len())
                })
                .and_then(|(zk, _)| {
                    // did the data get set correctly?
                    zk.get_data("/example")
                })
                .inspect(|(_, res)| {
                    let data = b"Hello world";
                    let res = res.as_ref().unwrap();
                    assert_eq!(res.0, data);
                    assert_eq!(res.1.data_length as usize, data.len());
                })
                .and_then(|(zk, res)| {
                    // let's update the data.
                    zk.set_data("/example", Some(res.unwrap().1.version), &b"Bye world"[..])
                })
                .inspect(|(_, stat)| {
                    assert_eq!(stat.unwrap().data_length as usize, "Bye world".len());
                })
                .and_then(|(zk, _)| {
                    // create a child of /example
                    zk.create(
                        "/example/more",
                        &b"Hello more"[..],
                        Acl::open_unsafe(),
                        CreateMode::Persistent,
                    )
                })
                .inspect(|(_, ref path)| {
                    assert_eq!(path.as_ref().map(String::as_str), Ok("/example/more"))
                })
                .and_then(|(zk, _)| {
                    // it should be visible as a child of /example
                    zk.get_children("/example")
                })
                .inspect(|(_, children)| {
                    assert_eq!(children, &Some(vec!["more".to_string()]));
                })
                .and_then(|(zk, _)| {
                    // it is not legal to delete a node that has children directly
                    zk.delete("/example", None)
                })
                .inspect(|(_, res)| assert_eq!(res, &Err(error::Delete::NotEmpty)))
                .and_then(|(zk, _)| {
                    // instead we must delete the children first
                    zk.delete("/example/more", None)
                })
                .inspect(|(_, res)| assert_eq!(res, &Ok(())))
                .and_then(|(zk, _)| zk.delete("/example", None))
                .inspect(|(_, res)| assert_eq!(res, &Ok(())))
                .and_then(|(zk, _)| {
                    // no /example should no longer exist!
                    zk.exists("/example")
                })
                .inspect(|(_, stat)| assert_eq!(stat, &None))
                .and_then(move |(zk, _)| {
                    // now let's check that the .watch().exists we did in the very
                    // beginning actually triggered!
                    default_watcher
                        .into_future()
                        .map(move |x| (zk, x))
                        .map_err(|e| format_err!("stream error: {:?}", e.0))
                })
                .inspect(|(_, (event, _))| {
                    assert_eq!(
                        event,
                        &Some(WatchedEvent {
                            event_type: WatchedEventType::NodeCreated,
                            keeper_state: KeeperState::SyncConnected,
                            path: String::from("/example"),
                        })
                    );
                })
        })
        .map(|_| ())
        .map_err(|e| panic!("{:?}", e)),
);

现场编码

该库是作为现场编码系列的一部分进行开发的,旨在为已经对 Rust 有一定了解的用户提供,他们想看到更大、更复杂的东西被构建。对于与 futures 相关的内容,我也强烈推荐 @aturon 的Rust 中的异步编程书籍

您可以在这个 YouTube 播单中找到过去会议的录音。这个库最初在这个视频中出现,然后在这个后续视频中更加完善,最后在第 3 部分中基本完成。如果您想跟上,我还建议您查看ZooKeeper 编程指南。要了解未来的流媒体更新,请关注我的PatreonTwitter

谢谢

对于我建设的每一个项目,我都想感谢那些愿意并且能够额外一步支持我在 PatreonLiberapay 上制作这些视频的人。我由衷地感谢你们,我非常激动,你们觉得我做的事情足够有趣,以至于愿意给一个陌生人钱去做他们热爱的事情!

  • 罗德里戈·瓦林
  • 鸽子F
  • 帕特里克·艾伦
  • 马修·奈特

依赖项

~5MB
~88K SLoC