48个版本 (破坏性更新)
0.36.2 | 2024年1月16日 |
---|---|
0.36.0 | 2023年11月8日 |
0.33.2 | 2023年7月6日 |
0.29.0 | 2022年10月29日 |
0.3.0 | 2016年11月26日 |
6 in 异步
534,813 每月下载量
用于 106 个Crates(95个直接使用)
10MB
138K SLoC
rust-rdkafka
一个完全异步的、启用futures的Apache Kafka客户端库,基于librdkafka。
库
rust-rdkafka
提供了对librdkafka的安全Rust接口。本版本与librdkafka v1.9.2+兼容。
文档
功能
目前提供的主要功能包括
- 支持自0.8.x以来的所有Kafka版本。有关代理兼容性选项的更多信息,请参阅librdkafka文档。
- 从单个或多个主题消费。
- 自动消费者再平衡。
- 可自定义再平衡,带有再平衡前和再平衡后的回调。
- 同步或异步消息生产。
- 可自定义偏移量提交。
- 创建和删除主题,以及添加和编辑分区。
- 修改代理和主题配置。
- 访问集群元数据(主题分区列表、副本、活动代理等)。
- 访问组元数据(列出组、列出组的成员、主机名等)。
- 访问生产者和消费者指标、错误和回调。
- 通过幂等和事务性生产者和只读提交消费者实现一次且仅一次语义(EOS)。
每秒一百万条消息
rust-rdkafka
通过Rust编写的抽象层,设计得既易于使用又安全,同时得益于librdkafka C库,执行速度极快。
以下是使用 BaseProducer
的基准测试结果,向运行在本地的单个 Kafka 0.11 进程(默认配置,3 个分区)发送数据。硬件:戴尔笔记本电脑,配备 Intel Core i7-4712HQ @ 2.30GHz。
-
场景:生产500万条消息,每条10字节,等待所有消息被确认
- 1045413 条消息/秒,9.970 MB/秒(5次运行的平均值)
-
场景:生产10万条消息,每条10 KB,等待所有消息被确认
- 24623 条消息/秒,234.826 MB/秒(5次运行的平均值)
更多信息,请查看 kafka-benchmark 项目。
客户端类型
rust-rdkafka
提供了低级和高级消费者和生产者。
低级
BaseConsumer
:librdkafka消费者的简单封装。必须定期调用poll()
来执行回调、重新平衡和接收消息。BaseProducer
:librdkafka生产者的简单封装。与消费者类似,用户必须定期调用poll()
来执行投递回调。ThreadedProducer
:具有单独线程的BaseProducer
,该线程专门用于轮询生产者。
高级
StreamConsumer
:自动轮询消费者的消息流。FutureProducer
:消息送达 Kafka(或失败)后,将完成的Future
。
有关消费者和生产者的更多信息,请参阅它们的模块级文档。
警告:该库正在积极开发中,API 可能会更改。
使用 Tokio 进行异步数据处理
Tokio 是一个用于在 Rust 中快速处理异步事件的平台。由 StreamConsumer
和 FutureProducer
提供的接口允许 rust-rdkafka 用户轻松地将 Kafka 消费者和生产者集成到 Tokio 平台,并编写异步消息处理代码。请注意,可以使用 rust-rdkafka 而不使用 Tokio。
要查看 rust-rdkafka 与 Tokio 一起使用的示例,请查看示例文件夹中的 异步处理示例。
至少一次投递
至少一次投递语义在许多流式应用程序中很常见:每条消息都保证至少被处理一次;在临时失败的情况下,消息可以被重新处理和/或重新投递,但不会丢失任何消息。
为了实现至少一次投递,流处理应用程序必须在消息被处理后才谨慎提交偏移量。相反,过早提交偏移量可能会导致消息丢失,因为在恢复时消费者将从下一个消息开始,跳过发生故障的那个消息。
要了解如何使用 rdkafka
实现至少一次交付,请查看示例文件夹中的 至少一次交付示例。要了解更多关于交付语义的信息,请查看 Kafka 文档中的 消息交付语义 章节。
精确一次语义
精确一次语义(EOS)可以通过使用事务型生产者来实现,这允许生产的记录和消费者偏移量以原子方式提交或中止。将他们的 isolation.level
设置为 read_committed
的消费者将只会观察到已提交的消息。
EOS 在需要消息恰好处理一次的读-处理-写场景中非常有用。
要了解如何在 rust-rdkafka 中使用事务,请参阅生产者文档中的 事务 部分。
用户
以下是使用 rust-rdkafka 的部分项目:
- timely-dataflow:一个分布式数据并行计算引擎。还可以查看宣布其 Kafka 集成功能的 博客文章。
- kafka-view:Kafka 集群的 Web 界面。
- kafka-benchmark:Kafka 的高性能基准测试工具。
- callysto:Rust 中的流处理框架。
- bytewax:使用 Timely Dataflow 的 Python 流处理框架。
如果您正在使用 rust-rdkafka,请告诉我们!
安装
将此内容添加到您的 Cargo.toml
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build"] }
此包将从源代码编译 librdkafka 并将其静态链接到您的可执行文件。要编译 librdkafka,您将需要
- GNU 工具链
- GNU
make
pthreads
zlib
:可选,但默认包含(功能:libz
)cmake
:可选,默认不包含(功能:cmake-build
)libssl-dev
:可选,默认不包含(功能:ssl
)libsasl2-dev
:可选,默认不包含(功能:gssapi
)libzstd-dev
:可选,默认不包含(功能:zstd-pkg-config
)
请注意,如果您可以使用 CMake,则鼓励使用 CMake 构建系统(通过 cmake-build
功能)进行编译。
默认情况下,将使用一个子模块,其中包含针对特定提交的 librdkafka 源代码,以编译和静态链接库。可以使用 dynamic-linking
功能将 rdkafka 动态链接到系统中的 librdkafka 版本。示例
[dependencies]
rdkafka = { version = "0.25", features = ["dynamic-linking"] }
要查看功能列表,请参阅 rdkafka-sys 包的文档。rdkafka-sys 的所有功能都作为 rdkafka 功能重新导出。
最低支持的 Rust 版本(MSRV)
当前最低支持的 Rust 版本(MSRV)是 1.61.0。请注意,提高 MSRV 并不被视为破坏性变更。任何 rust-rdkafka 的发布都可能提高 MSRV。
异步运行时
StreamConsumer和FutureProducer的一些特性依赖于Tokio,这可能对只打算使用低级消费者和生产者的用户来说是一个重量级的依赖。Tokio集成默认启用,但可以通过关闭默认功能来禁用它。
[dependencies]
rdkafka = { version = "0.25", default-features = false }
如果您想使用除Tokio之外的异步运行时,可以通过提供一个实现AsyncRuntime
特质的shim来与rust-rdkafka集成。有关详细信息,请参阅以下示例。
示例
您可以在examples
文件夹中找到示例。要运行它们
cargo run --example <example_name> -- <example_args>
调试
rust-rdkafka使用log
crate来处理日志。可选地,启用tracing
功能以发出tracing
事件,而不是log
记录。
在测试和示例中,rust-rdkafka使用env_logger
crate来格式化日志。在这些上下文中,可以使用RUST_LOG
环境变量来启用日志记录,例如
RUST_LOG="librdkafka=trace,rdkafka::client=debug" cargo test
这将配置librdkafka的日志级别为跟踪,Rust客户端模块的日志级别为调试。要实际接收librdkafka的日志,还必须在生产者或消费者配置中设置debug
选项(见librdkafka 配置)。
要启用项目中的调试,请确保使用env_logger::init()
初始化记录器,或任何与log
兼容的日志框架的等效项。
rdkafka-sys
请参阅rdkafka-sys
。
贡献者
感谢
- Thijs Cadier - thijsc
替代方案
- kafka-rust:Kafka客户端的纯Rust实现。
依赖关系
~3–17MB
~232K SLoC