5 个版本 (有破坏性变更)

0.17.0 2024年8月5日
0.16.0 2023年9月26日
0.15.0 2023年7月10日
0.14.0 2022年8月3日
0.0.1 2022年1月31日

#41 in 编码

Download history 22401/week @ 2024-04-30 26228/week @ 2024-05-07 27216/week @ 2024-05-14 28217/week @ 2024-05-21 28877/week @ 2024-05-28 27484/week @ 2024-06-04 31310/week @ 2024-06-11 32218/week @ 2024-06-18 26945/week @ 2024-06-25 32509/week @ 2024-07-02 31774/week @ 2024-07-09 35185/week @ 2024-07-16 37455/week @ 2024-07-23 36472/week @ 2024-07-30 35115/week @ 2024-08-06 34627/week @ 2024-08-13

150,649 每月下载量
68 软件包中使用 68 个(直接使用)

Apache-2.0

1MB
19K SLoC

apache-avro

Latest Version Rust Continuous Integration Latest Documentation Apache License 2.0

Apache Avro 在 Rust 中的库。

请参阅我们的文档以获取示例、教程和API参考。

Apache Avro 是一种数据序列化系统,它提供了丰富的数据结构和紧凑、快速的二进制数据格式。

Avro中的所有数据都是模式化的,如下例所示

{
    "type": "record",
    "name": "test",
    "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"}
    ]
}

在Rust中处理Avro数据的基本有两种方式

  • 基于Avro模式的数据类型;
  • 兼容Rust serde的类型,实现/推导 SerializeDeserialize;

apache-avro 提供了一种轻松高效地读取和写入这两种数据表示的方法。

安装库

添加到您的 Cargo.toml

[dependencies]
apache-avro = "x.y"

如果您想利用 Snappy 编码器

[dependencies.apache-avro]
version = "x.y"
features = ["snappy"]

如果您想利用 Zstandard 编码器

[dependencies.apache-avro]
version = "x.y"
features = ["zstandard"]

如果您想利用 Bzip2 编码器

[dependencies.apache-avro]
version = "x.y"
features = ["bzip"]

如果您想利用 Xz 编码器

[dependencies.apache-avro]
version = "x.y"
features = ["xz"]

升级到较新的小版本

该库仍在beta版本中,因此小版本之间可能存在向后不兼容的更改。如果您在升级时遇到问题,请查看版本升级指南

定义一个模式

Avro 数据不能没有 Avro 架构。在编写和读取时必须使用架构,它们携带有关我们正在处理的数据类型的详细信息。Avro 架构用于架构验证和解析 Avro 数据。

Avro 架构以 JSON 格式定义,可以直接从原始字符串中解析出来。

use apache_avro::Schema;

let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"}
        ]
    }
"#;

// if the schema is not valid, this function will return an error
let schema = Schema::parse_str(raw_schema).unwrap();

// schemas can be printed for debugging
println!("{:?}", schema);

此外,可以给出一系列定义(它们可能相互依赖),所有这些定义都将解析为相应的架构。

use apache_avro::Schema;

let raw_schema_1 = r#"{
        "name": "A",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "float"}
        ]
    }"#;

// This definition depends on the definition of A above
let raw_schema_2 = r#"{
        "name": "B",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "A"}
        ]
    }"#;

// if the schemas are not valid, this function will return an error
let schemas = Schema::parse_list(&[raw_schema_1, raw_schema_2]).unwrap();

// schemas can be printed for debugging
println!("{:?}", schemas);

注意。 架构定义的组成需要具有名称的架构。因此,应仅将 Record、Enum 和 Fixed 类型的架构输入到此函数。

该库还提供了一个程序接口来定义架构,无需将其编码为 JSON(用于高级用途),但我们强烈建议使用 JSON 接口。如果您感兴趣,请阅读 API 参考。

有关架构和您可以在其中封装的信息类型的更多信息,请参阅Avro 规范的相关部分。

写入数据

一旦我们定义了一个架构,我们就可以开始将数据序列化为 Avro,同时验证它们与提供的架构。如前所述,在 Rust 中处理 Avro 数据有两种方式。

注意: 该库还提供了一个低级接口,用于在不生成标记和头信息的情况下以 Avro 字节码编码单个数据(用于高级用途),但我们强烈建议使用 Writer 接口以实现完全的 Avro 兼容性。如果您感兴趣,请阅读 API 参考。

Avro 方法

鉴于我们上面定义的架构是 Avro Record 的架构,我们将使用库提供的关联类型来指定我们想要序列化的数据

use apache_avro::types::Record;
use apache_avro::Writer;
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");

// schema validation happens here
writer.append(record).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data has been written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();

大多数情况下,架构倾向于定义记录为一个顶级容器,封装所有要转换为字段的值并提供它们的文档,但如果我们想直接定义 Avro 值,库通过 Value 接口提供了这种能力。

use apache_avro::types::Value;

let mut value = Value::String("foo".to_string());

serde 方法

鉴于我们上面定义的架构是 Avro Record,我们可以直接使用派生自 Serialize 的 Rust 结构体来模拟我们的数据

use apache_avro::Writer;

#[derive(Debug, Serialize)]
struct Test {
    a: i64,
    b: String,
}

// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the structure models our Record schema
let test = Test {
    a: 27,
    b: "foo".to_owned(),
};

// schema validation happens here
writer.append_ser(test).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data is written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner();

大多数情况下,架构倾向于定义记录为一个顶级容器,封装所有要转换为字段的值并提供它们的文档,但如果我们想直接定义 Avro 值,任何实现 Serialize 的类型都应该工作。

let mut value = "foo".to_string();

使用编解码器压缩数据

Avro 在编码数据时支持三种不同的压缩编解码器

  • Null:不压缩数据;
  • Deflate:使用 RFC 1951 中指定的 deflate 算法写入数据块,通常使用 zlib 库实现。请注意,此格式(与 RFC 1950 中的“zlib 格式”不同)没有校验和。
  • Snappy:使用 Google 的 Snappy 压缩库。每个压缩块后面跟随 4 字节的大端 CRC32 校验和,该校验和对应于块中未压缩数据的校验和。您必须启用 snappy 功能才能使用此编解码器。
  • Zstandard:使用 Facebook 的 Zstandard 压缩库。您必须启用 zstandard 功能才能使用此编解码器。
  • Bzip2:使用 BZip2 压缩库。您必须启用 bzip 功能才能使用此编解码器。
  • Xz:使用xz2压缩库。您必须启用xz功能才能使用此编解码器。

要指定用于压缩数据的编解码器,只需在创建Writer时指定即可。

use apache_avro::Writer;
use apache_avro::Codec;
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

读取数据

就读取Avro编码数据而言,我们可以直接使用与数据一起编码的架构来读取它们。库将自动为我们完成,就像它对压缩编解码器所做的那样。

use apache_avro::Reader;
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::new(&input[..]).unwrap();

如果我们想指定一个与数据编写时使用的架构不同(但兼容)的读取架构,我们可以按照以下步骤操作

use apache_avro::Schema;
use apache_avro::Reader;

let reader_raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "default": 43}
        ]
    }
"#;

let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();

// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();

在读取数据时,库也将自动执行架构解析。

有关架构兼容性和解析的更多信息,请参阅Avro规范

通常,在Rust中处理Avro数据有两种方法,如下所示。

注意:该库还提供了一个低级接口,用于解码Avro字节码中的单个数据项,而不带标记和头(用于高级使用),但我们强烈建议使用Reader接口来利用所有Avro功能。如果您感兴趣,请阅读API参考。

Avro 方法

我们可以直接从Reader迭代器中读取Value实例。

use apache_avro::Reader;
let reader = Reader::new(&input[..]).unwrap();

// value is a Result  of an Avro Value in case the read operation fails
for value in reader {
    println!("{:?}", value.unwrap());
}

serde 方法

或者,我们可以使用实现Deserialize并代表我们的架构的Rust类型来读取数据。

use apache_avro::Reader;
use apache_avro::from_value;

#[derive(Debug, Deserialize)]
struct Test {
    a: i64,
    b: String,
}

let reader = Reader::new(&input[..]).unwrap();

// value is a Result in case the read operation fails
for value in reader {
    println!("{:?}", from_value::<Test>(&value.unwrap()));
}

整合一切

以下是如何结合到目前为止所展示的一切的示例,它旨在作为库接口的快速参考。

use apache_avro::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: i64,
    b: String,
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    writer.append(record)?;

    let test = Test {
        a: 27,
        b: "foo".to_owned(),
    };

    writer.append_ser(test)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

apache-avro支持Avro规范中列出的逻辑类型。

  1. Decimal使用num_bigint包。
  2. UUID使用uuid包。
  3. 日期和时间(毫秒)作为i32,时间(微秒)作为i64
  4. 时间戳(毫秒和微秒)作为i64
  5. 本地时间戳(毫秒和微秒)作为i64
  6. 持续时间作为一个具有monthsdaysmillis访问器方法的自定义类型,每个访问器方法都返回一个i32

请注意,磁盘上的表示与底层原始/复杂数据类型相同。

读取和写入逻辑类型

use apache_avro::{
    types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
    Writer, Error,
};
use num_bigint::ToBigInt;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "decimal_fixed",
          "type": {
            "type": "fixed",
            "size": 2,
            "name": "decimal"
          },
          "logicalType": "decimal",
          "precision": 4,
          "scale": 2
        },
        {
          "name": "decimal_var",
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 10,
          "scale": 3
        },
        {
          "name": "uuid",
          "type": "string",
          "logicalType": "uuid"
        },
        {
          "name": "date",
          "type": "int",
          "logicalType": "date"
        },
        {
          "name": "time_millis",
          "type": "int",
          "logicalType": "time-millis"
        },
        {
          "name": "time_micros",
          "type": "long",
          "logicalType": "time-micros"
        },
        {
          "name": "timestamp_millis",
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        {
          "name": "timestamp_micros",
          "type": "long",
          "logicalType": "timestamp-micros"
        },
        {
          "name": "local_timestamp_millis",
          "type": "long",
          "logicalType": "local-timestamp-millis"
        },
        {
          "name": "local_timestamp_micros",
          "type": "long",
          "logicalType": "local-timestamp-micros"
        },
        {
          "name": "duration",
          "type": {
            "type": "fixed",
            "size": 12,
            "name": "duration"
          },
          "logicalType": "duration"
        }
      ]
    }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
    record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
    record.put("uuid", uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap());
    record.put("date", Value::Date(1));
    record.put("time_millis", Value::TimeMillis(2));
    record.put("time_micros", Value::TimeMicros(3));
    record.put("timestamp_millis", Value::TimestampMillis(4));
    record.put("timestamp_micros", Value::TimestampMicros(5));
    record.put("timestamp_nanos", Value::TimestampNanos(6));
    record.put("local_timestamp_millis", Value::LocalTimestampMillis(4));
    record.put("local_timestamp_micros", Value::LocalTimestampMicros(5));
    record.put("local_timestamp_nanos", Value::LocalTimestampMicros(6));
    record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

    writer.append(record)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", record?);
    }
    Ok(())
}

计算Avro架构指纹

此库支持计算以下指纹

  • SHA-256
  • MD5
  • Rabin

以下是对支持的指纹进行指纹识别的示例

use apache_avro::rabin::Rabin;
use apache_avro::{Schema, Error};
use md5::Md5;
use sha2::Sha256;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    println!("{}", schema.fingerprint::<Sha256>());
    println!("{}", schema.fingerprint::<Md5>());
    println!("{}", schema.fingerprint::<Rabin>());
    Ok(())
}

格式错误的数据

为了简化解码,Avro数据的二进制编码规范要求某些字段在其数据旁边编码长度。

如果传递给Reader的编码数据格式错误,可能存在包含数据长度的字节是无效的情况,这可能导致过度的内存分配。

为了保护用户免受格式错误的数据的影响,apache-avro在解码数据时对其将执行的任何分配设置了限制(默认:512MB)。

如果您预计某些数据字段将超过此限制,请在读取任何数据之前务必使用 max_allocation_bytes 函数(我们利用 Rust 的 std::sync::Once 机制来初始化此值,如果在使用 max_allocation_bytes 之前调用了解码,则整个程序生命周期中的限制将为 512MB)。

use apache_avro::max_allocation_bytes;

max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB

// ... happily decode large data

检查模式兼容性

此库支持检查模式兼容性。

检查兼容性的示例

  1. 兼容的模式

说明:int 数组模式可以被 long 数组模式读取——一个 int(32位有符号整数)可以放入一个 long(64位有符号整数)中

use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_ok());
  1. 不兼容的模式(long 数组模式不能被 int 数组模式读取)

说明:long 数组模式不能被 int 数组模式读取——一个 long(64位有符号整数)不能放入一个 int(32位有符号整数)中

use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err());

自定义名称验证器

默认情况下,库遵循Avro 规范的规则!

其他一些 Apache Avro 语言 SDK 不那么严格,允许在名称中使用更多字符。为了与这些 SDK 兼容,库提供了一种自定义名称验证的方法。

use apache_avro::AvroResult;
use apache_avro::schema::Namespace;
use apache_avro::validator::{SchemaNameValidator, set_schema_name_validator};

struct MyCustomValidator;

impl SchemaNameValidator for MyCustomValidator {
    fn validate(&self, name: &str) -> AvroResult<(String, Namespace)> {
        todo!()
    }
}

// don't parse any schema before registering the custom validator(s) !

set_schema_name_validator(Box::new(MyCustomValidator));

// ... use the library

可以应用类似的逻辑来验证模式命名空间、枚举符号和字段名称。

注意:库允许在每个应用程序生命周期中仅设置验证器一次!如果应用程序在设置验证器之前解析了模式,则将注册并使用默认验证器!

自定义模式相等比较器

库提供了两个模式相等比较器的实现

  1. SpecificationEq - 一个将模式序列化为它们的规范形式(即 JSON)并将它们作为字符串比较的比较器。这是直到 apache_avro 0.16.0 的唯一实现。有关更多信息,请参阅Avro 规范
  2. StructFieldEq - 一个结构上比较模式的比较器。它比 SpecificationEq 快,因为它一发现差异就返回 false,并建议使用!自 apache_avro 0.17.0 起是默认比较器。

要使用自定义比较器,您需要实现 SchemataEq 特性并使用 set_schemata_equality_comparator 函数设置它

use apache_avro::{AvroResult, Schema};
use apache_avro::schema::Namespace;
use apache_avro::schema_equality::{SchemataEq, set_schemata_equality_comparator};

#[derive(Debug)]
struct MyCustomSchemataEq;

impl SchemataEq for MyCustomSchemataEq {
    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
        todo!()
    }
}

// don't parse any schema before registering the custom comparator !

set_schemata_equality_comparator(Box::new(MyCustomSchemataEq));

// ... use the library

注意:库允许在每个应用程序生命周期中仅设置比较器一次!如果应用程序在设置比较器之前解析了模式,则将注册并使用默认比较器!

最小支持的 Rust 版本

1.73.0

许可证

此项目受Apache 许可证 2.0许可。

贡献

鼓励每个人贡献!您可以通过在 GitHub 仓库上分叉并提交拉取请求或打开一个问题来贡献。所有贡献都将受Apache 许可证 2.0许可。

请考虑添加文档和测试!如果您引入了不兼容的更改,请考虑在迁移指南中添加迁移说明。如果您修改了 lib.rs 中的 crate 文档,请运行 make readme 以同步 README 文件。

依赖关系

约 5–7MB
~128K SLoC