#producer-consumer #kafka #producer #consumer #streaming #rust-client

tiny_kafka

一个具有生产者和消费者功能的迷你 Kafka 客户端库

5 个稳定版本

1.0.4 2023 年 10 月 21 日

#8 in #producer

MIT 许可证

21KB
206 行代码(不包括注释)

Rust 中的 Kafka 生产者和消费者

此仓库包含基于 Rust 的 Kafka 生产者和消费者实现。它利用 rdkafka 库与 Kafka 通信并管理消息的生产和消费。

先决条件

  • Rust:确保您的机器上已安装 Rust 和 Cargo。
  • Kafka:一个可以连接的正在运行的 Kafka 集群。

设置

  1. 克隆仓库:

    git clone github.com/cploutarchou/tiny_kafka
    
  2. 导航到仓库:

    cd path-to-repository
    
  3. 构建代码:

    cargo build
    

生产者

Kafka 生产者是向 Kafka 发送消息的高级接口。它封装了初始化与 Kafka 代理的连接并向指定主题发送消息的过程。

用法

初始化生产者

let producer = KafkaProducer::new("localhost:9092", None);

向 Kafka 主题发送消息

let msg = Message::new("key1", "value1");
producer.send_message("my-topic", msg).await;

自定义配置

初始化生产者时可以提供自定义配置

let mut configs = HashMap::new();
configs.insert("max.in.flight.requests.per.connection", "5");
let producer_with_configs = KafkaProducer::new("localhost:9092", Some(configs));

消费者

Kafka 消费者提供了从 Kafka 主题消费消息的功能。

用法

初始化消费者

let consumer = KafkaConsumer::new("localhost:9092", "my_group", "my_topic");

消费消息

if let Some(msg) = consumer.poll().await {
    println!("Received: {} -> {}", msg.key, msg.value);
}

包含 Tokio 和异步的完整主函数示例

以下是如何利用 Tokio 在异步上下文中使用 Kafka 生产者和消费者的详细示例。

use log::info;
use std::sync::Arc;
use tiny_kafka::consumer::KafkaConsumer;
use tiny_kafka::producer::{KafkaProducer, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

   let rt = tokio::runtime::Runtime::new().unwrap();
   // Assuming kafka_bootstrap_servers is of type String
   let brokers = Arc::new("localhost:9092".to_string());
   let topics = Arc::new(vec!["test".to_string()]);

   // Consumer task
   let brokers_for_task1 = brokers.clone();
   let topics_for_task1 = topics.clone();
   let task1 = async move {
      let consumer = KafkaConsumer::new(
         brokers_for_task1.as_str(),
         "kafka-to-elastic",
         topics_for_task1.get(0).unwrap(),
      );
      loop {
         if let Some(msg) = consumer.poll().await {
            info!(
                    "Consumed message with key: {} and value: {}",
                    msg.key, msg.value
                );
         }
      }
   };
   rt.spawn(task1);

   // Producer task
   let brokers_for_task2 = brokers.clone();
   let topics_for_task2 = topics.clone();
   let task2 = async move {
      let producer = KafkaProducer::new(brokers_for_task2.as_str(), Option::None);

      for i in 0..100 {
         let key = format!("test_key_{}", i);
         let value = format!("test_value_{}", i);
         let message = Message::new(&key, &value);

         producer
                 .send_message(topics_for_task2.get(0).unwrap(), message)
                 .await;
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
      }
   };
   rt.spawn(task2);

   // Wait for a ctrl-c signal
   tokio::signal::ctrl_c().await?;
   println!("ctrl-c received!");

   Ok(())
}

自定义配置

与生产者一样,消费者也支持自定义配置

let mut new_configs = HashMap::new();
new_configs.insert("auto.offset.reset".to_string(), "earliest".to_string());
consumer.set_client_config("localhost:9092", "my_group", "my_topic", new_configs);

运行测试

运行测试

cargo test

确保您的 Kafka 代理正在运行且测试主题存在。

贡献

我们欢迎贡献!请随时提出问题、提交拉取请求或只是传播信息。

许可证

此项目采用 MIT 许可证

依赖项

~17–30MB
~354K SLoC