15个版本 (稳定版)
4.1.0 | 2024年6月16日 |
---|---|
4.0.0 | 2023年10月21日 |
3.1.0 | 2022年11月20日 |
2.1.0 | 2021年7月11日 |
0.3.1 | 2018年9月29日 |
#57 in 编码
每月下载量 29,074
在 3 crates 中使用
525KB
11K SLoC
schema_registry_converter
该库提供了一种使用与Java客户端兼容的方式使用康弗伦斯模式注册表(Confluent Schema Registry)。由于 Karapace 兼容API,因此也可以与该库一起使用。发布说明可以在 github 上找到。支持消费/解码和生成/编码。解码时还可以提供要使用的模式。也可以在解码时包含引用。如果没有提供模式,将使用与同一 subject
相同的最新模式。
与Java版本相比,它应该具有完整的特性。如果缺少任何内容或不符合预期,请创建一个问题或在 github讨论 中发起讨论。关于如何使用此库以异步方式与protobuf一起将数据发送到Kafka的示例可以在 ksqlDB-GraphQL-poc 中找到。关于此库的一些背景信息可以在标题为 confluent Schema Registry and Rust 的博客中找到。
入门指南
schema_registry_converter.rs 在crates.io上可用。建议在那里查找最新和更详细的文档。它有几个功能标志,请确保正确设置它们。
要使用它进行avro异步转换
[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro"] }
为了简化,存在一些 easy
变体,它们在内部有一个arc。这使得使用它更加容易,但代价是增加了一些开销。要使用 easy
变体,请添加 easy
功能,并使用以 Easy
开头的结构体来进行转换。
[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["easy", "avro"] }
...有关如何使用的更多信息,请参阅 文档。
所有转换器也都有阻塞(非异步)版本,在这种情况下可以使用如下方式:
[dependencies]
schema_registry_converter = { version = "4.0.0", default-features = false, features = ["avro", "blocking"] }
如果你需要在项目中同时使用它们,可以使用类似的方式,但必须注意根据你的使用导入正确的路径。
[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro", "blocking"] }
消费者
对于使用模式注册表编码的消息,您需要从模式注册表中获取正确的模式以将其转换为记录。为了清晰起见,图中省略了错误处理。
生产者
为了产生其他客户端可以正确消费的消息,必须将适当的ID与消息一起编码。为了获取正确的ID,可能需要注册新的模式。为了清晰起见,图中省略了错误处理。
使用Avro(阻塞)的消费者和生产者示例
这些示例同时执行消费/解码和生产/编码。要使用Avro中的struct,它们必须实现serde::Deserialize
或serde::Serialize
trait。这些示例对于从1.x.x版本开始更新特别有用,当你开始时,你可能想使用异步版本。
use rdkafka::message::{Message, BorrowedMessage};
use apache_avro::types::Value;
use schema_registry_converter::blocking::{Decoder, Encoder};
use schema_registry_converter::blocking::schema_registry::SubjectNameStrategy;
fn main() {
let decoder = Decoder::new(SrSettings::new(String::from("http://localhost:8081")));
let encoder = Encoder::new(SrSettings::new(String::from("http://localhost:8081")));
let hb = get_heartbeat(msg, &decoder);
let record = get_future_record_from_struct("hb", Some("id"), hb, &encoder);
producer.send(record);
}
fn get_value<'a>(
msg: &'a BorrowedMessage,
decoder: &'a Decoder,
) -> Value {
match decoder.decode(msg.payload()) {
Ok(v) => v,
Err(e) => panic!("Error getting value: {}", e),
}
}
fn get_heartbeat<'a>(
msg: &'a BorrowedMessage,
decoder: &'a Decoder,
) -> Heartbeat {
match decoder.decode_with_name(msg.payload()) {
Ok((name, value)) => {
match name.name.as_str() {
"Heartbeat" => {
match name.namespace {
Some(namespace) => {
match namespace.as_str() {
"nl.openweb.data" => from_value::<Heartbeat>(&value).unwrap(),
ns => panic!("Unexpected namespace {}", ns),
}
}
None => panic!("No namespace in schema, while expected"),
}
}
name => panic!("Unexpected name {}", name),
}
}
Err(e) => panic!("error getting heartbeat: {}", e),
}
}
fn get_future_record<'a>(
topic: &'a str,
key: Option<&'a str>,
values: Vec<(&'static str, Value)>,
encoder: &'a Encoder,
) -> FutureRecord<'a> {
let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
let payload = match encoder.encode(values, &subject_name_strategy) {
Ok(v) => v,
Err(e) => panic!("Error getting payload: {}", e),
};
FutureRecord {
topic,
partition: None,
payload: Some(&payload),
key,
timestamp: None,
headers: None,
}
}
fn get_future_record_from_struct<'a>(
topic: &'a str,
key: Option<&'a str>,
heartbeat: Heartbeat,
encoder: &'a Encoder,
) -> FutureRecord<'a> {
let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
let payload = match encoder.encode_struct(heartbeat, &subject_name_strategy) {
Ok(v) => v,
Err(e) => panic!("Error getting payload: {}", e),
};
FutureRecord {
topic,
partition: None,
payload: Some(&payload),
key,
timestamp: None,
headers: None,
}
}
直接与模式注册表交互
一些函数已被公开,以便可以使用此库直接获取所有主题、主题的所有版本或带有主题和版本的原始模式。对于这些,请参阅集成测试的异步或阻塞版本。
将模式发布到模式注册表的示例
use schema_registry_converter::blocking::schema_registry::{
post_schema,
SuppliedSchema
};
fn main() {
let schema = SuppliedSchema {
name: String::from("nl.openweb.data.Heartbeat"),
schema_type: SchemaType::AVRO,
schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#),
references: vec![],
};
let result = post_schema("http://localhost:8081/subjects/test-value/versions", heartbeat_schema);
}
与其他库的关系
转换的avro部分由avro-rs处理。因此,我没有包含每个可能模式的测试。虽然我使用了rdkafka来成功地从Kafka消费和向Kafka生产,并且在示例中使用它,但此crate没有直接依赖它。此crate所做的一切只是转换[u8] <-> Some Value(基于使用的转换器)。使用Json和Protobuf时,会引入其他依赖项,通过使用这些功能。我已经尝试将所有错误封装在SRCError类型中。因此,即使你得到一个panic/error,它也可能是一个SRCError,它可能来自依赖项之一。请在创建问题之前确保你正确地使用了库,并且错误不是由依赖项引起的。
集成测试
集成测试需要运行在默认端口上的Kafka集群。它将创建主题、注册模式、生产并消费一些消息。它们只有在使用kafka_test
功能编译时才包括在内,因此要包括它们在测试中,需要运行cargo test --verbose --all-features -- --test-threads=1
。'prepare_integration_test.sh'脚本可以用来创建测试所需的3个主题。为了确保Java兼容性,还需要运行schema-registry-test-app docker镜像。
许可证
此项目受以下任一许可证的许可:
- Apache License, Version 2.0, (LICENSE-APACHE或http://www.apache.org/licenses/LICENSE-2.0)
- 麻省理工学院许可证(LICENSE-MIT 或 http://opensource.org/licenses/MIT)
任选其一。
贡献
除非您明确说明,否则根据 Apache-2.0 许可证定义,您有意提交以供 Schema Registry Converter 包含在内的任何贡献,应双许可如上所述,无需任何额外条款或条件。
依赖关系
~5–19MB
~290K SLoC