#kafka #high-level #rdkafka #client #async #message-parser #producer-consumer

high-level-kafka

适用于Rust的高层Kafka客户端

4个版本

0.2.4 2023年12月31日
0.2.3 2023年12月17日
0.2.2 2023年11月11日
0.2.1 2023年7月16日
0.0.1 2023年6月11日

#571 in 编码

MIT/Apache

37KB
541

适用于Rust的高层Kafka客户端库

一个ldap客户端库,它包装了rdkafka,使其易于使用。消息解析使用serde完成。

项目状态

目前这个项目处于早期beta阶段。库只支持异步使用。

使用方法

cargo add high-level-kafka

示例

生产者

#[tokio::main]
async fn main() -> Result<()>{
    let producer_options = publisher::ProducerOptiopns::from(
        "localhost:9092".to_string(),
        "5000".to_string(),
        5,
        HashMap::new(),
    );
    let publisher = publisher::KafkaProducer::with_options(producer_options).unwrap();
    let data  = Data {
        attra_one: "123".to_string(),
        attra_two: 12,
    };

    let mut headers = HashMap::new();
    headers.insert("header_one".to_string(), "value_one".to_string());
    headers.insert("header_two".to_string(), "value_two".to_string());

    let data = Message::new("topic".to_string(), headers, data, "key".to_string());
    let result = publisher.produce(data).await;

    Ok(())
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}

消费者

#[tokio::main]
async fn main() -> Result<()>{

   let consumer = Consumer::from("group_id", "localhost:9092");
    let mut_consumer = Arc::new(Mutex::new(consumer));
    let mut con = mut_consumer.clone().lock_owned().await;
    con.subscribe_to_topic(
        "topic".to_string(),
        |data: Data, medatad: Metadata| async move {
            info!("data: {:?}, metadata: {:?}", data, medatad);
        },
    )
    .await;
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}

可暂停消费者

这个消费者可以被暂停和恢复。当你想要暂停消费者一段时间后再恢复它时很有用。注意:这不是生产就绪版本(0.0.1)。

#[tokio::main]
async fn main() -> Result<()>{

    let publisher = publisher::KafkaProducer::from("localhost:9092");
    let data = Data {
        attra_one: "one".to_string(),
        attra_two: 2,
    };
    let message = publisher::Message::new(
        "topic".to_string(),
        HashMap::new(),
        data,
        "some_key".to_string(),
    );
    let _result = publisher.produce(message).await;
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}

依赖项

~14–27MB
~364K SLoC