6个版本 (重大更改)
0.5.0 | 2023年8月11日 |
---|---|
0.4.0 | 2023年6月1日 |
0.3.0 | 2022年7月18日 |
0.2.0 | 2022年2月17日 |
0.0.0 | 2022年1月27日 |
#35 in HTTP服务器
每月 6,128 次下载
475KB
11K SLoC
RSKafka
该包旨在成为Kafka的简单工作负载的最小实现,这些工作负载希望将Kafka用作分布式预写日志。
它不是一个通用的Kafka实现,而是针对简单性进行了大量优化,包括实现和其涌现的运行特性。特别是,它旨在满足IOx的需求。
该包具有以下特点:
- 不支持偏移量跟踪、消费者组、事务等...
- 没有内置的缓冲、聚合、linger超时等...
- 每个分区独立的写入流
它非常适合以下工作负载:
- 独立于Kafka执行偏移量跟踪
- 按分区读写合理大小的有效负载
- 分区吞吐量低[1]
用法
# async fn test() {
use rskafka::{
client::{
ClientBuilder,
partition::{Compression, UnknownTopicHandling},
},
record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;
// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
topic,
2, // partitions
1, // replication factor
5_000, // timeout (ms)
).await.unwrap();
// get a partition-bound client
let partition_client = client
.partition_client(
topic.to_owned(),
0, // partition
UnknownTopicHandling::Retry,
)
.await
.unwrap();
// produce some data
let record = Record {
key: None,
value: Some(b"hello kafka".to_vec()),
headers: BTreeMap::from([
("foo".to_owned(), b"bar".to_vec()),
]),
timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();
// consume data
let (records, high_watermark) = partition_client
.fetch_records(
0, // offset
1..1_000_000, // min..max bytes
1_000, // max wait time
)
.await
.unwrap();
# }
对于更高级的生产和消费,请参阅crate::client::producer
和 crate::client::consumer
。
功能
compression-gzip
(默认): 支持使用gzip压缩和解压缩消息。compression-lz4
(默认): 支持使用LZ4压缩和解压缩消息。compression-snappy
(默认): 支持使用Snappy压缩和解压缩消息。compression-zstd
(默认):支持使用zstd对消息进行压缩和解压缩。full
:包含所有稳定特性(compression-gzip
、compression-lz4
、compression-snappy
、compression-zstd
、transport-socks5
、transport-tls
)。transport-socks5
:允许通过SOCKS5代理进行传输。transport-tls
:允许通过rustls进行TLS传输。unstable-fuzzing
:暴露一些内部数据结构,以便它们可以被我们的模糊器使用。这不是一个稳定的特性/API!
测试
Redpanda
要针对Redpanda运行集成测试,请在单个会话中运行
$ docker-compose -f docker-compose-redpanda.yml up
然后在另一个会话中运行
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
。
Apache Kafka
要针对Apache Kafka运行集成测试,请在另一个会话中运行
$ docker-compose -f docker-compose-kafka.yml up
然后在另一个会话中运行
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test
注意,Apache Kafka支持的特性集与Redpanda不同,因此我们需要传递其他环境变量。
使用SOCKS5代理
要使用SOCKS5代理运行集成测试,您需要设置环境变量SOCKS_PROXY
。以下命令需要在本地机器上有一个正在运行的代理。
$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full
SOCKS5代理将由docker compose文件自动启动。请注意,KAFKA_CONNECT
已扩展到通过代理可访问的地址。
Java Interopt
要测试RSKafka是否可以向/从官方Java客户端生产/消费记录,您需要安装Java并设置环境变量TEST_JAVA_INTEROPT=1
。
模糊测试
RSKafka为某些协议解析步骤提供模糊目标。要构建它们,请确保已安装cargo-fuzz。选择以下模糊器之一
protocol_reader
:选择一个API密钥和API版本,然后读取消息帧并尝试解码响应对象。消息帧在没有长度标记的情况下读取,以提高模糊测试效率。record_batch_body_reader
:读取记录批次的内部部分(不包含包含长度和CRC的长度前缀)并尝试解码它。在理论上,这已被protocol_reader
覆盖,但由于长度字段和CRC,这使得模糊器难以遍历此数据结构。
然后使用以下命令运行模糊器
$ cargo +nightly fuzz run protocol_reader
...
让它运行您希望的时间或直到它找到崩溃
...
Failing input:
fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3
Output of `std::fmt::Debug`:
[0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]
Reproduce with:
cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3
Minimize test case with:
cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3
遗憾的是,您可能得到的回溯并不是很有帮助,您需要调试器来检测确切的位置
$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...
(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...
(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
* frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...
然后创建单元测试并修复错误。
对于内存不足的错误,LLDB不会自动停止。但是,您可以在启动执行之前设置一个断点,直接挂钩到即将退出的地方
(lldb) b fuzzer::PrintStackTrace()
基准测试
安装cargo-criterion,确保您有一个Kafka集群正在运行,然后您可以使用以下命令运行所有基准测试
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
如果您发现某个基准测试运行得太慢,您可能想对其进行性能分析。获取cargo-with和perf,然后运行(这里用于parallel/rskafka
基准测试)
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
查看报告
$ perf report
许可
根据以下许可之一授权
- Apache许可证,版本2.0(LICENSE-APACHE 或 http://apache.ac.cn/licenses/LICENSE-2.0)
- MIT许可证(LICENSE-MIT 或 https://opensource.org/licenses/MIT)
贡献
除非您明确说明,否则您有意提交的任何贡献,根据Apache-2.0许可证的定义,应按上述方式双许可,不附加任何额外条款或条件。
[^1]: Kafka的设计使得任何客户端都难以支持相反的操作,因为最终每个分区都是代理中独立的写入流。然而,这个包不尝试通过将多个分区的写入批量到单个ProduceRequest中来减轻每个分区的开销。
依赖关系
~5–17MB
~214K SLoC