#dataframe #apache-arrow #arrow #convert #machine-learning #database

myval

轻量级的 Apache Arrow 数据帧

17 个版本

0.1.22 2023 年 5 月 6 日
0.1.21 2023 年 5 月 6 日
0.1.20 2023 年 4 月 23 日

#48 in 机器学习

Download history 122/week @ 2024-03-31

179 每月下载量

Apache-2.0

135KB
1.5K SLoC

Myval - Rust 的轻量级 Apache Arrow 数据帧 crates.io 页面 docs.rs 页面

什么是 Myval?

Mýval(发音为 [m'ival])从捷克语翻译而来,意为浣熊。

为什么不取熊的名字呢?

捷克语中浣熊的普通名字是 "medvídek mýval",可以翻译为 "小熊"。

但是 Polars 呢?

Myval 不是 Polars 的竞争对手。Myval 是一个轻量级的 Arrow 数据帧,专注于原地数据转换和 IPC。

因为 Arrow 有标准化的数据布局,数据帧可以无拷贝地转换为 Polars,反之亦然。

let polars_df = polars::frame::DataFrame::from(myval_df);
let myval_df = myval::DataFrame::from(polars_df);

同样,Myval 也是基于 arrow2 开发的。

一些技巧

IPC

假设接收到了来自 RPC 或 Pub/Sub 等的 Arrow 流块(Schema+Chunk)。将块转换为 Myval 数据帧。

let df = myval::DataFrame::from_ipc_block(&buf).unwrap();

需要发送数据帧回去?用一行代码将其转换为 Arrow 流块。

let buf = df.into_ipc_block().unwrap();

需要发送切片?没问题,有方法可以轻松返回切片系列、切片数据帧或 IPC 块。

覆盖数据类型

假设有一个 i64 列 "time",其中包含纳秒时间戳。让我们覆盖其数据类型。

use myval::{DataType, TimeUnit};

df.set_data_type("time",
    DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();

从字符串解析数字

假设有一个 utf8 列 "value",需要解析为浮点数。

df.parse::<f64>("value").unwrap();

基本的原地数学运算

df.add("col", 1_000i64).unwrap();
df.sub("col", 1_000i64).unwrap();
df.mul("col", 1_000i64).unwrap();
df.div("col", 1_000i64).unwrap();

自定义原地转换

df.apply("time", |time| time.map(|t: i64| t / 1_000)).unwrap();

水平连接

df.join(df2).unwrap();

连接

let merged = myval::concat(&[&df1, &df2, &df3]).unwrap();

设置列排序

假设有一个 Myval 数据帧,具有 "voltage"、"temp1"、"temp2"、"temp3" 列,这些列以随机顺序从服务器接收数据。让我们将其排序纠正为正常顺序。

df.set_ordering(&["voltage", "temp1", "temp2", "temp3"]);

从/到 JSON

Myval 数据帧可以从 serde_json Value(仅地图)解析,或转换为 Value(地图/数组)。这需要 "json" 包的功能。

// create Object value from a data frame, converted to serde_json::Map
let val = serde_json::Value::Object(df.to_json_map().unwrap());
// define JSON parser
let mut parser = myval::convert::json::Parser::new()
    .with_type_mapping("name", DataType::LargeUtf8);
// add more columns if required
parser = parser.with_type_mapping("time", DataType::Int64);
parser = parser.with_type_mapping("status", DataType::Int32);
let parsed_df = parser.parse_value(val).unwrap();
  • 一些数据类型无法从 Value 对象正确解析(例如,时间戳),请使用 DataFrame 方法将其更正为所需的类型。

  • 如果在json::Parser对象中定义了列但在Value中缺失,则该列将以空值创建。

其他

请查看文档:https://docs.rs/myval

与数据库协作

Arrow提供了多种与数据库协作的方式。Myval还提供了通过流行的sqlxcrate(必须启用"postgres"功能)以简单方式处理PostgreSQL数据库的工具。

从数据库获取数据

use futures::stream::TryStreamExt;

let pool = PgPoolOptions::new()
    .connect("postgres://postgres:welcome@localhost/postgres")
    .await.unwrap();
let max_size = 100_000;
let mut stream = myval::db::postgres::fetch(
    "select * from test".to_owned(), Some(max_size), pool.clone());
// the stream returns data frames one by one with max data frame size (in
// bytes) = max_size
while let Some(df) = stream.try_next().await.unwrap() {
    // do some stuff
}

为什么流对象需要PgPool?有一个重要原因:此类流对象是静态的,可以存储在任何地方,例如在客户端-服务器架构中用作游标。

将数据推入数据库

服务器

let df = DataFrame::from_ipc_block(payload).unwrap();
// The first received data frame must have "database" field in its schema
// metadata. Next data frames can go without it.
if let Some(dbparams) = df.metadata().get("database") {
    let params: myval::db::postgres::Params = serde_json::from_str(dbparams)
        .unwrap();
    let processed_rows: usize = myval::db::postgres::push(&df, &params,
        &pool).await.unwrap();
}

客户端

让我们将Polars数据帧推入PostgreSQL数据库

use serde_json::json;

let mut df = myval::DataFrame::from(polars_df);
df.metadata_mut().insert(
    // set "database" metadata field
    "database".to_owned(),
    serde_json::to_string(&json!({
        // table, required
        "table": "test",
        // PostgreSQL schema, optional
        "postgres": { "schema": "public" },
        // keys, required if the table has got keys/unique indexes
        "keys": ["id"],
        // some field parameters
        "fields": {
            // another way to declare a key field
            //"id": { "key": true },
            // the following data frame columns contain strings which must be
            // sent to the database as JSON (for json/jsonb PostgreSQL types)
            "data1": { "json": true },
            "data2": { "json": true }
        }
    }))?,
);
// send the data frame to the server in a single or multiple chunks/blocks

支持的PostgreSQL类型

  • BOOL, INT2(16位整型),INT4(32位整型),INT8(64位整型),FLOAT4(32位浮点型),FLOAT8(64位浮点型)

  • TIMESTAMP, TIMESTAMPTZ(由于Arrow数组不能有不同的时区,因此丢弃时区信息)

  • CHAR, VARCHAR

  • JSON/JSONB(在检索时编码为LargeUtf8字符串)

一般限制

  • Myval不是为数据工程设计的。请使用Polars。

  • Myval序列只能包含一个块,并且没有计划扩展此功能。当将具有多个块的Polars数据帧转换为Myval时,块将自动聚合。

  • 某些功能(转换为Polars、PostgreSQL)是实验性的,请自行承担风险。

关于

Myval是Bohemia Automation开发EVA ICS Machine Learning kit的一部分。

Bohemia Automation / Altertech是一家拥有15年以上企业自动化和工业物联网经验的公司集团。我们的设置包括发电厂、工厂和城市基础设施。其中最大的拥有超过1M个传感器和受控设备,并且这个数字每天都在不断提高。

依赖关系

~6–21MB
~309K SLoC