2 个版本
0.3.1 | 2023 年 2 月 4 日 |
---|---|
0.3.0 | 2022 年 11 月 22 日 |
#342 在 WebAssembly
57 每月下载量
460KB
10K SLoC
RSKafka_wasi
该 crate 旨在为希望将 Kafka 作为分布式预写日志使用的简单工作负载提供一个最小的 Kafka 实现。这是从原始 RSKafka 分支出来的,支持 WebAssembly 编译目标。这允许 Kafka 应用在 WasmEdge 运行时中作为轻量级且安全的替代方案运行,而不是在 Linux 容器中编译的应用。
它不是一个通用 Kafka 实现,而是对简单性进行了大量优化,无论是在实现方面还是其涌现的操作特性方面。特别是,它旨在满足 IOx 的需求。
该 crate 有
- 不支持偏移量跟踪、消费者组、事务等...
- 没有内置的缓冲区、聚合、linger 超时等...
- 每个分区独立的写入流
它非常适合以下工作负载
- 独立于 Kafka 进行偏移量跟踪
- 每个分区的读写合理大小的有效负载
- 高吞吐量分区的数量较低 [^1]
用法
# async fn test() {
use rskafka::{
client::{
ClientBuilder,
partition::{Compression, UnknownTopicHandling},
},
record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;
// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
topic,
2, // partitions
1, // replication factor
5_000, // timeout (ms)
).await.unwrap();
// get a partition-bound client
let partition_client = client
.partition_client(
topic.to_owned(),
0, // partition
UnknownTopicHandling::Retry,
)
.await
.unwrap();
// produce some data
let record = Record {
key: None,
value: Some(b"hello kafka".to_vec()),
headers: BTreeMap::from([
("foo".to_owned(), b"bar".to_vec()),
]),
timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();
// consume data
let (records, high_watermark) = partition_client
.fetch_records(
0, // offset
1..1_000_000, // min..max bytes
1_000, // max wait time
)
.await
.unwrap();
# }
对于更高级的生产和消费,请参阅 crate::client::producer
和 crate::client::consumer
。
功能
compression-gzip
(默认):支持使用 gzip 进行消息的压缩和解压缩。compression-lz4
(默认):支持使用 LZ4 进行消息的压缩和解压缩。compression-snappy
(默认):支持使用 Snappy 进行消息的压缩和解压缩。compression-zstd
(默认):支持使用 zstd 进行消息的压缩和解压缩。full
: 包括所有稳定特性(compression-gzip
、compression-lz4
、compression-snappy
、compression-zstd
、transport-socks5
、transport-tls
)。transport-socks5
: 允许通过 SOCKS5 代理进行传输。
测试
Redpanda
要对 Redpanda 运行集成测试,请在一个会话中运行
$ docker-compose -f docker-compose-redpanda.yml up
然后在另一个会话中运行
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
。
Apache Kafka
要对 Apache Kafka 运行集成测试,请在另一个会话中运行
$ docker-compose -f docker-compose-kafka.yml up
然后在另一个会话中运行
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
。注意,Apache Kafka 支持的功能集合与 Redpanda 不同,因此我们传递其他环境变量。
许可
根据以下之一许可
- Apache 许可证 2.0(LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- MIT 许可证(LICENSE-MIT 或 https://opensource.org/licenses/MIT)
贡献
除非您明确声明,否则您有意提交的任何贡献,根据 Apache-2.0 许可证定义的工作内容,应按照上述方式双重许可,不附加任何额外条款或条件。
[^1]: Kafka 的设计使得任何客户端支持反向操作都变得困难,因为最终每个分区都是代理内部的独立写入流。然而,此 crate 并不尝试通过将多个分区的写入批量到单个 ProduceRequest 中等方法来减轻每个分区的开销。
依赖
~5–21MB
~262K SLoC