4个版本
0.2.4 | 2023年12月31日 |
---|---|
0.2.3 | 2023年12月17日 |
0.2.2 | 2023年11月11日 |
0.2.1 | 2023年7月16日 |
0.0.1 |
|
#571 in 编码
37KB
541 行
适用于Rust的高层Kafka客户端库
一个ldap客户端库,它包装了rdkafka,使其易于使用。消息解析使用serde完成。
项目状态
目前这个项目处于早期beta阶段。库只支持异步使用。
使用方法
cargo add high-level-kafka
示例
生产者
#[tokio::main]
async fn main() -> Result<()>{
let producer_options = publisher::ProducerOptiopns::from(
"localhost:9092".to_string(),
"5000".to_string(),
5,
HashMap::new(),
);
let publisher = publisher::KafkaProducer::with_options(producer_options).unwrap();
let data = Data {
attra_one: "123".to_string(),
attra_two: 12,
};
let mut headers = HashMap::new();
headers.insert("header_one".to_string(), "value_one".to_string());
headers.insert("header_two".to_string(), "value_two".to_string());
let data = Message::new("topic".to_string(), headers, data, "key".to_string());
let result = publisher.produce(data).await;
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
attra_one: String,
attra_two: i8,
}
消费者
#[tokio::main]
async fn main() -> Result<()>{
let consumer = Consumer::from("group_id", "localhost:9092");
let mut_consumer = Arc::new(Mutex::new(consumer));
let mut con = mut_consumer.clone().lock_owned().await;
con.subscribe_to_topic(
"topic".to_string(),
|data: Data, medatad: Metadata| async move {
info!("data: {:?}, metadata: {:?}", data, medatad);
},
)
.await;
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
attra_one: String,
attra_two: i8,
}
可暂停消费者
这个消费者可以被暂停和恢复。当你想要暂停消费者一段时间后再恢复它时很有用。注意:这不是生产就绪版本(0.0.1)。
#[tokio::main]
async fn main() -> Result<()>{
let publisher = publisher::KafkaProducer::from("localhost:9092");
let data = Data {
attra_one: "one".to_string(),
attra_two: 2,
};
let message = publisher::Message::new(
"topic".to_string(),
HashMap::new(),
data,
"some_key".to_string(),
);
let _result = publisher.produce(message).await;
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
attra_one: String,
attra_two: i8,
}
依赖项
~14–27MB
~364K SLoC