17个版本 (10个重大变更)

0.11.0 2024年7月30日
0.10.0 2024年3月5日
0.7.0 2023年9月16日
0.6.1 2023年7月26日
0.1.0 2021年12月14日

#94HTTP服务器

Download history 4785/week @ 2024-05-01 3999/week @ 2024-05-08 3527/week @ 2024-05-15 3382/week @ 2024-05-22 3653/week @ 2024-05-29 3215/week @ 2024-06-05 3381/week @ 2024-06-12 3998/week @ 2024-06-19 4582/week @ 2024-06-26 4540/week @ 2024-07-03 3987/week @ 2024-07-10 3136/week @ 2024-07-17 3573/week @ 2024-07-24 2705/week @ 2024-07-31 2047/week @ 2024-08-07 1388/week @ 2024-08-14

10,289 每月下载量
6 个Crate 中使用 6 个(直接使用4个)

MIT/Apache

2.5MB
54K SLoC

Kafka-Protocol 构建 crates.io docs.rs

Rust实现的Kafka有线协议

与其他Kafka协议实现不同,此项目使用代码生成来覆盖整个Kafka API表面,包括不同的协议版本。请参阅Kafka仓库以获取协议架构的示例。

版本控制

协议消息是根据最近的稳定版Kafka生成的,目前是 3.7.0

尽管Kafka协议相对稳定并力求向后兼容,但偶尔也会添加新字段。为了确保与协议的前向兼容性,此crate将所有导出项标记为 #[non-exhaustive]。可以使用 Default::default 和构建器样式方法来构建协议消息。

处理消息

使用 Default::default

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_static_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_version = 12;

let mut request = MetadataRequest::default();
request.topics = None;
request.allow_auto_topic_creation = true;

使用构建器样式方法

use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let header = RequestHeader::default()
    .with_client_id(Some(StrBytes::from_static_str("my-client")))
    .with_request_api_key(ApiKey::MetadataKey as i16)
    .with_request_api_version(12);

let request = MetadataRequest::default()
    .with_topics(None)
    .with_allow_auto_topic_creation(true);

序列化

一旦创建消息,可以使用 Encodable 进行序列化,将结构体写入提供的 bytes::BytesMut。必须提供与请求头中指定的版本匹配的消息的API版本。

use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;

let mut bytes = BytesMut::new();
let request = MetadataRequest::default();
request.encode(&mut bytes, 12).unwrap();

反序列化

可以使用 Decodable 对消息进行解码,并从相应的请求中提供匹配的API版本。

use bytes::Bytes;
use kafka_protocol::messages::ApiVersionsRequest;
use kafka_protocol::protocol::Decodable;

let bytes: [u8; 25] = [
        0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d,
        0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61,
        0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30,
        0x00
];
 
let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap();

开发

在仓库的根路径下运行 cargo run -p protocol_codegen 以通过最新的Kafka协议模式生成/更新Rust代码。

最初由 @Diggsey 在最小化Kafka客户端实现 Franz 中实现。

依赖项

~6MB
~107K SLoC