#apache-kafka #kafka #wasi #protocols #async #api #api-bindings

rskafka_wasi

Apache Kafka 的最小化 Rust 客户端

2 个版本

0.3.1 2023 年 2 月 4 日
0.3.0 2022 年 11 月 22 日

#342WebAssembly

Download history 2/week @ 2024-03-25 17/week @ 2024-04-01

57 每月下载量

MIT/Apache

460KB
10K SLoC

RSKafka_wasi

该 crate 旨在为希望将 Kafka 作为分布式预写日志使用的简单工作负载提供一个最小的 Kafka 实现。这是从原始 RSKafka 分支出来的,支持 WebAssembly 编译目标。这允许 Kafka 应用在 WasmEdge 运行时中作为轻量级且安全的替代方案运行,而不是在 Linux 容器中编译的应用。

它不是一个通用 Kafka 实现,而是对简单性进行了大量优化,无论是在实现方面还是其涌现的操作特性方面。特别是,它旨在满足 IOx 的需求。

该 crate 有

  • 不支持偏移量跟踪、消费者组、事务等...
  • 没有内置的缓冲区、聚合、linger 超时等...
  • 每个分区独立的写入流

它非常适合以下工作负载

  • 独立于 Kafka 进行偏移量跟踪
  • 每个分区的读写合理大小的有效负载
  • 高吞吐量分区的数量较低 [^1]

用法

# async fn test() {
use rskafka::{
    client::{
        ClientBuilder,
        partition::{Compression, UnknownTopicHandling},
    },
    record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;

// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();

// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
    topic,
    2,      // partitions
    1,      // replication factor
    5_000,  // timeout (ms)
).await.unwrap();

// get a partition-bound client
let partition_client = client
    .partition_client(
        topic.to_owned(),
        0,  // partition
        UnknownTopicHandling::Retry,
     )
     .await
    .unwrap();

// produce some data
let record = Record {
    key: None,
    value: Some(b"hello kafka".to_vec()),
    headers: BTreeMap::from([
        ("foo".to_owned(), b"bar".to_vec()),
    ]),
    timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();

// consume data
let (records, high_watermark) = partition_client
    .fetch_records(
        0,  // offset
        1..1_000_000,  // min..max bytes
        1_000,  // max wait time
    )
   .await
   .unwrap();
# }

对于更高级的生产和消费,请参阅 crate::client::producercrate::client::consumer

功能

  • compression-gzip(默认):支持使用 gzip 进行消息的压缩和解压缩。
  • compression-lz4(默认):支持使用 LZ4 进行消息的压缩和解压缩。
  • compression-snappy(默认):支持使用 Snappy 进行消息的压缩和解压缩。
  • compression-zstd(默认):支持使用 zstd 进行消息的压缩和解压缩。
  • full: 包括所有稳定特性(compression-gzipcompression-lz4compression-snappycompression-zstdtransport-socks5transport-tls)。
  • transport-socks5: 允许通过 SOCKS5 代理进行传输。

测试

Redpanda

要对 Redpanda 运行集成测试,请在一个会话中运行

$ docker-compose -f docker-compose-redpanda.yml up

然后在另一个会话中运行

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test

Apache Kafka

要对 Apache Kafka 运行集成测试,请在另一个会话中运行

$ docker-compose -f docker-compose-kafka.yml up

然后在另一个会话中运行

$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test

。注意,Apache Kafka 支持的功能集合与 Redpanda 不同,因此我们传递其他环境变量。

许可

根据以下之一许可

贡献

除非您明确声明,否则您有意提交的任何贡献,根据 Apache-2.0 许可证定义的工作内容,应按照上述方式双重许可,不附加任何额外条款或条件。

[^1]: Kafka 的设计使得任何客户端支持反向操作都变得困难,因为最终每个分区都是代理内部的独立写入流。然而,此 crate 并不尝试通过将多个分区的写入批量到单个 ProduceRequest 中等方法来减轻每个分区的开销。

依赖

~5–21MB
~262K SLoC