#kafka #message #consumer #producer #topic #broker

simple-kafka

更易于使用 Kafka 消息

8 个版本

0.0.8 2023年11月28日
0.0.7 2023年11月27日
0.0.4 2023年10月9日

14#producer 中排名

Download history 84/week @ 2024-03-11 6/week @ 2024-04-01

每月下载 64

MIT/Apache

16KB
293

simple-kafka

为了更方便地在 Rust 中使用 kafka。

示例

https://github.com/hitolz/simple-kafka-example

使用方法

读取配置文件,并将其转换为 simple_kafka::KafkaConfig

[kafka_config]
brokers = "localhost:9092"
group_id = "test_group2"
#[derive(Deserialize, Default, Debug,Clone)]
pub struct KafkaConfig {
    pub brokers: String,
    pub group_id: String,
}

impl Into<simple_kafka::KafkaConfig> for KafkaConfig {
    fn into(self) -> simple_kafka::KafkaConfig {
        simple_kafka::KafkaConfig{
            brokers: self.brokers,
            group_id: self.group_id,
        }
    }
}

在 main 中,通过 tokio::spawn 线程初始化 kafka 生产者及消费者。

let _init_task = tokio::spawn(async {
    let simple_kafka_config:simple_kafka::KafkaConfig = kafka_config.to_owned().into();
    simple_kafka::kafka_init::init_producers(&simple_kafka_config).await;
    simple_kafka::kafka_init::init_consumers(&simple_kafka_config,"test-topic", message_handler).await;
});

如果有多个 topic 需要进行消费,需要 init_consumers 多次。

发送消息

// let _= kafka_producer::send(topic,"key","测试下kafka消息1111".as_bytes()).await;
// let _= kafka_producer::send_result(topic,"key","测试下kafka消息1111".as_bytes()).await;
let _= kafka_producer::send_timeout(topic,"key","测试下kafka消息1111".as_bytes(),Duration::from_secs(3)).await;

提供了 test_api,

在程序启动之后,可以通过 http://127.0.0.1:8088/test/send 进行发送测试。 由于启动时也初始化了消费者,所以也能消费到这个消息。 在日志 app.log 中有体现。

2023-10-09 22:16:15 INFO [simple_kafka::kafka_producer:19] create kafka producer,brokers=localhost:9092
2023-10-09 22:16:15 INFO [simple_kafka::kafka_init:24] init producer done
2023-10-09 22:16:15 INFO [simple_kafka:61] register consumer: "test-topic"
2023-10-09 22:16:15 INFO [simple_kafka:84] creating consumer topic:test-topic 
2023-10-09 22:16:30 INFO [simple_kafka::kafka_producer:46] 发送kafka消息:partition:None, headers:None, key:"key", topic:test-topic, msg:测试下kafka消息1111
2023-10-09 22:16:31 INFO [simple_kafka:106] kafka consumer message. message = [Message { ptr: 0x1547068e8 }]
2023-10-09 22:16:31 INFO [simple_kafka:163] partition = 0, offset = 1618 message : "测试下kafka消息1111"

依赖项

~15–23MB
~261K SLoC