8 个版本
0.1.7 | 2024年7月30日 |
---|---|
0.1.6 | 2024年6月21日 |
0.1.5 | 2024年5月11日 |
0.1.4 | 2024年4月6日 |
0.1.0 | 2024年2月26日 |
在 Web 编程 中排名 #142
每月 139 次下载
在 2 crates 中使用
575KB
7K SLoC
Samsa
Rust 原生 Kafka/Redpanda 协议和客户端实现。
此 crate 提供了 Rust 原生的消费者和生产者,以及 Apache Kafka 协议的低级绑定。与使用 FFI 的 librdkafka 的 crate 不同,此 crate 的用户 不需要 C 库,并且将受益于 Rust 的所有优点;这意味着内存安全、安全的并发、低资源使用,以及当然,闪电般的速度。
目标
- 易于理解的代码
- 利用像 Tokio、Nom 这样的最佳类库来完成繁重的工作
- 从坚实的基础开始,并随着时间的推移添加更多高级功能
- 提供 Kafka 协议的纯 Rust 实现
- 成为基于 Kafka 的未来工作的良好构建块
目录
入门
使用 samsa
将 cargo add samsa
将 Cargo.toml
依赖项中的以下片段包含到您的 Rust 项目中
samsa = "0.1"
此项目包含 Docker Compose 文件,以帮助设置 Redpanda 和 Kafka 集群以简化测试。最简单的方法是运行 docker-compose up
以启动一个 2 个代理的 Redpanda 集群。如果您想使用不同的 Kafka 版本,请参阅 DockerCompose.README.md
生产者
生产者向指定的主题和分区发送消息。
它是缓存的,具有超时和体积阈值,当达到时清除缓冲区。这就是如何调整 letency 和 throughout 以实现所需的速率。
要创建一个实例,最简单的方法是使用Stream和ProducerBuilder
。
use samsa::prelude::*;
let bootstrap_addrs = vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}];
let topic_name = "my-topic".to_string();
let partition_id = 0;
// create a stream of 5k messages in batches of 100
let stream = iter(0..5000).map(|_| ProduceMessage {
topic: topic_name.to_string(),
partition_id,
key: Some(bytes::Bytes::from_static(b"Tester")),
value: Some(bytes::Bytes::from_static(b"Value")),
headers: vec![
samsa::prelude::Header::new(String::from("Key"), bytes::Bytes::from("Value"))
],
}).chunks(100);
let output_stream =
ProducerBuilder::<TcpConnection>::new(bootstrap_addrs, vec![topic_name.to_string()])
.await?
.batch_timeout_ms(100)
.max_batch_size(100)
.clone()
.build_from_stream(stream)
.await;
tokio::pin!(output_stream);
while (output_stream.next().await).is_some() {}
消费者
Consumer
用于从代理中获取消息。它是一个可以配置为自动提交的异步迭代器。要创建一个实例,从ConsumerBuilder
开始。
use samsa::prelude::*;
let bootstrap_addrs = vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
.assign(topic_name, partitions)
.build();
let consumer = ConsumerBuilder::<TcpConnection>::new(
bootstrap_addrs,
assignment,
)
.await?
.build();
let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);
// Stream will do nothing unless consumed.
while let Some(Ok((batch, offsets))) = stream.next().await {
println!("{:?}", batch);
}
消费者组
您可以使用组ID和分配来设置一个消费者组。组成员的偏移量会自动提交。
use samsa::prelude::*;
let bootstrap_addrs = vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
.assign(topic_name, partitions)
.build();
let group_id = "The Data Engineering Team".to_string();
let consumer_group_member = ConsumerGroupBuilder::<TcpConnection>::new(
bootstrap_addrs,
group_id,
assignment,
).await?
.build()
.await?;
let stream = consumer_group_member.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);
// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
println!("{:?}", batch);
}
TLS 支持
您可以为消费者或生产者添加TLS支持以实现安全通信。要启用此功能,从指定TlsConnectionOptions
开始,并将其传递给ProducerBuilder
或ConsumerBuilder
的一个实例。
TLS支持的生产者示例
use samsa::prelude::*;
let tls_option = TlsConnectionOptions {
broker_options: vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}],
key: "/path_to_key_file".into(),
cert: "/path_to_cert_file".into(),
cafile: Some("/path_to_ca_file".into()),
};
let topic_name = "my-topic".to_string();
let partition_id = 0;
let message = ProduceMessage {
topic: topic_name.to_string(),
partition_id,
key: Some(bytes::Bytes::from_static(b"Tester")),
value: Some(bytes::Bytes::from_static(b"Value")),
headers: vec![
Header::new(String::from("Key"), bytes::Bytes::from("Value"))
],
};
let producer_client = ProducerBuilder::<TlsConnection>::new(
tls_option,
vec![topic_name.to_string()]
)
.await?
.batch_timeout_ms(1)
.max_batch_size(2)
.clone()
.build()
.await;
producer_client
.produce(message)
.await;
TLS支持的消费者示例
use samsa::prelude::*;
let tls_option = TlsConnectionOptions {
broker_options: vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}],
key: "/path_to_key_file".into(),
cert: "/path_to_cert_file".into(),
cafile: Some("/path_to_ca_file".into()),
};
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
.assign(topic_name, partitions)
.build();
let consumer = ConsumerBuilder::<TlsConnection>::new(
tls_option,
assignment,
)
.await?
.build();
let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);
// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
println!("{:?} messages read", batch.unwrap().count());
}
压缩支持
我们为生产者提供压缩支持,使用Compression
枚举。枚举允许指定要使用的压缩类型。消费者将自动知道解压缩消息。
TLS和GZIP压缩支持的生产者示例
use samsa::prelude::*;
let tls_option = TlsConnectionOptions {
broker_options: vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}],
key: "/path_to_key_file".into(),
cert: "/path_to_cert_file".into(),
cafile: Some("/path_to_ca_file".into()),
};
let topic_name = "my-topic".to_string();
let partition_id = 0;
let message = ProduceMessage {
topic: topic_name.to_string(),
partition_id,
key: Some(bytes::Bytes::from_static(b"Tester")),
value: Some(bytes::Bytes::from_static(b"Value")),
headers: vec![
Header::new(String::from("Key"), bytes::Bytes::from("Value"))
],
};
let producer_client = ProducerBuilder::new(tls_option, vec![topic_name.to_string()])
.await?
.compression(Compression::Gzip)
.batch_timeout_ms(1)
.max_batch_size(2)
.clone()
.build()
.await;
producer_client
.produce(message)
.await;
SASL 支持
我们包括支持SASL的所有典型机制:PLAIN、SCRAM-SHA-256、SCRAM-SHA-512。这表示为另一种类型的BrokerConnection,我们的消费者和生产者将其作为泛型参数接收。所需的一切只是提供凭证。
同时使用TLS和SASL的生产者示例
use samsa::prelude::*;
let tls_config = TlsConnectionOptions {
broker_options: vec![BrokerAddress {
host: "127.0.0.1".to_owned(),
port: 9092,
}],
key: "/path_to_key_file".into(),
cert: "/path_to_cert_file".into(),
cafile: Some("/path_to_ca_file".into()),
};
let sasl_config = SaslConfig::new(String::from("myuser"), String::from("pass1234"), None, None);
let options = SaslTlsConfig {
tls_config,
sasl_config,
};
let topic_name = "atopic";
let s = ConsumerBuilder::<SaslTlsConnection>::new(
options,
TopicPartitionsBuilder::new()
.assign(topic_name.to_owned(), vec![0])
.build(),
)
.await
.unwrap()
.build()
.into_stream();
tokio::pin!(s);
while let Some(m) = s.next().await {
tracing::info!("{:?} messages read", m.unwrap().count());
}
示例
我们为有兴趣尝试samsa
的人提供高级示例。设置如下
- 使用
docker-compose up
启动您的redpanda集群。 - 转到https://127.0.0.1:8080/topics以查看redpanda控制台。
- 创建一个名为
my-topic
的具有4个分区的主题。
生产者
运行以下操作以填充您的主题。
- 运行
cargo run --example producer
- 访问https://127.0.0.1:8080/topics/my-topic以查看数据流!
消费者
消费主题中的消息。
- 运行
cargo run --example consumer
- 在您的终端中观察所有数据洪流。
ConsumerGroup
协调一组4个消费者。此操作被限制以查看组重新平衡。
- 运行
cargo run --example consumer_group
- 访问https://127.0.0.1:8080/groups/Squad以查看单独的成员。
- 在另一个终端窗口中,运行
cargo run --example consumer_group
- 在控制台中查看新成员加入。
- 重复2次以查看完整的组。
TLS
我们有一组使用TLS支持的Producer和Consumer示例。您需要运行TLS已启用且在此代码库中具有正确证书的集群。
- 设置一个使用正确TLS配置的集群
- 将您的证书复制到项目中
- 更新以下示例中的路径以正确
- 运行
cargo run --example tls_produce
- 运行
cargo run --example tls_consume
SASL
我们有一个使用SASL支持的生成者和消费者示例的副本。您需要一个已启用SASL的集群。
- 设置一个使用正确SASL配置的集群
- 更新以下示例中的凭据以正确
- 运行
cargo run --example sasl
- 运行
cargo run --example sasl_tls
开发设置
- 运行
docker-compose up
以启动Redpanda集群。
测试
要运行测试,请确保集群正在运行。运行 KAFKA_BROKERS=[您的集群URL] cargo test --tests --all-features -- --show-output --test-threads=1
基准测试
我们提供了一种通过Criterion来评估库性能的方法。这需要一些小的设置
- 设置一个本地的Redpanda集群
- 创建一个名为
benchmark
的具有1个分区的主题 - 运行
cargo run --example producer
示例来将数据放入该主题 - 运行
cargo bench
结果
在2020年Macbook Pro,2 GHz四核Intel Core i5,16 GB 3733 MHz LPDDR4X上
生成100万个10字节的消息
time: [1.5852 s 1.6119 s 1.6406 s]
消费100万个10字节的消息
time: [1.8013 s 1.8112 s 1.8225 s]
资源
依赖项
~17–30MB
~564K SLoC