6个版本 (重大更改)

0.5.0 2023年8月11日
0.4.0 2023年6月1日
0.3.0 2022年7月18日
0.2.0 2022年2月17日
0.0.0 2022年1月27日

#35 in HTTP服务器

Download history 880/week @ 2024-04-20 1093/week @ 2024-04-27 1376/week @ 2024-05-04 1558/week @ 2024-05-11 1281/week @ 2024-05-18 1844/week @ 2024-05-25 3137/week @ 2024-06-01 3366/week @ 2024-06-08 2019/week @ 2024-06-15 3489/week @ 2024-06-22 1532/week @ 2024-06-29 2451/week @ 2024-07-06 1692/week @ 2024-07-13 1396/week @ 2024-07-20 1115/week @ 2024-07-27 1447/week @ 2024-08-03

每月 6,128 次下载

MIT/Apache

475KB
11K SLoC

RSKafka

CircleCI Crates.io Documentation License

该包旨在成为Kafka的简单工作负载的最小实现,这些工作负载希望将Kafka用作分布式预写日志。

它不是一个通用的Kafka实现,而是针对简单性进行了大量优化,包括实现和其涌现的运行特性。特别是,它旨在满足IOx的需求。

该包具有以下特点:

  • 不支持偏移量跟踪、消费者组、事务等...
  • 没有内置的缓冲、聚合、linger超时等...
  • 每个分区独立的写入流

它非常适合以下工作负载:

  • 独立于Kafka执行偏移量跟踪
  • 按分区读写合理大小的有效负载
  • 分区吞吐量低[1]

用法

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
     )
     .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
   .await
   .unwrap();
# }

对于更高级的生产和消费,请参阅crate::client::producercrate::client::consumer

功能

  • compression-gzip(默认): 支持使用gzip压缩和解压缩消息。
  • compression-lz4(默认): 支持使用LZ4压缩和解压缩消息。
  • compression-snappy(默认): 支持使用Snappy压缩和解压缩消息。
  • compression-zstd(默认):支持使用zstd对消息进行压缩和解压缩。
  • full包含所有稳定特性(compression-gzipcompression-lz4compression-snappycompression-zstdtransport-socks5transport-tls)。
  • transport-socks5允许通过SOCKS5代理进行传输。
  • transport-tls允许通过rustls进行TLS传输。
  • unstable-fuzzing暴露一些内部数据结构,以便它们可以被我们的模糊器使用。这不是一个稳定的特性/API!

测试

Redpanda

要针对Redpanda运行集成测试,请在单个会话中运行

$ docker-compose -f docker-compose-redpanda.yml up

然后在另一个会话中运行

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

Apache Kafka

要针对Apache Kafka运行集成测试,请在另一个会话中运行

$ docker-compose -f docker-compose-kafka.yml up

然后在另一个会话中运行

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 KAFKA_SASL_CONNECT=localhost:9097 cargo test

注意,Apache Kafka支持的特性集与Redpanda不同,因此我们需要传递其他环境变量。

使用SOCKS5代理

要使用SOCKS5代理运行集成测试,您需要设置环境变量SOCKS_PROXY。以下命令需要在本地机器上有一个正在运行的代理。

$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full

SOCKS5代理将由docker compose文件自动启动。请注意,KAFKA_CONNECT已扩展到通过代理可访问的地址。

Java Interopt

要测试RSKafka是否可以向/从官方Java客户端生产/消费记录,您需要安装Java并设置环境变量TEST_JAVA_INTEROPT=1

模糊测试

RSKafka为某些协议解析步骤提供模糊目标。要构建它们,请确保已安装cargo-fuzz。选择以下模糊器之一

  • protocol_reader选择一个API密钥和API版本,然后读取消息帧并尝试解码响应对象。消息帧在没有长度标记的情况下读取,以提高模糊测试效率。
  • record_batch_body_reader读取记录批次的内部部分(不包含包含长度和CRC的长度前缀)并尝试解码它。在理论上,这已被protocol_reader覆盖,但由于长度字段和CRC,这使得模糊器难以遍历此数据结构。

然后使用以下命令运行模糊器

$ cargo +nightly fuzz run protocol_reader
...

让它运行您希望的时间或直到它找到崩溃

...
Failing input:

        fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Output of `std::fmt::Debug`:

        [0, 18, 0, 3, 0, 0, 0, 0, 71, 88, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 0, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 0, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 164, 18, 18, 0, 164, 0, 164, 164, 164, 30, 164, 164, 0, 0, 0, 0, 63]

Reproduce with:

        cargo fuzz run protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

Minimize test case with:

        cargo fuzz tmin protocol_reader fuzz/artifacts/protocol_reader/crash-369f9787d35767c47431161d455aa696a71c23e3

遗憾的是,您可能得到的回溯并不是很有帮助,您需要调试器来检测确切的位置

$ rust-lldb ./target/x86_64-unknown-linux-gnu/release/protocol_reader fuzz/artifacts/protocol_reader/crash-7b824dad6e26002e5488e8cc84ce16728222dcf5
...

(lldb) r
...
Process 177543 launched: '/home/mneumann/src/rskafka/target/x86_64-unknown-linux-gnu/release/protocol_reader' (x86_64)
INFO: Running with entropic power schedule (0xFF, 100).
INFO: Seed: 3549747846
...
==177543==ABORTING
(lldb) AddressSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
Process 177543 stopped
...

(lldb) bt
* thread #1, name = 'protocol_reader', stop reason = AddressSanitizer detected: allocation-size-too-big
  * frame #0: 0x0000555556c04f20 protocol_reader`::AsanDie() at asan_rtl.cpp:45:7
    frame #1: 0x0000555556c1a33c protocol_reader`__sanitizer::Die() at sanitizer_termination.cpp:55:7
    frame #2: 0x0000555556c01471 protocol_reader`::~ScopedInErrorReport() at asan_report.cpp:190:7
    frame #3: 0x0000555556c021f4 protocol_reader`::ReportAllocationSizeTooBig() at asan_report.cpp:313:1
...

然后创建单元测试并修复错误。

对于内存不足的错误,LLDB不会自动停止。但是,您可以在启动执行之前设置一个断点,直接挂钩到即将退出的地方

(lldb) b fuzzer::PrintStackTrace()

基准测试

安装cargo-criterion,确保您有一个Kafka集群正在运行,然后您可以使用以下命令运行所有基准测试

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features

如果您发现某个基准测试运行得太慢,您可能想对其进行性能分析。获取cargo-withperf,然后运行(这里用于parallel/rskafka基准测试)

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
    bench --all-features --bench write_throughput -- \
    --bench --noplot parallel/rskafka

查看报告

$ perf report

许可

根据以下许可之一授权

贡献

除非您明确说明,否则您有意提交的任何贡献,根据Apache-2.0许可证的定义,应按上述方式双许可,不附加任何额外条款或条件。

[^1]: Kafka的设计使得任何客户端都难以支持相反的操作,因为最终每个分区都是代理中独立的写入流。然而,这个包不尝试通过将多个分区的写入批量到单个ProduceRequest中来减轻每个分区的开销。

依赖关系

~5–17MB
~214K SLoC