#arrow #python #udf #apache-arrow

arrow-udf-python

Apache Arrow UDF 的 Python 运行时

4 个版本

0.2.2 2024年6月24日
0.2.1 2024年5月23日
0.2.0 2024年5月23日
0.1.0 2024年4月25日

#760开发工具

Download history 42/week @ 2024-04-26 33/week @ 2024-05-03 1180/week @ 2024-05-10 1698/week @ 2024-05-17 707/week @ 2024-05-24 482/week @ 2024-05-31 450/week @ 2024-06-07 460/week @ 2024-06-14 498/week @ 2024-06-21 571/week @ 2024-06-28 497/week @ 2024-07-05 510/week @ 2024-07-12 320/week @ 2024-07-19 354/week @ 2024-07-26 389/week @ 2024-08-02 514/week @ 2024-08-09

每月 1,650 次下载

Apache-2.0

60KB
983

Apache Arrow 的 Python UDF

Crate Docs

注意:运行此库需要 Python 3.12。如果 python3 不是 3.12,请设置环境变量 PYO3_PYTHON=python3.12

将以下行添加到您的 Cargo.toml

[dependencies]
arrow-udf-python = "0.2"

创建一个 Runtime 并以字符串形式定义您的 Python 函数。请注意,函数名必须与您传递给 add_function 的名称匹配。

use arrow_udf_python::{CallMode, Runtime};

let mut runtime = Runtime::new().unwrap();
let python_code = r#"
def gcd(a: int, b: int) -> int:
    while b:
        a, b = b, a % b
    return a
"#;
let return_type = arrow_schema::DataType::Int32;
let mode = CallMode::ReturnNullOnNullInput;
runtime.add_function("gcd", return_type, mode, python_code).unwrap();

然后您可以在 RecordBatch 上调用 Python 函数

let input: RecordBatch = ...;
let output: RecordBatch = runtime.call("gcd", &input).unwrap();

Python 代码将在由 PyO3 驱动的嵌入式 CPython 3.12 解释器中运行。

请参阅 示例 了解更多详情。

结构类型

如果函数返回结构类型,您可以返回类实例或字典。

use arrow_schema::{DataType, Field};
use arrow_udf_python::{CallMode, Runtime};

let mut runtime = Runtime::new().unwrap();
let python_code = r#"
class KeyValue:
    def __init__(self, key, value):
        self.key = key
        self.value = value

def key_value(s: str):
    key, value = s.split('=')

    ## return a class instance
    return KeyValue(key, value)

    ## or return a dict
    return {"key": key, "value": value}
"#;
let return_type = DataType::Struct(
    vec![
        Field::new("key", DataType::Utf8, true),
        Field::new("value", DataType::Utf8, true),
    ]
    .into(),
);
let mode = CallMode::ReturnNullOnNullInput;
runtime.add_function("key_value", return_type, mode, python_code).unwrap();

扩展类型

此包还支持以下 Arrow 扩展类型

扩展类型 物理类型 ARROW:扩展:名称 Python 类型
JSON 字符串 arrowudf.json 任何(由 json.loads 解析)
十进制 字符串 arrowudf.decimal decimal.Decimal
Pickle 二进制 arrowudf.pickle 任何(由 pickle.loads 解析)

Pickle 类型

当字段为 pickle 类型时,数据以序列化形式存储在二进制数组中。

# use arrow_schema::{Field, DataType};
# use arrow_array::BinaryArray;
let pickle_field = Field::new("pickle", DataType::Binary, true)
    .with_metadata([("ARROW:extension:name".into(), "arrowudf.pickle".into())].into());
let pickle_array = BinaryArray::from(vec![&b"xxxxx"[..]]);

Pickle 类型在聚合函数的状态复杂时非常有用。

依赖项

~12–18MB
~254K SLoC