1个不稳定版本
新功能 0.1.0 | 2024年7月29日 |
---|
#56 在 #pubsub
每月137次下载
18KB
197 行
Ene Kafka:Rust的意见式Kafka框架
简介
Ene kafka是Rust的Kafka客户端,旨在简化Rust程序与Apache Kafka的交互。Ene kafka的主要目标是使构建使用Kafka作为其分布式消息系统的事件驱动微服务的过程变得更加简单快捷。
Ene Kafka使用云事件来定义消费和生产的消息结构。
示例
创建一个事件
#[derive(KafkaMessage, Serialize, CloudEvent, Debug, Deserialize, DeserializeFrom)]
#[kafka(topic = "test", serde = Json, key = entity_id, headers = CloudEvent)]
#[cloud_event(
content_type = "application/json",
version = "1.0",
event_type = "com.ene.entity.created.v1",
event_source = "https://ene-kafka.com/docs/cloudevents/entity/created"
)]
struct EntityCreated {
pub entity_id: i64,
pub organisation_id: i64,
}
生产一个事件
let producer: KafkaProducer = kafka_producer!(bootstrap_servers = bootstrap_servers.clone());
let event = EntityCreated {
entity_id: 1755,
organisation_id: 42,
};
producer.send(event).await?;
消费一个事件
// Create a handler to process the event
#[derive(EventHandler)]
#[event_handler(event = EntityCreated, handler = handle_entity_created_event)]
struct EntityCreatedEventHandler {}
impl EntityCreatedEventHandler {
async fn handle_entity_created_event(&self, event: &EntityCreated) -> ene_kafka::KafkaResult<()> {
println!("EntityCreatedEventHandler: {:?}", event);
// Do something with the event
Ok(())
}
}
// Create a consumer that listens to the topic and registers the handler
let consumer = kafka_consumer!(
topic = KafkaTopic {
name: "test".to_string(),
content_type: ContentType::Json
},
// Dead letter queueing is included in the consumer!
dlq_topic = KafkaTopic {
name: "test-dlq".to_string(),
content_type: ContentType::Json
},
consumer_group_id = "test-group",
bootstrap_servers = bootstrap_servers,
handlers = [
EntityCreatedEventHandler -> EntityCreatedEventHandler {}
]
);
更多示例,请查看仓库中的examples文件夹。
功能
-
得益于Rust宏的易用性:Ene Kafka使用Rust宏来简化定义Kafka消息以及生产和使用它们的过程。宏帮助抽象了大量样板代码,使您能够专注于最重要的部分:您的业务逻辑。
-
云事件:Ene Kafka支持事件消息的CloudEvents规范。
-
死信队列:Ene Kafka支持死信队列,用于处理无法处理的消息。
-
自动(反)序列化:Ene Kafka自动将消息序列化和反序列化为指定的事件类型。
-
可扩展性:Ene Kafka的设计考虑了可扩展性(尽管这仍是一个持续进行的工作)。应该可以使用不同的底层客户端来使用Kafka,或者使用其他序列化库代替serde。
限制
-
仅支持JSON格式:Ene Kafka目前仅支持JSON序列化和反序列化。不支持Avro或Protobuf。
-
仅支持rdKafka作为Kafka客户端
-
Ene Kafka围绕云事件构建,这可能不适合所有用例。
-
有限的消费者和生产者配置:Ene Kafka 的一个目标是抽象掉 Kafka 的所有技术细节,使其更容易快速且可靠地实现事件驱动的微服务。然而,如果消费者和生产者配置能够更加灵活那就更好了。这实际上是一件可以相当容易实现的事情。
欢迎贡献
Ene Kafka 是一个开源项目,我们欢迎社区的贡献。如果您有任何改进或新功能的想法,请随时提交问题或拉取请求。
依赖项
~3MB
~64K SLoC