8 个版本

0.1.7 2024年7月30日
0.1.6 2024年6月21日
0.1.5 2024年5月11日
0.1.4 2024年4月6日
0.1.0 2024年2月26日

Web 编程 中排名 #142

Download history 185/week @ 2024-05-06 16/week @ 2024-05-13 24/week @ 2024-05-20 7/week @ 2024-05-27 16/week @ 2024-06-03 5/week @ 2024-06-10 165/week @ 2024-06-17 8/week @ 2024-06-24 48/week @ 2024-07-01 130/week @ 2024-07-29 5/week @ 2024-08-05 4/week @ 2024-08-12

每月 139 次下载
2 crates 中使用

自定义许可协议

575KB
7K SLoC

Samsa

Rust 原生 Kafka/Redpanda 协议和客户端实现。

此 crate 提供了 Rust 原生的消费者和生产者,以及 Apache Kafka 协议的低级绑定。与使用 FFI 的 librdkafka 的 crate 不同,此 crate 的用户 不需要 C 库,并且将受益于 Rust 的所有优点;这意味着内存安全、安全的并发、低资源使用,以及当然,闪电般的速度。

文档

目标

  • 易于理解的代码
  • 利用像 TokioNom 这样的最佳类库来完成繁重的工作
  • 从坚实的基础开始,并随着时间的推移添加更多高级功能
  • 提供 Kafka 协议的纯 Rust 实现
  • 成为基于 Kafka 的未来工作的良好构建块

目录

入门

使用 samsacargo add samsaCargo.toml 依赖项中的以下片段包含到您的 Rust 项目中

samsa = "0.1"

此项目包含 Docker Compose 文件,以帮助设置 Redpanda 和 Kafka 集群以简化测试。最简单的方法是运行 docker-compose up 以启动一个 2 个代理的 Redpanda 集群。如果您想使用不同的 Kafka 版本,请参阅 DockerCompose.README.md

生产者

生产者向指定的主题和分区发送消息。

它是缓存的,具有超时和体积阈值,当达到时清除缓冲区。这就是如何调整 letency 和 throughout 以实现所需的速率。

要创建一个实例,最简单的方法是使用Stream和ProducerBuilder

use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }];
let topic_name = "my-topic".to_string();
let partition_id = 0;

// create a stream of 5k messages in batches of 100
let stream = iter(0..5000).map(|_| ProduceMessage {
    topic: topic_name.to_string(),
    partition_id,
    key: Some(bytes::Bytes::from_static(b"Tester")),
    value: Some(bytes::Bytes::from_static(b"Value")),
    headers: vec![
        samsa::prelude::Header::new(String::from("Key"), bytes::Bytes::from("Value"))
    ],
}).chunks(100);

let output_stream =
ProducerBuilder::<TcpConnection>::new(bootstrap_addrs, vec![topic_name.to_string()])
    .await?
    .batch_timeout_ms(100)
    .max_batch_size(100)
    .clone()
    .build_from_stream(stream)
    .await;

tokio::pin!(output_stream);
while (output_stream.next().await).is_some() {}

消费者

Consumer用于从代理中获取消息。它是一个可以配置为自动提交的异步迭代器。要创建一个实例,从ConsumerBuilder开始。

use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();

let consumer = ConsumerBuilder::<TcpConnection>::new(
        bootstrap_addrs,
        assignment,
    )
    .await?
    .build();

let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);

// Stream will do nothing unless consumed.
while let Some(Ok((batch, offsets))) = stream.next().await {
    println!("{:?}", batch);
}

消费者组

您可以使用组ID和分配来设置一个消费者组。组成员的偏移量会自动提交。

use samsa::prelude::*;

let bootstrap_addrs = vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }];
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();
let group_id = "The Data Engineering Team".to_string();

let consumer_group_member = ConsumerGroupBuilder::<TcpConnection>::new(
        bootstrap_addrs,
        group_id,
        assignment,
    ).await?
    .build()
    .await?;

let stream = consumer_group_member.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);
 
// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
    println!("{:?}", batch);
}

TLS 支持

您可以为消费者或生产者添加TLS支持以实现安全通信。要启用此功能,从指定TlsConnectionOptions开始,并将其传递给ProducerBuilderConsumerBuilder的一个实例。

TLS支持的生产者示例

use samsa::prelude::*;

let tls_option = TlsConnectionOptions {
        broker_options: vec![BrokerAddress {
          host: "127.0.0.1".to_owned(),
          port: 9092,
        }],
        key: "/path_to_key_file".into(),
        cert: "/path_to_cert_file".into(),
        cafile: Some("/path_to_ca_file".into()),
    };
let topic_name = "my-topic".to_string();
let partition_id = 0;

let message = ProduceMessage {
        topic: topic_name.to_string(),
        partition_id,
        key: Some(bytes::Bytes::from_static(b"Tester")),
        value: Some(bytes::Bytes::from_static(b"Value")),
        headers: vec![
            Header::new(String::from("Key"), bytes::Bytes::from("Value"))
        ],
    };

let producer_client = ProducerBuilder::<TlsConnection>::new(
        tls_option, 
        vec![topic_name.to_string()]
    )
    .await?
    .batch_timeout_ms(1)
    .max_batch_size(2)
    .clone()
    .build()
    .await;

producer_client
    .produce(message)
    .await;

TLS支持的消费者示例

use samsa::prelude::*;

let tls_option = TlsConnectionOptions {
        broker_options: vec![BrokerAddress {
            host: "127.0.0.1".to_owned(),
            port: 9092,
        }],
        key: "/path_to_key_file".into(),
        cert: "/path_to_cert_file".into(),
        cafile: Some("/path_to_ca_file".into()),
    };
let partitions = vec![0];
let topic_name = "my-topic".to_string();
let assignment = TopicPartitionsBuilder::new()
    .assign(topic_name, partitions)
    .build();

let consumer = ConsumerBuilder::<TlsConnection>::new(
        tls_option,
        assignment,
    )
    .await?
    .build();

let stream = consumer.into_stream();
// have to pin streams before iterating
tokio::pin!(stream);

// Stream will do nothing unless consumed.
while let Some(batch) = stream.next().await {
    println!("{:?} messages read", batch.unwrap().count());
}

压缩支持

我们为生产者提供压缩支持,使用Compression枚举。枚举允许指定要使用的压缩类型。消费者将自动知道解压缩消息。

TLS和GZIP压缩支持的生产者示例

use samsa::prelude::*;

let tls_option = TlsConnectionOptions {
        broker_options: vec![BrokerAddress {
            host: "127.0.0.1".to_owned(),
            port: 9092,
        }],
        key: "/path_to_key_file".into(),
        cert: "/path_to_cert_file".into(),
        cafile: Some("/path_to_ca_file".into()),
    };
let topic_name = "my-topic".to_string();
let partition_id = 0;

let message = ProduceMessage {
        topic: topic_name.to_string(),
        partition_id,
        key: Some(bytes::Bytes::from_static(b"Tester")),
        value: Some(bytes::Bytes::from_static(b"Value")),
        headers: vec![
            Header::new(String::from("Key"), bytes::Bytes::from("Value"))
        ],
    };

let producer_client = ProducerBuilder::new(tls_option, vec![topic_name.to_string()])
    .await?
    .compression(Compression::Gzip)
    .batch_timeout_ms(1)
    .max_batch_size(2)
    .clone()
    .build()
    .await;

producer_client
    .produce(message)
    .await;

SASL 支持

我们包括支持SASL的所有典型机制:PLAIN、SCRAM-SHA-256、SCRAM-SHA-512。这表示为另一种类型的BrokerConnection,我们的消费者和生产者将其作为泛型参数接收。所需的一切只是提供凭证。

同时使用TLS和SASL的生产者示例

use samsa::prelude::*;

let tls_config = TlsConnectionOptions {
    broker_options: vec![BrokerAddress {
        host: "127.0.0.1".to_owned(),
        port: 9092,
    }],
    key: "/path_to_key_file".into(),
    cert: "/path_to_cert_file".into(),
    cafile: Some("/path_to_ca_file".into()),
};

let sasl_config = SaslConfig::new(String::from("myuser"), String::from("pass1234"), None, None);

let options = SaslTlsConfig {
    tls_config,
    sasl_config,
};

let topic_name = "atopic";

let s = ConsumerBuilder::<SaslTlsConnection>::new(
    options,
    TopicPartitionsBuilder::new()
        .assign(topic_name.to_owned(), vec![0])
        .build(),
)
.await
.unwrap()
.build()
.into_stream();

tokio::pin!(s);

while let Some(m) = s.next().await {
    tracing::info!("{:?} messages read", m.unwrap().count());
}

示例

我们为有兴趣尝试samsa的人提供高级示例。设置如下

  1. 使用docker-compose up启动您的redpanda集群。
  2. 转到https://127.0.0.1:8080/topics以查看redpanda控制台。
  3. 创建一个名为my-topic的具有4个分区的主题。

生产者

运行以下操作以填充您的主题。

  1. 运行cargo run --example producer
  2. 访问https://127.0.0.1:8080/topics/my-topic以查看数据流!

消费者

消费主题中的消息。

  1. 运行cargo run --example consumer
  2. 在您的终端中观察所有数据洪流。

ConsumerGroup

协调一组4个消费者。此操作被限制以查看组重新平衡。

  1. 运行cargo run --example consumer_group
  2. 访问https://127.0.0.1:8080/groups/Squad以查看单独的成员。
  3. 在另一个终端窗口中,运行cargo run --example consumer_group
  4. 在控制台中查看新成员加入。
  5. 重复2次以查看完整的组。

TLS

我们有一组使用TLS支持的Producer和Consumer示例。您需要运行TLS已启用且在此代码库中具有正确证书的集群。

  1. 设置一个使用正确TLS配置的集群
  2. 将您的证书复制到项目中
  3. 更新以下示例中的路径以正确
  4. 运行 cargo run --example tls_produce
  5. 运行 cargo run --example tls_consume

SASL

我们有一个使用SASL支持的生成者和消费者示例的副本。您需要一个已启用SASL的集群。

  1. 设置一个使用正确SASL配置的集群
  2. 更新以下示例中的凭据以正确
  3. 运行 cargo run --example sasl
  4. 运行 cargo run --example sasl_tls

开发设置

为了设置开发环境,您需要Rust工具链Docker

  • 运行 docker-compose up 以启动Redpanda集群。

测试

要运行测试,请确保集群正在运行。运行 KAFKA_BROKERS=[您的集群URL] cargo test --tests --all-features -- --show-output --test-threads=1

基准测试

我们提供了一种通过Criterion来评估库性能的方法。这需要一些小的设置

  1. 设置一个本地的Redpanda集群
  2. 创建一个名为benchmark的具有1个分区的主题
  3. 运行cargo run --example producer示例来将数据放入该主题
  4. 运行 cargo bench

结果

在2020年Macbook Pro,2 GHz四核Intel Core i5,16 GB 3733 MHz LPDDR4X上

生成100万个10字节的消息

time:   [1.5852 s 1.6119 s 1.6406 s]

消费100万个10字节的消息

time:   [1.8013 s 1.8112 s 1.8225 s]

资源

依赖项

~17–30MB
~564K SLoC