17个版本 (10个重大变更)
0.11.0 | 2024年7月30日 |
---|---|
0.10.0 | 2024年3月5日 |
0.7.0 | 2023年9月16日 |
0.6.1 | 2023年7月26日 |
0.1.0 | 2021年12月14日 |
#94 在 HTTP服务器 中
10,289 每月下载量
在 6 个Crate 中使用 6 个(直接使用4个)
2.5MB
54K SLoC
Kafka-Protocol
Rust实现的Kafka有线协议。
与其他Kafka协议实现不同,此项目使用代码生成来覆盖整个Kafka API表面,包括不同的协议版本。请参阅Kafka仓库以获取协议架构的示例。
版本控制
协议消息是根据最近的稳定版Kafka生成的,目前是 3.7.0。
尽管Kafka协议相对稳定并力求向后兼容,但偶尔也会添加新字段。为了确保与协议的前向兼容性,此crate将所有导出项标记为 #[non-exhaustive]
。可以使用 Default::default
和构建器样式方法来构建协议消息。
处理消息
使用 Default::default
use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;
let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_static_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_version = 12;
let mut request = MetadataRequest::default();
request.topics = None;
request.allow_auto_topic_creation = true;
使用构建器样式方法
use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;
let header = RequestHeader::default()
.with_client_id(Some(StrBytes::from_static_str("my-client")))
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_version(12);
let request = MetadataRequest::default()
.with_topics(None)
.with_allow_auto_topic_creation(true);
序列化
一旦创建消息,可以使用 Encodable
进行序列化,将结构体写入提供的 bytes::BytesMut
。必须提供与请求头中指定的版本匹配的消息的API版本。
use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;
let mut bytes = BytesMut::new();
let request = MetadataRequest::default();
request.encode(&mut bytes, 12).unwrap();
反序列化
可以使用 Decodable
对消息进行解码,并从相应的请求中提供匹配的API版本。
use bytes::Bytes;
use kafka_protocol::messages::ApiVersionsRequest;
use kafka_protocol::protocol::Decodable;
let bytes: [u8; 25] = [
0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d,
0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61,
0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30,
0x00
];
let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap();
开发
在仓库的根路径下运行 cargo run -p protocol_codegen
以通过最新的Kafka协议模式生成/更新Rust代码。
依赖项
~6MB
~107K SLoC