#json-schema #schema #kafka #avro #protobuf #codec #async-version

schema_registry_converter

使用康弗伦斯模式注册表(Confluent Schema Registry)对kafka进行数据编码和解码

15个版本 (稳定版)

4.1.0 2024年6月16日
4.0.0 2023年10月21日
3.1.0 2022年11月20日
2.1.0 2021年7月11日
0.3.1 2018年9月29日

#57 in 编码

Download history 4380/week @ 2024-04-27 6026/week @ 2024-05-04 5782/week @ 2024-05-11 5254/week @ 2024-05-18 5672/week @ 2024-05-25 6705/week @ 2024-06-01 5459/week @ 2024-06-08 7662/week @ 2024-06-15 7823/week @ 2024-06-22 7059/week @ 2024-06-29 9812/week @ 2024-07-06 8367/week @ 2024-07-13 8509/week @ 2024-07-20 7198/week @ 2024-07-27 6868/week @ 2024-08-03 5301/week @ 2024-08-10

每月下载量 29,074
3 crates 中使用

MIT/Apache

525KB
11K SLoC

schema_registry_converter

Build Status codecov Crates.io Crates.io docs.rs

该库提供了一种使用与Java客户端兼容的方式使用康弗伦斯模式注册表(Confluent Schema Registry)。由于 Karapace 兼容API,因此也可以与该库一起使用。发布说明可以在 github 上找到。支持消费/解码和生成/编码。解码时还可以提供要使用的模式。也可以在解码时包含引用。如果没有提供模式,将使用与同一 subject 相同的最新模式。

与Java版本相比,它应该具有完整的特性。如果缺少任何内容或不符合预期,请创建一个问题或在 github讨论 中发起讨论。关于如何使用此库以异步方式与protobuf一起将数据发送到Kafka的示例可以在 ksqlDB-GraphQL-poc 中找到。关于此库的一些背景信息可以在标题为 confluent Schema Registry and Rust 的博客中找到。

入门指南

schema_registry_converter.rs 在crates.io上可用。建议在那里查找最新和更详细的文档。它有几个功能标志,请确保正确设置它们。

要使用它进行avro异步转换

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro"] }

为了简化,存在一些 easy 变体,它们在内部有一个arc。这使得使用它更加容易,但代价是增加了一些开销。要使用 easy 变体,请添加 easy 功能,并使用以 Easy 开头的结构体来进行转换。

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["easy", "avro"] }

...有关如何使用的更多信息,请参阅 文档

所有转换器也都有阻塞(非异步)版本,在这种情况下可以使用如下方式:

[dependencies]
schema_registry_converter = { version = "4.0.0", default-features = false, features = ["avro", "blocking"] }

如果你需要在项目中同时使用它们,可以使用类似的方式,但必须注意根据你的使用导入正确的路径。

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro", "blocking"] }

消费者

对于使用模式注册表编码的消息,您需要从模式注册表中获取正确的模式以将其转换为记录。为了清晰起见,图中省略了错误处理。

Consumer activity flow

生产者

为了产生其他客户端可以正确消费的消息,必须将适当的ID与消息一起编码。为了获取正确的ID,可能需要注册新的模式。为了清晰起见,图中省略了错误处理。

Producer activity flow

使用Avro(阻塞)的消费者和生产者示例

这些示例同时执行消费/解码和生产/编码。要使用Avro中的struct,它们必须实现serde::Deserializeserde::Serialize trait。这些示例对于从1.x.x版本开始更新特别有用,当你开始时,你可能想使用异步版本。

use rdkafka::message::{Message, BorrowedMessage};
use apache_avro::types::Value;
use schema_registry_converter::blocking::{Decoder, Encoder};
use schema_registry_converter::blocking::schema_registry::SubjectNameStrategy;

fn main() {
    let decoder = Decoder::new(SrSettings::new(String::from("https://127.0.0.1:8081")));
    let encoder = Encoder::new(SrSettings::new(String::from("https://127.0.0.1:8081")));
    let hb = get_heartbeat(msg, &decoder);
    let record = get_future_record_from_struct("hb", Some("id"), hb, &encoder);
    producer.send(record);
}

fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a Decoder,
) -> Value {
    match decoder.decode(msg.payload()) {
        Ok(v) => v,
        Err(e) => panic!("Error getting value: {}", e),
    }
}

fn get_heartbeat<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a Decoder,
) -> Heartbeat {
    match decoder.decode_with_name(msg.payload()) {
        Ok((name, value)) => {
            match name.name.as_str() {
                "Heartbeat" => {
                    match name.namespace {
                        Some(namespace) => {
                            match namespace.as_str() {
                                "nl.openweb.data" => from_value::<Heartbeat>(&value).unwrap(),
                                ns => panic!("Unexpected namespace {}", ns),
                            }
                        }
                        None => panic!("No namespace in schema, while expected"),
                    }
                }
                name => panic!("Unexpected name {}", name),
            }
        }
        Err(e) => panic!("error getting heartbeat: {}", e),
    }
}

fn get_future_record<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    values: Vec<(&'static str, Value)>,
    encoder: &'a Encoder,
) -> FutureRecord<'a> {
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode(values, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

fn get_future_record_from_struct<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    heartbeat: Heartbeat,
    encoder: &'a Encoder,
) -> FutureRecord<'a> {
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode_struct(heartbeat, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

直接与模式注册表交互

一些函数已被公开,以便可以使用此库直接获取所有主题、主题的所有版本或带有主题和版本的原始模式。对于这些,请参阅集成测试的异步阻塞版本。

将模式发布到模式注册表的示例

use schema_registry_converter::blocking::schema_registry::{
    post_schema,
    SuppliedSchema
};

fn main() {
    let schema = SuppliedSchema {
        name: String::from("nl.openweb.data.Heartbeat"),
        schema_type: SchemaType::AVRO,
        schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#),
        references: vec![],
    };
    let result = post_schema("https://127.0.0.1:8081/subjects/test-value/versions", heartbeat_schema);
}

转换的avro部分由avro-rs处理。因此,我没有包含每个可能模式的测试。虽然我使用了rdkafka来成功地从Kafka消费和向Kafka生产,并且在示例中使用它,但此crate没有直接依赖它。此crate所做的一切只是转换[u8] <-> Some Value(基于使用的转换器)。使用Json和Protobuf时,会引入其他依赖项,通过使用这些功能。我已经尝试将所有错误封装在SRCError类型中。因此,即使你得到一个panic/error,它也可能是一个SRCError,它可能来自依赖项之一。请在创建问题之前确保你正确地使用了库,并且错误不是由依赖项引起的。

集成测试

集成测试需要运行在默认端口上的Kafka集群。它将创建主题、注册模式、生产并消费一些消息。它们只有在使用kafka_test功能编译时才包括在内,因此要包括它们在测试中,需要运行cargo test --verbose --all-features -- --test-threads=1。'prepare_integration_test.sh'脚本可以用来创建测试所需的3个主题。为了确保Java兼容性,还需要运行schema-registry-test-app docker镜像。

许可证

此项目受以下任一许可证的许可:

任选其一。

贡献

除非您明确说明,否则根据 Apache-2.0 许可证定义,您有意提交以供 Schema Registry Converter 包含在内的任何贡献,应双许可如上所述,无需任何额外条款或条件。

依赖关系

~5–19MB
~290K SLoC