#avro #big-data #data-streaming #json-schema #json-format #data-file

avrow

Avrow 是一个基于 serde 的快速、类型安全的数据序列化库

3 个不稳定版本

0.2.1 2020年11月26日
0.2.0 2020年10月10日
0.1.0 2020年10月8日

#1595编码

每月 26 下载

MIT/Apache

595KB
4K SLoC

avrow

Actions Status crates docs.rs license license Contributor Covenant



Avrow 是 Avro 规范的纯 Rust 实现,并支持 Serde



目录

概述

Avrow 是 Avro 规范的纯 Rust 实现:一种基于行的数据序列化系统。Avro 数据序列化格式在大数据流系统(如 KafkaSpark)中得到了广泛应用。在 avro 的上下文中,一个 avro 编码的文件或字节流被称为“数据文件”。要写入 avro 编码格式的数据,需要一个 JSON 格式的模式。以下是一个用 JSON 表示的 avro 模式示例

{
  "type": "record",
  "name": "LongList",
  "aliases": ["LinkedLongs"],
  "fields" : [
    {"name": "value", "type": "long"},
    {"name": "next", "type": ["null", "LongList"]}
  ]
}

上述模式是记录类型,包含字段,表示一个 64 位整数的链表。在大多数实现中,此模式随后被馈送到一个 Writer 实例,并附带一个用于写入编码数据的缓冲区。然后可以调用写入器上的一个 write 方法来写入数据。Avro 的一个独特之处在于,编码数据的模式被写入数据文件的头部。这意味着在读取数据时,不需要向 Reader 实例提供模式。规范还允许在读取时提供读取器模式以过滤数据。

Avro 规范提供了两种类型的编码

  • 二进制编码 - 高效且占用磁盘空间较少。
  • JSON 编码 - 当您想要 avro 编码数据的可读版本时。也用于调试目的。

这个软件包仅实现了二进制编码,因为这是出于性能和存储原因实际使用的格式。

特性

  • 对递归自引用模式的 Serde 序列化和反序列化提供了全面支持。
  • 根据规范,支持所有压缩编解码器(deflatebzip2snappyxzzstd)。
  • 简单直观的API - 由于使用的底层结构是 ReadWrite 类型,avrow 尝试模仿 Rust 标准库 API,以实现最小化的学习成本。写入 avro 值只需调用 writeserialize(使用 serde),读取 avro 值只需使用迭代器。
  • 减少冗余/轻量级 - Rust 的编译时间成本高昂。avrow 尝试使用最少的第三方包。压缩编解码器和模式指纹支持默认情况下是功能隔离的。要使用它们,请使用相应的功能标志进行编译(例如 --features zstd)。
  • 模式演进 - 可以使用读取模式配置 avrow Reader,并只读取与其用例相关的数据。
  • avrow 中的模式支持查询其规范形式,并具有指纹支持(rabin64sha256md5)。

注意:这并不是完整的规范实现,正在实施的功能列在 待办事项 部分。

入门

将 avrow 添加为 Cargo.toml 的依赖项

[dependencies]
avrow = "0.2.0"

示例

写入 avro 数据


use anyhow::Error;
use avrow::{Schema, Writer};
use std::str::FromStr;

fn main() -> Result<(), Error> {
    // Create schema from json
    let schema = Schema::from_str(r##"{"type":"string"}"##)?;
    // or from a path
    let schema2 = Schema::from_path("./string_schema.avsc")?;
    // Create an output stream
    let stream = Vec::new();
    // Create a writer
    let writer = Writer::new(&schema, stream.as_slice())?;
    // Write your data!
    let res = writer.write("Hey")?;
    // or using serialize method for serde derived types.
    let res = writer.serialize("there!")?;

    Ok(())
}

对于简单的本地 Rust 类型,avrow 提供了一个 From 实现,以将它们转换为 Avro 值类型。对于复合或用户定义类型(结构体或枚举),可以使用依赖 serde 的 serialize 方法。或者,可以构建 avrow::Value 实例,这是一种更冗长的写 avro 值的方法,应作为最后的手段。

读取 avro 数据

fn main() -> Result<(), Error> {
    let schema = Schema::from_str(r##""null""##);
    let data = vec![
        79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101,
        109, 97, 32, 123, 34, 116, 121, 112, 101, 34, 58, 34, 98, 121, 116,
        101, 115, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100, 101,
        99, 14, 100, 101, 102, 108, 97, 116, 101, 0, 145, 85, 112, 15, 87,
        201, 208, 26, 183, 148, 48, 236, 212, 250, 38, 208, 2, 18, 227, 97,
        96, 100, 98, 102, 97, 5, 0, 145, 85, 112, 15, 87, 201, 208, 26,
        183, 148, 48, 236, 212, 250, 38, 208,
    ];
    // Create a Reader
    let reader = Reader::with_schema(v.as_slice(), &schema)?;
    for i in reader {
        dbg!(&i);
    }

    Ok(())
}

自引用递归模式示例

use anyhow::Error;
use avrow::{from_value, Codec, Reader, Schema, Writer};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct LongList {
    value: i64,
    next: Option<Box<LongList>>,
}

fn main() -> Result<(), Error> {
    let schema = r##"
        {
            "type": "record",
            "name": "LongList",
            "aliases": ["LinkedLongs"],
            "fields" : [
              {"name": "value", "type": "long"},
              {"name": "next", "type": ["null", "LongList"]}
            ]
          }
        "##;

    let schema = Schema::from_str(schema)?;
    let mut writer = Writer::with_codec(&schema, vec![], Codec::Null)?;

    let value = LongList {
        value: 1i64,
        next: Some(Box::new(LongList {
            value: 2i64,
            next: Some(Box::new(LongList {
                value: 3i64,
                next: Some(Box::new(LongList {
                    value: 4i64,
                    next: Some(Box::new(LongList {
                        value: 5i64,
                        next: None,
                    })),
                })),
            })),
        })),
    };

    writer.serialize(value)?;

    // Calling into_inner performs flush internally. Alternatively, one can call flush explicitly.
    let buf = writer.into_inner()?;

    // read
    let reader = Reader::with_schema(buf.as_slice(), &schema)?;
    for i in reader {
        let a: LongList = from_value(&i)?;
        dbg!(a);
    }

    Ok(())
}

使用确认模式写入 json 对象的示例。json 对象映射到 avrow::Record 类型。

use anyhow::Error;
use avrow::{from_value, Reader, Record, Schema, Writer};
use serde::{Deserialize, Serialize};
use std::str::FromStr;

#[derive(Debug, Serialize, Deserialize)]
struct Mentees {
    id: i32,
    username: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct RustMentors {
    name: String,
    github_handle: String,
    active: bool,
    mentees: Mentees,
}

fn main() -> Result<(), Error> {
    let schema = Schema::from_str(
        r##"
            {
            "name": "rust_mentors",
            "type": "record",
            "fields": [
                {
                "name": "name",
                "type": "string"
                },
                {
                "name": "github_handle",
                "type": "string"
                },
                {
                "name": "active",
                "type": "boolean"
                },
                {
                    "name":"mentees",
                    "type": {
                        "name":"mentees",
                        "type": "record",
                        "fields": [
                            {"name":"id", "type": "int"},
                            {"name":"username", "type": "string"}
                        ]
                    }
                }
            ]
            }
"##,
    )?;

    let json_data = serde_json::from_str(
        r##"
    { "name": "bob",
        "github_handle":"ghbob",
        "active": true,
        "mentees":{"id":1, "username":"alice"} }"##,
    )?;
    let rec = Record::from_json(json_data, &schema)?;
    let mut writer = crate::Writer::new(&schema, vec![])?;
    writer.write(rec)?;

    let avro_data = writer.into_inner()?;
    let reader = crate::Reader::new(avro_data.as_slice())?;
    for value in reader {
        let mentors: RustMentors = from_value(&value)?;
        dbg!(mentors);
    }
    Ok(())
}

写入器定制

如果您想要更控制 Writer 的参数,请考虑使用以下示例中的 WriterBuilder


use anyhow::Error;
use avrow::{Codec, Reader, Schema, WriterBuilder};

fn main() -> Result<(), Error> {
    let schema = Schema::from_str(r##""null""##)?;
    let v = vec![];
    let mut writer = WriterBuilder::new()
        .set_codec(Codec::Null)
        .set_schema(&schema)
        .set_datafile(v)
        // set any custom metadata in the header
        .set_metadata("hello", "world")
        // set after how many bytes, the writer should flush
        .set_flush_interval(128_000)
        .build()
        .unwrap();
    writer.serialize(())?;
    let v = writer.into_inner()?;

    let reader = Reader::with_schema(v.as_slice(), schema)?;
    for i in reader {
        dbg!(i?);
    }

    Ok(())
}

有关更多代码示例,请参阅 示例

支持的编解码器

为了促进高效的编码,avro 规范还定义了在序列化数据时使用的压缩编解码器。

Avrow 支持 spec 中指定的所有压缩编解码器

这些功能默认是功能隔离的。有关更多详细信息,请检查 Cargo.toml 中的 features 部分。

使用 avrow-cli 工具

您经常需要一种快速检查 avro 文件以进行调试的方法。为此,该存储库还附带了一个名为 avrow-cli 的工具(av),可以通过它从命令行检查 avro 数据文件。

有关详细信息,请参阅 avrow-cli 存储库。

安装 avrow-cli

cd avrow-cli
cargo install avrow-cli

使用 avrow-cli(二进制名称为 av

av read -d data.avro

read 子命令将打印出 data.avro 中的所有行,并以调试格式输出到标准输出。

Rust 本地类型到 Avro 值的映射(通过 Serde)

原语

Rust 本地类型(原语类型) Avro(《Value》)
(), Option::None null
bool boolean
i8, u8, i16, u16, i32, u32 int
i64, u64 long
f32 float
f64 double
&[u8], Vec<u8> bytes
&str, String string

复杂

Rust 本地类型(复杂类型) Avro
struct Foo {..} 记录
枚举 Foo {A,B}(变体中不能包含数据) 枚举
Vec<T> 其中T: Into<> 数组
HashMap<String, T> 其中T: Into<> 映射
T其中T: Into<> 并集
Vec<u8>:长度等于方案中定义的大小 固定

待办事项

  • 逻辑类型支持。
  • 排序读取。
  • 单个对象编码。
  • 将Schema Registry作为特质 - 这将允许avrow从远程模式注册表中读取和写入。
  • AsyncRead + AsyncWrite 读取器和写入器。
  • Avro协议消息和RPC支持。
  • 基准测试和优化。

变更日志

请参阅变更日志以获取发布历史。

贡献

欢迎所有类型的贡献。

请参阅CONTRIBUTING.md以获取贡献指南。

支持

Buy Me A Coffee

ko-fi

MSRV

Avrow在稳定的Rust版本上运行,从1.37+开始。它不使用任何nightly特性。

许可

双许可,任选其一:Apache License, Version 2.0MIT许可

除非您明确声明,否则您有意提交的任何贡献,根据Apache-2.0许可定义,均应双许可,无需任何附加条款或条件。

依赖项

~2–4MB
~77K SLoC