23个版本 (12个破坏性更新)
0.13.0 | 2021年1月29日 |
---|---|
0.12.0 | 2020年11月27日 |
0.11.0 | 2020年8月13日 |
0.10.0 | 2020年5月31日 |
0.4.1 | 2018年6月17日 |
#146 in 数据结构
每月41,390次下载
用于 27 个crate(18个直接使用)
315KB
6.5K SLoC
avro-rs
在Rust中处理Apache Avro的库。
请查阅我们的文档以获取示例、教程和API参考。
Apache Avro是一个数据序列化系统,它提供了丰富的数据结构和一个紧凑、快速的二进制数据格式。
Avro中的所有数据都是模式化的,如下例所示
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
在Rust中处理Avro数据有两种基本方法
- 基于Avro模式的数据类型;
- 实现/推导
Serialize
和Deserialize
的通用Rust serde兼容类型;
avro-rs提供了一种轻松高效地读取和写入这两种数据表示的方法。
安装库
将其添加到您的Cargo.toml
[dependencies]
avro-rs = "x.y"
或者如果您想利用Snappy
编解码器
[dependencies.avro-rs]
version = "x.y"
features = ["snappy"]
升级到较新的次版本
该库仍在测试阶段,因此次版本之间可能存在不兼容的更改。如果您在升级时遇到问题,请查看版本升级指南。
定义模式
没有Avro模式,Avro数据无法存在。在写入时必须使用模式,在读取时也可以使用模式,它们携带有关我们处理的数据类型的信
息。Avro模式用于模式验证和解析Avro数据。
use avro_rs::Schema;
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
// if the schema is not valid, this function will return an error
let schema = Schema::parse_str(raw_schema).unwrap();
// schemas can be printed for debugging
println!("{:?}", schema);
Avro模式以JSON
格式定义,可以直接从原始字符串解析出来。
use avro_rs::Schema;
let raw_schema_1 = r#"{
"name": "A",
"type": "record",
"fields": [
{"name": "field_one", "type": "float"}
]
}"#;
// This definition depends on the definition of A above
let raw_schema_2 = r#"{
"name": "B",
"type": "record",
"fields": [
{"name": "field_one", "type": "A"}
]
}"#;
// if the schemas are not valid, this function will return an error
let schemas = Schema::parse_list(&[raw_schema_1, raw_schema_2]).unwrap();
// schemas can be printed for debugging
println!("{:?}", schemas);
注意。 需要注意的是,模式定义的组成需要带有名称的方案。因此,只有类型为Record、Enum和Fixed的方案应输入此函数。
此外,该库还提供了一种程序接口来定义不将其编码在JSON中的方案(用于高级用途),但我们强烈推荐使用JSON接口。如果您对此感兴趣,请阅读API参考。
有关方案以及您可以在其中封装的信息类型的更多信息,请参阅Avro规范的相关部分。
写入数据
一旦我们定义了方案,我们就可以在Avro中对数据进行序列化,并在过程中对其提供的方案进行验证。如前所述,在Rust中处理Avro数据有两种方式。
注意: 该库还提供了一个低级接口,用于对单个数据项进行Avro字节码编码,而不生成标记和标题(用于高级用途),但我们强烈推荐使用Writer
接口以实现完全的Avro兼容性。如果您对此感兴趣,请阅读API参考。
Avro方法
鉴于我们上面定义的方案是Avro Record的方案,我们将使用库提供的关联类型来指定我们想要序列化的数据。
use avro_rs::types::Record;
use avro_rs::Writer;
#
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());
// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
// schema validation happens here
writer.append(record).unwrap();
// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data has been written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();
在大多数情况下,方案倾向于将记录定义为一个顶层容器,封装所有要转换的字段并为其提供文档,但如果我们想要直接定义一个Avro值,库通过Value
接口提供了这种能力。
use avro_rs::types::Value;
let mut value = Value::String("foo".to_string());
serde方法
鉴于我们上面定义的方案是Avro Record,我们可以直接使用Rust结构体,该结构体继承自Serialize
来模拟我们的数据。
use avro_rs::Writer;
#[derive(Debug, Serialize)]
struct Test {
a: i64,
b: String,
}
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());
// the structure models our Record schema
let test = Test {
a: 27,
b: "foo".to_owned(),
};
// schema validation happens here
writer.append_ser(test).unwrap();
// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data is written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner();
在大多数情况下,方案倾向于将记录定义为一个顶层容器,封装所有要转换的字段并为其提供文档,但如果我们想要直接定义一个Avro值,任何实现Serialize
的类型都应适用。
let mut value = "foo".to_string();
使用编解码器压缩数据
在编码数据时,Avro支持三种不同的压缩编解码器。
- Null:不压缩数据;
- Deflate:使用RFC 1951中指定的deflate算法编写数据块,通常使用zlib库实现。请注意,此格式(与RFC 1950中的“zlib格式”不同)没有校验和。
- Snappy:使用Google的Snappy压缩库。每个压缩块后面跟着该块中未压缩数据的4字节、大端CRC32校验和。您必须启用
snappy
功能才能使用此编解码器。
要指定用于压缩数据的编解码器,只需在创建Writer
时指定即可。
use avro_rs::Writer;
use avro_rs::Codec;
#
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
读取数据
至于读取Avro编码数据,我们只需使用与数据编码一起编码的方案即可读取它们。库会自动为我们完成,就像它已经为我们完成压缩编解码器一样。
use avro_rs::Reader;
#
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::new(&input[..]).unwrap();
如果,相反,我们想要指定与数据写入时使用的方案不同(但兼容)的读取方案,我们可以按照以下方式操作
use avro_rs::Schema;
use avro_rs::Reader;
#
let reader_raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{"name": "c", "type": "long", "default": 43}
]
}
"#;
let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
在读取数据时,库也会自动执行方案解析。
有关方案兼容性和解析的更多信息,请参阅Avro规范。
通常,在Rust中处理Avro数据有两种方式,如下所示。
注意: 该库还提供了一个低级接口,用于解码Avro字节码中的单个数据项,而不进行标记和标题(用于高级用途),但我们强烈推荐使用Reader
接口以利用所有Avro功能。如果您对此感兴趣,请阅读API参考。
Avro方法
我们可以直接从 Reader
迭代器中读取 Value
的实例
use avro_rs::Reader;
#
let reader = Reader::new(&input[..]).unwrap();
// value is a Result of an Avro Value in case the read operation fails
for value in reader {
println!("{:?}", value.unwrap());
}
serde方法
或者,我们可以使用实现 Deserialize
的 Rust 类型来表示我们的模式,并将数据读入
use avro_rs::Reader;
use avro_rs::from_value;
#[derive(Debug, Deserialize)]
struct Test {
a: i64,
b: String,
}
let reader = Reader::new(&input[..]).unwrap();
// value is a Result in case the read operation fails
for value in reader {
println!("{:?}", from_value::<Test>(&value.unwrap()));
}
将一切组合在一起
以下是如何结合到目前为止所展示的一切的示例,它旨在成为库接口的快速参考
use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
struct Test {
a: i64,
b: String,
}
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{:?}", schema);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
writer.append(record)?;
let test = Test {
a: 27,
b: "foo".to_owned(),
};
writer.append_ser(test)?;
let input = writer.into_inner()?;
let reader = Reader::with_schema(&schema, &input[..])?;
for record in reader {
println!("{:?}", from_value::<Test>(&record?));
}
Ok(())
}
avro-rs
也支持 Avro 规范中列出的逻辑类型(见Avro 规范)
- 使用
num_bigint
包来使用Decimal
- 使用
uuid
包来使用 UUID - 日期和时间(毫秒)作为
i32
,时间(微秒)作为i64
- 时间戳(毫秒和微秒)作为
i64
- 持续时间作为具有
months
、days
和millis
访问器的自定义类型,每个访问器都返回一个i32
注意,磁盘表示形式与底层原始/复杂类型相同。
读取和写入逻辑类型
use avro_rs::{
types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
Writer, Error,
};
use num_bigint::ToBigInt;
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{
"name": "decimal_fixed",
"type": {
"type": "fixed",
"size": 2,
"name": "decimal"
},
"logicalType": "decimal",
"precision": 4,
"scale": 2
},
{
"name": "decimal_var",
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 3
},
{
"name": "uuid",
"type": "string",
"logicalType": "uuid"
},
{
"name": "date",
"type": "int",
"logicalType": "date"
},
{
"name": "time_millis",
"type": "int",
"logicalType": "time-millis"
},
{
"name": "time_micros",
"type": "long",
"logicalType": "time-micros"
},
{
"name": "timestamp_millis",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "timestamp_micros",
"type": "long",
"logicalType": "timestamp-micros"
},
{
"name": "duration",
"type": {
"type": "fixed",
"size": 12,
"name": "duration"
},
"logicalType": "duration"
}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{:?}", schema);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
let mut record = Record::new(writer.schema()).unwrap();
record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
record.put("uuid", uuid::Uuid::new_v4());
record.put("date", Value::Date(1));
record.put("time_millis", Value::TimeMillis(2));
record.put("time_micros", Value::TimeMicros(3));
record.put("timestamp_millis", Value::TimestampMillis(4));
record.put("timestamp_micros", Value::TimestampMicros(5));
record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));
writer.append(record)?;
let input = writer.into_inner()?;
let reader = Reader::with_schema(&schema, &input[..])?;
for record in reader {
println!("{:?}", record?);
}
Ok(())
}
计算 Avro 架构指纹
此库支持计算以下指纹
- SHA-256
- MD5
- Rabin
支持指纹的指纹示例
use avro_rs::rabin::Rabin;
use avro_rs::{Schema, Error};
use md5::Md5;
use sha2::Sha256;
fn main() -> Result<(), Error> {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
let schema = Schema::parse_str(raw_schema)?;
println!("{}", schema.fingerprint::<Sha256>());
println!("{}", schema.fingerprint::<Md5>());
println!("{}", schema.fingerprint::<Rabin>());
Ok(())
}
不规范的 数据
为了简化解码,Avro 数据的二进制编码规范要求某些字段在数据旁边有长度编码。
如果传递给 Reader
的编码数据不规范,可能会出现包含数据长度的字节是虚假的,这可能导致过度的内存分配。
为了保护用户免受不规范数据的影响,avro-rs
在解码数据时对其任何分配都设置了一个限制(默认:512MB)。
如果您预计某些数据字段的大小将超过此限制,请在读取 任何 数据之前确保使用 max_allocation_bytes
函数(我们利用 Rust 的 std::sync::Once
机制来初始化此值,如果在进行解码调用之前没有调用 max_allocation_bytes
,则限制在整个程序生命周期内为 512MB)。
use avro_rs::max_allocation_bytes;
max_allocation_bytes(2 * 1024 * 1024 * 1024); // 2GB
// ... happily decode large data
检查架构兼容性
此库支持检查架构兼容性。
注意:它尚不支持命名架构(更多信息请见 https://github.com/flavray/avro-rs/pull/76)。
兼容性检查的示例
- 兼容架构
说明:int 数组架构可以被 long 数组架构读取 - int(32位有符号整数)适合于 long(64位有符号整数)
use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, &readers_schema));
- 不兼容架构(long 数组架构不能被 int 数组架构读取)
说明:long 数组架构不能被 int 数组架构读取 - long(64位有符号整数)不适合于 int(32位有符号整数)
use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema));
许可
本项目受 MIT 许可证 许可。请注意,这不是由 Apache Avro 维护的官方项目。
贡献
鼓励每个人贡献力量!您可以通过在 GitHub 仓库上分叉并提交拉取请求或打开一个问题来贡献力量。所有贡献都将受 MIT 许可证 许可。
请考虑在变更日志中的未发布部分添加文档、测试和您更改的一行。如果您引入了向后不兼容的更改,请考虑在迁移指南中添加迁移说明。如果您修改了lib.rs
中的crate文档,请运行make readme
以同步README文件。
依赖项
~5.5MB
~97K SLoC