5 个版本 (有破坏性变更)
0.17.0 | 2024年8月5日 |
---|---|
0.16.0 | 2023年9月26日 |
0.15.0 | 2023年7月10日 |
0.14.0 | 2022年8月3日 |
0.0.1 | 2022年1月31日 |
#41 in 编码
150,649 每月下载量
在 68 个 软件包中使用 68 个(直接使用)
1MB
19K SLoC
apache-avro
Apache Avro 在 Rust 中的库。
请参阅我们的文档以获取示例、教程和API参考。
Apache Avro 是一种数据序列化系统,它提供了丰富的数据结构和紧凑、快速的二进制数据格式。
Avro中的所有数据都是模式化的,如下例所示
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
在Rust中处理Avro数据的基本有两种方式
- 基于Avro模式的数据类型;
- 兼容Rust serde的类型,实现/推导
Serialize
和Deserialize
;
apache-avro 提供了一种轻松高效地读取和写入这两种数据表示的方法。
安装库
添加到您的 Cargo.toml
[dependencies]
apache-avro = "x.y"
如果您想利用 Snappy 编码器
[dependencies.apache-avro]
version = "x.y"
features = ["snappy"]
如果您想利用 Zstandard 编码器
[dependencies.apache-avro]
version = "x.y"
features = ["zstandard"]
如果您想利用 Bzip2 编码器
[dependencies.apache-avro]
version = "x.y"
features = ["bzip"]
如果您想利用 Xz 编码器
[dependencies.apache-avro]
version = "x.y"
features = ["xz"]
升级到较新的小版本
该库仍在beta版本中,因此小版本之间可能存在向后不兼容的更改。如果您在升级时遇到问题,请查看版本升级指南。
定义一个模式
Avro 数据不能没有 Avro 架构。在编写和读取时必须使用架构,它们携带有关我们正在处理的数据类型的详细信息。Avro 架构用于架构验证和解析 Avro 数据。
Avro 架构以 JSON 格式定义,可以直接从原始字符串中解析出来。
use apache_avro::Schema;
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
// if the schema is not valid, this function will return an error
let schema = Schema::parse_str(raw_schema).unwrap();
// schemas can be printed for debugging
println!("{:?}", schema);
此外,可以给出一系列定义(它们可能相互依赖),所有这些定义都将解析为相应的架构。
use apache_avro::Schema;
let raw_schema_1 = r#"{
"name": "A",
"type": "record",
"fields": [
{"name": "field_one", "type": "float"}
]
}"#;
// This definition depends on the definition of A above
let raw_schema_2 = r#"{
"name": "B",
"type": "record",
"fields": [
{"name": "field_one", "type": "A"}
]
}"#;
// if the schemas are not valid, this function will return an error
let schemas = Schema::parse_list(&[raw_schema_1, raw_schema_2]).unwrap();
// schemas can be printed for debugging
println!("{:?}", schemas);
注意。 架构定义的组成需要具有名称的架构。因此,应仅将 Record、Enum 和 Fixed 类型的架构输入到此函数。
该库还提供了一个程序接口来定义架构,无需将其编码为 JSON(用于高级用途),但我们强烈建议使用 JSON 接口。如果您感兴趣,请阅读 API 参考。
有关架构和您可以在其中封装的信息类型的更多信息,请参阅Avro 规范的相关部分。
写入数据
一旦我们定义了一个架构,我们就可以开始将数据序列化为 Avro,同时验证它们与提供的架构。如前所述,在 Rust 中处理 Avro 数据有两种方式。
注意: 该库还提供了一个低级接口,用于在不生成标记和头信息的情况下以 Avro 字节码编码单个数据(用于高级用途),但我们强烈建议使用 Writer
接口以实现完全的 Avro 兼容性。如果您感兴趣,请阅读 API 参考。
Avro 方法
鉴于我们上面定义的架构是 Avro Record 的架构,我们将使用库提供的关联类型来指定我们想要序列化的数据
use apache_avro::types::Record;
use apache_avro::Writer;
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());
// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
// schema validation happens here
writer.append(record).unwrap();
// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data has been written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();
大多数情况下,架构倾向于定义记录为一个顶级容器,封装所有要转换为字段的值并提供它们的文档,但如果我们想直接定义 Avro 值,库通过 Value
接口提供了这种能力。
use apache_avro::types::Value;
let mut value = Value::String("foo".to_string());
serde 方法
鉴于我们上面定义的架构是 Avro Record,我们可以直接使用派生自 Serialize
的 Rust 结构体来模拟我们的数据
use apache_avro::Writer;
#[derive(Debug, Serialize)]
struct Test {
a: i64,
b: String,
}
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());
// the structure models our Record schema
let test = Test {
a: 27,
b: "foo".to_owned(),
};
// schema validation happens here
writer.append_ser(test).unwrap();
// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data is written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner();
大多数情况下,架构倾向于定义记录为一个顶级容器,封装所有要转换为字段的值并提供它们的文档,但如果我们想直接定义 Avro 值,任何实现 Serialize
的类型都应该工作。
let mut value = "foo".to_string();
使用编解码器压缩数据
Avro 在编码数据时支持三种不同的压缩编解码器
- Null:不压缩数据;
- Deflate:使用 RFC 1951 中指定的 deflate 算法写入数据块,通常使用 zlib 库实现。请注意,此格式(与 RFC 1950 中的“zlib 格式”不同)没有校验和。
- Snappy:使用 Google 的 Snappy 压缩库。每个压缩块后面跟随 4 字节的大端 CRC32 校验和,该校验和对应于块中未压缩数据的校验和。您必须启用
snappy
功能才能使用此编解码器。 - Zstandard:使用 Facebook 的 Zstandard 压缩库。您必须启用
zstandard
功能才能使用此编解码器。 - Bzip2:使用 BZip2 压缩库。您必须启用
bzip
功能才能使用此编解码器。 - Xz:使用xz2压缩库。您必须启用
xz
功能才能使用此编解码器。
要指定用于压缩数据的编解码器,只需在创建Writer
时指定即可。
use apache_avro::Writer;
use apache_avro::Codec;
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
读取数据
就读取Avro编码数据而言,我们可以直接使用与数据一起编码的架构来读取它们。库将自动为我们完成,就像它对压缩编解码器所做的那样。
use apache_avro::Reader;
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::new(&input[..]).unwrap();
如果我们想指定一个与数据编写时使用的架构不同(但兼容)的读取架构,我们可以按照以下步骤操作
use apache_avro::Schema;
use apache_avro::Reader;
let reader_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{"name": "c", "type": "long", "default": 43}
]
}
"#;
let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
在读取数据时,库也将自动执行架构解析。
有关架构兼容性和解析的更多信息,请参阅Avro规范。
通常,在Rust中处理Avro数据有两种方法,如下所示。
注意:该库还提供了一个低级接口,用于解码Avro字节码中的单个数据项,而不带标记和头(用于高级使用),但我们强烈建议使用Reader
接口来利用所有Avro功能。如果您感兴趣,请阅读API参考。
Avro 方法
我们可以直接从Reader
迭代器中读取Value
实例。
use apache_avro::Reader;
let reader = Reader::new(&input[..]).unwrap();
// value is a Result of an Avro Value in case the read operation fails
for value in reader {
println!("{:?}", value.unwrap());
}
serde 方法
或者,我们可以使用实现Deserialize
并代表我们的架构的Rust类型来读取数据。
use apache_avro::Reader;
use apache_avro::from_value;
#[derive(Debug, Deserialize)]
struct Test {
a: i64,
b: String,
}
let reader = Reader::new(&input[..]).unwrap();
// value is a Result in case the read operation fails
for value in reader {
println!("{:?}", from_value::<Test>(&value.unwrap()));
}
整合一切
以下是如何结合到目前为止所展示的一切的示例,它旨在作为库接口的快速参考。
use apache_avro::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct Test {
a: i64,
b: String,
}
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{:?}", schema);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
writer.append(record)?;
let test = Test {
a: 27,
b: "foo".to_owned(),
};
writer.append_ser(test)?;
let input = writer.into_inner()?;
let reader = Reader::with_schema(&schema, &input[..])?;
for record in reader {
println!("{:?}", from_value::<Test>(&record?));
}
Ok(())
}
apache-avro
支持Avro规范中列出的逻辑类型。
Decimal
使用num_bigint
包。- UUID使用
uuid
包。 - 日期和时间(毫秒)作为
i32
,时间(微秒)作为i64
。 - 时间戳(毫秒和微秒)作为
i64
。 - 本地时间戳(毫秒和微秒)作为
i64
。 - 持续时间作为一个具有
months
、days
和millis
访问器方法的自定义类型,每个访问器方法都返回一个i32
。
请注意,磁盘上的表示与底层原始/复杂数据类型相同。
读取和写入逻辑类型
use apache_avro::{
types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
Writer, Error,
};
use num_bigint::ToBigInt;
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{
"name": "decimal_fixed",
"type": {
"type": "fixed",
"size": 2,
"name": "decimal"
},
"logicalType": "decimal",
"precision": 4,
"scale": 2
},
{
"name": "decimal_var",
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 3
},
{
"name": "uuid",
"type": "string",
"logicalType": "uuid"
},
{
"name": "date",
"type": "int",
"logicalType": "date"
},
{
"name": "time_millis",
"type": "int",
"logicalType": "time-millis"
},
{
"name": "time_micros",
"type": "long",
"logicalType": "time-micros"
},
{
"name": "timestamp_millis",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "timestamp_micros",
"type": "long",
"logicalType": "timestamp-micros"
},
{
"name": "local_timestamp_millis",
"type": "long",
"logicalType": "local-timestamp-millis"
},
{
"name": "local_timestamp_micros",
"type": "long",
"logicalType": "local-timestamp-micros"
},
{
"name": "duration",
"type": {
"type": "fixed",
"size": 12,
"name": "duration"
},
"logicalType": "duration"
}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{:?}", schema);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
let mut record = Record::new(writer.schema()).unwrap();
record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
record.put("uuid", uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap());
record.put("date", Value::Date(1));
record.put("time_millis", Value::TimeMillis(2));
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
record.put("timestamp_nanos", Value::TimestampNanos(6));
record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));
writer.append(record)?;
let input = writer.into_inner()?;
let reader = Reader::with_schema(&schema, &input[..])?;
for record in reader {
println!("{:?}", record?);
}
Ok(())
}
计算Avro架构指纹
此库支持计算以下指纹
- SHA-256
- MD5
- Rabin
以下是对支持的指纹进行指纹识别的示例
use apache_avro::rabin::Rabin;
use apache_avro::{Schema, Error};
use md5::Md5;
use sha2::Sha256;
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{}", schema.fingerprint::<Sha256>());
println!("{}", schema.fingerprint::<Md5>());
println!("{}", schema.fingerprint::<Rabin>());
Ok(())
}
格式错误的数据
为了简化解码,Avro数据的二进制编码规范要求某些字段在其数据旁边编码长度。
如果传递给Reader
的编码数据格式错误,可能存在包含数据长度的字节是无效的情况,这可能导致过度的内存分配。
为了保护用户免受格式错误的数据的影响,apache-avro
在解码数据时对其将执行的任何分配设置了限制(默认:512MB)。
如果您预计某些数据字段将超过此限制,请在读取任何数据之前务必使用 max_allocation_bytes
函数(我们利用 Rust 的 std::sync::Once
机制来初始化此值,如果在使用 max_allocation_bytes
之前调用了解码,则整个程序生命周期中的限制将为 512MB)。
use apache_avro::max_allocation_bytes;
max_allocation_bytes(2 * 1024 * 1024 * 1024); // 2GB
// ... happily decode large data
检查模式兼容性
此库支持检查模式兼容性。
检查兼容性的示例
- 兼容的模式
说明:int 数组模式可以被 long 数组模式读取——一个 int(32位有符号整数)可以放入一个 long(64位有符号整数)中
use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_ok());
- 不兼容的模式(long 数组模式不能被 int 数组模式读取)
说明:long 数组模式不能被 int 数组模式读取——一个 long(64位有符号整数)不能放入一个 int(32位有符号整数)中
use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err());
自定义名称验证器
默认情况下,库遵循Avro 规范的规则!
其他一些 Apache Avro 语言 SDK 不那么严格,允许在名称中使用更多字符。为了与这些 SDK 兼容,库提供了一种自定义名称验证的方法。
use apache_avro::AvroResult;
use apache_avro::schema::Namespace;
use apache_avro::validator::{SchemaNameValidator, set_schema_name_validator};
struct MyCustomValidator;
impl SchemaNameValidator for MyCustomValidator {
fn validate(&self, name: &str) -> AvroResult<(String, Namespace)> {
todo!()
}
}
// don't parse any schema before registering the custom validator(s) !
set_schema_name_validator(Box::new(MyCustomValidator));
// ... use the library
可以应用类似的逻辑来验证模式命名空间、枚举符号和字段名称。
注意:库允许在每个应用程序生命周期中仅设置验证器一次!如果应用程序在设置验证器之前解析了模式,则将注册并使用默认验证器!
自定义模式相等比较器
库提供了两个模式相等比较器的实现
SpecificationEq
- 一个将模式序列化为它们的规范形式(即 JSON)并将它们作为字符串比较的比较器。这是直到 apache_avro 0.16.0 的唯一实现。有关更多信息,请参阅Avro 规范!StructFieldEq
- 一个结构上比较模式的比较器。它比SpecificationEq
快,因为它一发现差异就返回false
,并建议使用!自 apache_avro 0.17.0 起是默认比较器。
要使用自定义比较器,您需要实现 SchemataEq
特性并使用 set_schemata_equality_comparator
函数设置它
use apache_avro::{AvroResult, Schema};
use apache_avro::schema::Namespace;
use apache_avro::schema_equality::{SchemataEq, set_schemata_equality_comparator};
#[derive(Debug)]
struct MyCustomSchemataEq;
impl SchemataEq for MyCustomSchemataEq {
fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
todo!()
}
}
// don't parse any schema before registering the custom comparator !
set_schemata_equality_comparator(Box::new(MyCustomSchemataEq));
// ... use the library
注意:库允许在每个应用程序生命周期中仅设置比较器一次!如果应用程序在设置比较器之前解析了模式,则将注册并使用默认比较器!
最小支持的 Rust 版本
1.73.0
许可证
此项目受Apache 许可证 2.0许可。
贡献
鼓励每个人贡献!您可以通过在 GitHub 仓库上分叉并提交拉取请求或打开一个问题来贡献。所有贡献都将受Apache 许可证 2.0许可。
请考虑添加文档和测试!如果您引入了不兼容的更改,请考虑在迁移指南中添加迁移说明。如果您修改了 lib.rs
中的 crate 文档,请运行 make readme
以同步 README 文件。
依赖关系
约 5–7MB
~128K SLoC