23个版本 (12个破坏性更新)

0.13.0 2021年1月29日
0.12.0 2020年11月27日
0.11.0 2020年8月13日
0.10.0 2020年5月31日
0.4.1 2018年6月17日

#146 in 数据结构

Download history 9467/week @ 2024-03-14 4143/week @ 2024-03-21 21494/week @ 2024-03-28 4903/week @ 2024-04-04 5938/week @ 2024-04-11 6323/week @ 2024-04-18 5946/week @ 2024-04-25 4791/week @ 2024-05-02 4049/week @ 2024-05-09 12490/week @ 2024-05-16 11750/week @ 2024-05-23 5467/week @ 2024-05-30 12499/week @ 2024-06-06 8643/week @ 2024-06-13 8738/week @ 2024-06-20 10135/week @ 2024-06-27

每月41,390次下载
用于 27 个crate(18个直接使用)

MIT 许可证

315KB
6.5K SLoC

avro-rs

Latest Version Continuous Integration Latest Documentation MIT licensed

在Rust中处理Apache Avro的库。

请查阅我们的文档以获取示例、教程和API参考。

Apache Avro是一个数据序列化系统,它提供了丰富的数据结构和一个紧凑、快速的二进制数据格式。

Avro中的所有数据都是模式化的,如下例所示

{
    "type": "record",
    "name": "test",
    "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"}
    ]
}

在Rust中处理Avro数据有两种基本方法

  • 基于Avro模式的数据类型
  • 实现/推导SerializeDeserialize的通用Rust serde兼容类型

    ;

avro-rs提供了一种轻松高效地读取和写入这两种数据表示的方法。

安装库

将其添加到您的Cargo.toml

[dependencies]
avro-rs = "x.y"

或者如果您想利用Snappy编解码器

[dependencies.avro-rs]
version = "x.y"
features = ["snappy"]

升级到较新的次版本

该库仍在测试阶段,因此次版本之间可能存在不兼容的更改。如果您在升级时遇到问题,请查看版本升级指南

定义模式

没有Avro模式,Avro数据无法存在。在写入时必须使用模式,在读取时也可以使用模式,它们携带有关我们处理的数据类型的信

息。Avro模式用于模式验证和解析Avro数据。

use avro_rs::Schema;

let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"}
        ]
    }
"#;

// if the schema is not valid, this function will return an error
let schema = Schema::parse_str(raw_schema).unwrap();

// schemas can be printed for debugging
println!("{:?}", schema);

Avro模式以JSON格式定义,可以直接从原始字符串解析出来。

use avro_rs::Schema;

let raw_schema_1 = r#"{
        "name": "A",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "float"}
        ]
    }"#;

// This definition depends on the definition of A above
let raw_schema_2 = r#"{
        "name": "B",
        "type": "record",
        "fields": [
            {"name": "field_one", "type": "A"}
        ]
    }"#;

// if the schemas are not valid, this function will return an error
let schemas = Schema::parse_list(&[raw_schema_1, raw_schema_2]).unwrap();

// schemas can be printed for debugging
println!("{:?}", schemas);

注意。 需要注意的是,模式定义的组成需要带有名称的方案。因此,只有类型为Record、Enum和Fixed的方案应输入此函数。

此外,该库还提供了一种程序接口来定义不将其编码在JSON中的方案(用于高级用途),但我们强烈推荐使用JSON接口。如果您对此感兴趣,请阅读API参考。

有关方案以及您可以在其中封装的信息类型的更多信息,请参阅Avro规范的相关部分。

写入数据

一旦我们定义了方案,我们就可以在Avro中对数据进行序列化,并在过程中对其提供的方案进行验证。如前所述,在Rust中处理Avro数据有两种方式。

注意: 该库还提供了一个低级接口,用于对单个数据项进行Avro字节码编码,而不生成标记和标题(用于高级用途),但我们强烈推荐使用Writer接口以实现完全的Avro兼容性。如果您对此感兴趣,请阅读API参考。

Avro方法

鉴于我们上面定义的方案是Avro Record的方案,我们将使用库提供的关联类型来指定我们想要序列化的数据。

use avro_rs::types::Record;
use avro_rs::Writer;
#
// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");

// schema validation happens here
writer.append(record).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data has been written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner().unwrap();

在大多数情况下,方案倾向于将记录定义为一个顶层容器,封装所有要转换的字段并为其提供文档,但如果我们想要直接定义一个Avro值,库通过Value接口提供了这种能力。

use avro_rs::types::Value;

let mut value = Value::String("foo".to_string());

serde方法

鉴于我们上面定义的方案是Avro Record,我们可以直接使用Rust结构体,该结构体继承自Serialize来模拟我们的数据。

use avro_rs::Writer;

#[derive(Debug, Serialize)]
struct Test {
    a: i64,
    b: String,
}

// a writer needs a schema and something to write to
let mut writer = Writer::new(&schema, Vec::new());

// the structure models our Record schema
let test = Test {
    a: 27,
    b: "foo".to_owned(),
};

// schema validation happens here
writer.append_ser(test).unwrap();

// this is how to get back the resulting avro bytecode
// this performs a flush operation to make sure data is written, so it can fail
// you can also call `writer.flush()` yourself without consuming the writer
let encoded = writer.into_inner();

在大多数情况下,方案倾向于将记录定义为一个顶层容器,封装所有要转换的字段并为其提供文档,但如果我们想要直接定义一个Avro值,任何实现Serialize的类型都应适用。

let mut value = "foo".to_string();

使用编解码器压缩数据

在编码数据时,Avro支持三种不同的压缩编解码器。

  • Null:不压缩数据;
  • Deflate:使用RFC 1951中指定的deflate算法编写数据块,通常使用zlib库实现。请注意,此格式(与RFC 1950中的“zlib格式”不同)没有校验和。
  • Snappy:使用Google的Snappy压缩库。每个压缩块后面跟着该块中未压缩数据的4字节、大端CRC32校验和。您必须启用snappy功能才能使用此编解码器。

要指定用于压缩数据的编解码器,只需在创建Writer时指定即可。

use avro_rs::Writer;
use avro_rs::Codec;
#
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

读取数据

至于读取Avro编码数据,我们只需使用与数据编码一起编码的方案即可读取它们。库会自动为我们完成,就像它已经为我们完成压缩编解码器一样。

use avro_rs::Reader;
#
// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::new(&input[..]).unwrap();

如果,相反,我们想要指定与数据写入时使用的方案不同(但兼容)的读取方案,我们可以按照以下方式操作

use avro_rs::Schema;
use avro_rs::Reader;
#

let reader_raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {"name": "c", "type": "long", "default": 43}
        ]
    }
"#;

let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();

// reader creation can fail in case the input to read from is not Avro-compatible or malformed
let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();

在读取数据时,库也会自动执行方案解析。

有关方案兼容性和解析的更多信息,请参阅Avro规范

通常,在Rust中处理Avro数据有两种方式,如下所示。

注意: 该库还提供了一个低级接口,用于解码Avro字节码中的单个数据项,而不进行标记和标题(用于高级用途),但我们强烈推荐使用Reader接口以利用所有Avro功能。如果您对此感兴趣,请阅读API参考。

Avro方法

我们可以直接从 Reader 迭代器中读取 Value 的实例

use avro_rs::Reader;
#
let reader = Reader::new(&input[..]).unwrap();

// value is a Result  of an Avro Value in case the read operation fails
for value in reader {
    println!("{:?}", value.unwrap());
}

serde方法

或者,我们可以使用实现 Deserialize 的 Rust 类型来表示我们的模式,并将数据读入

use avro_rs::Reader;
use avro_rs::from_value;

#[derive(Debug, Deserialize)]
struct Test {
    a: i64,
    b: String,
}

let reader = Reader::new(&input[..]).unwrap();

// value is a Result in case the read operation fails
for value in reader {
    println!("{:?}", from_value::<Test>(&value.unwrap()));
}

将一切组合在一起

以下是如何结合到目前为止所展示的一切的示例,它旨在成为库接口的快速参考

use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: i64,
    b: String,
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    writer.append(record)?;

    let test = Test {
        a: 27,
        b: "foo".to_owned(),
    };

    writer.append_ser(test)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

avro-rs 也支持 Avro 规范中列出的逻辑类型(见Avro 规范

  1. 使用 num_bigint 包来使用 Decimal
  2. 使用 uuid 包来使用 UUID
  3. 日期和时间(毫秒)作为 i32,时间(微秒)作为 i64
  4. 时间戳(毫秒和微秒)作为 i64
  5. 持续时间作为具有 monthsdaysmillis 访问器的自定义类型,每个访问器都返回一个 i32

注意,磁盘表示形式与底层原始/复杂类型相同。

读取和写入逻辑类型

use avro_rs::{
    types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
    Writer, Error,
};
use num_bigint::ToBigInt;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "decimal_fixed",
          "type": {
            "type": "fixed",
            "size": 2,
            "name": "decimal"
          },
          "logicalType": "decimal",
          "precision": 4,
          "scale": 2
        },
        {
          "name": "decimal_var",
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 10,
          "scale": 3
        },
        {
          "name": "uuid",
          "type": "string",
          "logicalType": "uuid"
        },
        {
          "name": "date",
          "type": "int",
          "logicalType": "date"
        },
        {
          "name": "time_millis",
          "type": "int",
          "logicalType": "time-millis"
        },
        {
          "name": "time_micros",
          "type": "long",
          "logicalType": "time-micros"
        },
        {
          "name": "timestamp_millis",
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        {
          "name": "timestamp_micros",
          "type": "long",
          "logicalType": "timestamp-micros"
        },
        {
          "name": "duration",
          "type": {
            "type": "fixed",
            "size": 12,
            "name": "duration"
          },
          "logicalType": "duration"
        }
      ]
    }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
    record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
    record.put("uuid", uuid::Uuid::new_v4());
    record.put("date", Value::Date(1));
    record.put("time_millis", Value::TimeMillis(2));
    record.put("time_micros", Value::TimeMicros(3));
    record.put("timestamp_millis", Value::TimestampMillis(4));
    record.put("timestamp_micros", Value::TimestampMicros(5));
    record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

    writer.append(record)?;

    let input = writer.into_inner()?;
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", record?);
    }
    Ok(())
}

计算 Avro 架构指纹

此库支持计算以下指纹

  • SHA-256
  • MD5
  • Rabin

支持指纹的指纹示例

use avro_rs::rabin::Rabin;
use avro_rs::{Schema, Error};
use md5::Md5;
use sha2::Sha256;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    println!("{}", schema.fingerprint::<Sha256>());
    println!("{}", schema.fingerprint::<Md5>());
    println!("{}", schema.fingerprint::<Rabin>());
    Ok(())
}

不规范的 数据

为了简化解码,Avro 数据的二进制编码规范要求某些字段在数据旁边有长度编码。

如果传递给 Reader 的编码数据不规范,可能会出现包含数据长度的字节是虚假的,这可能导致过度的内存分配。

为了保护用户免受不规范数据的影响,avro-rs 在解码数据时对其任何分配都设置了一个限制(默认:512MB)。

如果您预计某些数据字段的大小将超过此限制,请在读取 任何 数据之前确保使用 max_allocation_bytes 函数(我们利用 Rust 的 std::sync::Once 机制来初始化此值,如果在进行解码调用之前没有调用 max_allocation_bytes,则限制在整个程序生命周期内为 512MB)。

use avro_rs::max_allocation_bytes;

max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB

// ... happily decode large data

检查架构兼容性

此库支持检查架构兼容性。

注意:它尚不支持命名架构(更多信息请见 https://github.com/flavray/avro-rs/pull/76)。

兼容性检查的示例

  1. 兼容架构

说明:int 数组架构可以被 long 数组架构读取 - int(32位有符号整数)适合于 long(64位有符号整数)

use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, &readers_schema));
  1. 不兼容架构(long 数组架构不能被 int 数组架构读取)

说明:long 数组架构不能被 int 数组架构读取 - long(64位有符号整数)不适合于 int(32位有符号整数)

use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};

let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema));

许可

本项目受 MIT 许可证 许可。请注意,这不是由 Apache Avro 维护的官方项目。

贡献

鼓励每个人贡献力量!您可以通过在 GitHub 仓库上分叉并提交拉取请求或打开一个问题来贡献力量。所有贡献都将受 MIT 许可证 许可。

请考虑在变更日志中的未发布部分添加文档、测试和您更改的一行。如果您引入了向后不兼容的更改,请考虑在迁移指南中添加迁移说明。如果您修改了lib.rs中的crate文档,请运行make readme以同步README文件。

依赖项

~5.5MB
~97K SLoC