17 个版本
0.1.22 | 2023 年 5 月 6 日 |
---|---|
0.1.21 | 2023 年 5 月 6 日 |
0.1.20 | 2023 年 4 月 23 日 |
#48 in 机器学习
179 每月下载量
135KB
1.5K SLoC
Myval - Rust 的轻量级 Apache Arrow 数据帧

什么是 Myval?
Mýval(发音为 [m'ival])从捷克语翻译而来,意为浣熊。
为什么不取熊的名字呢?
捷克语中浣熊的普通名字是 "medvídek mýval",可以翻译为 "小熊"。
但是 Polars 呢?
Myval 不是 Polars 的竞争对手。Myval 是一个轻量级的 Arrow 数据帧,专注于原地数据转换和 IPC。
因为 Arrow 有标准化的数据布局,数据帧可以无拷贝地转换为 Polars,反之亦然。
let polars_df = polars::frame::DataFrame::from(myval_df);
let myval_df = myval::DataFrame::from(polars_df);
同样,Myval 也是基于 arrow2 开发的。
一些技巧
IPC
假设接收到了来自 RPC 或 Pub/Sub 等的 Arrow 流块(Schema+Chunk)。将块转换为 Myval 数据帧。
let df = myval::DataFrame::from_ipc_block(&buf).unwrap();
需要发送数据帧回去?用一行代码将其转换为 Arrow 流块。
let buf = df.into_ipc_block().unwrap();
需要发送切片?没问题,有方法可以轻松返回切片系列、切片数据帧或 IPC 块。
覆盖数据类型
假设有一个 i64 列 "time",其中包含纳秒时间戳。让我们覆盖其数据类型。
use myval::{DataType, TimeUnit};
df.set_data_type("time",
DataType::Timestamp(TimeUnit::Nanosecond, None)).unwrap();
从字符串解析数字
假设有一个 utf8 列 "value",需要解析为浮点数。
df.parse::<f64>("value").unwrap();
基本的原地数学运算
df.add("col", 1_000i64).unwrap();
df.sub("col", 1_000i64).unwrap();
df.mul("col", 1_000i64).unwrap();
df.div("col", 1_000i64).unwrap();
自定义原地转换
df.apply("time", |time| time.map(|t: i64| t / 1_000)).unwrap();
水平连接
df.join(df2).unwrap();
连接
let merged = myval::concat(&[&df1, &df2, &df3]).unwrap();
设置列排序
假设有一个 Myval 数据帧,具有 "voltage"、"temp1"、"temp2"、"temp3" 列,这些列以随机顺序从服务器接收数据。让我们将其排序纠正为正常顺序。
df.set_ordering(&["voltage", "temp1", "temp2", "temp3"]);
从/到 JSON
Myval 数据帧可以从 serde_json Value(仅地图)解析,或转换为 Value(地图/数组)。这需要 "json" 包的功能。
// create Object value from a data frame, converted to serde_json::Map
let val = serde_json::Value::Object(df.to_json_map().unwrap());
// define JSON parser
let mut parser = myval::convert::json::Parser::new()
.with_type_mapping("name", DataType::LargeUtf8);
// add more columns if required
parser = parser.with_type_mapping("time", DataType::Int64);
parser = parser.with_type_mapping("status", DataType::Int32);
let parsed_df = parser.parse_value(val).unwrap();
-
一些数据类型无法从 Value 对象正确解析(例如,时间戳),请使用 DataFrame 方法将其更正为所需的类型。
-
如果在json::Parser对象中定义了列但在Value中缺失,则该列将以空值创建。
其他
请查看文档:https://docs.rs/myval
与数据库协作
Arrow提供了多种与数据库协作的方式。Myval还提供了通过流行的sqlxcrate(必须启用"postgres"功能)以简单方式处理PostgreSQL数据库的工具。
从数据库获取数据
use futures::stream::TryStreamExt;
let pool = PgPoolOptions::new()
.connect("postgres://postgres:welcome@localhost/postgres")
.await.unwrap();
let max_size = 100_000;
let mut stream = myval::db::postgres::fetch(
"select * from test".to_owned(), Some(max_size), pool.clone());
// the stream returns data frames one by one with max data frame size (in
// bytes) = max_size
while let Some(df) = stream.try_next().await.unwrap() {
// do some stuff
}
为什么流对象需要PgPool?有一个重要原因:此类流对象是静态的,可以存储在任何地方,例如在客户端-服务器架构中用作游标。
将数据推入数据库
服务器
let df = DataFrame::from_ipc_block(payload).unwrap();
// The first received data frame must have "database" field in its schema
// metadata. Next data frames can go without it.
if let Some(dbparams) = df.metadata().get("database") {
let params: myval::db::postgres::Params = serde_json::from_str(dbparams)
.unwrap();
let processed_rows: usize = myval::db::postgres::push(&df, ¶ms,
&pool).await.unwrap();
}
客户端
让我们将Polars数据帧推入PostgreSQL数据库
use serde_json::json;
let mut df = myval::DataFrame::from(polars_df);
df.metadata_mut().insert(
// set "database" metadata field
"database".to_owned(),
serde_json::to_string(&json!({
// table, required
"table": "test",
// PostgreSQL schema, optional
"postgres": { "schema": "public" },
// keys, required if the table has got keys/unique indexes
"keys": ["id"],
// some field parameters
"fields": {
// another way to declare a key field
//"id": { "key": true },
// the following data frame columns contain strings which must be
// sent to the database as JSON (for json/jsonb PostgreSQL types)
"data1": { "json": true },
"data2": { "json": true }
}
}))?,
);
// send the data frame to the server in a single or multiple chunks/blocks
支持的PostgreSQL类型
-
BOOL, INT2(16位整型),INT4(32位整型),INT8(64位整型),FLOAT4(32位浮点型),FLOAT8(64位浮点型)
-
TIMESTAMP, TIMESTAMPTZ(由于Arrow数组不能有不同的时区,因此丢弃时区信息)
-
CHAR, VARCHAR
-
JSON/JSONB(在检索时编码为LargeUtf8字符串)
一般限制
-
Myval不是为数据工程设计的。请使用Polars。
-
Myval序列只能包含一个块,并且没有计划扩展此功能。当将具有多个块的Polars数据帧转换为Myval时,块将自动聚合。
-
某些功能(转换为Polars、PostgreSQL)是实验性的,请自行承担风险。
关于
Myval是Bohemia Automation开发EVA ICS Machine Learning kit的一部分。
Bohemia Automation / Altertech是一家拥有15年以上企业自动化和工业物联网经验的公司集团。我们的设置包括发电厂、工厂和城市基础设施。其中最大的拥有超过1M个传感器和受控设备,并且这个数字每天都在不断提高。
依赖关系
~6–21MB
~309K SLoC