#apache-arrow #python #spark #interop #macro-derive #etl #pyspark

pyspark-arrow-rs

派生宏,用于将一些辅助函数添加到 Rust 结构体中,以便在 PySpark 的 mapInArrow 中使用

3 个不稳定版本

新增 0.3.0 2024 年 8 月 16 日
0.2.1 2024 年 7 月 7 日
0.2.0 2024 年 7 月 5 日

#1285 in Rust 模式

MIT/Apache

11KB
135

PySpark Arrow Rust (pyspark-arrow-rs)

目的是与 Pyo3 (maturin)、arrow-rs、serde_arrow 一起使用,生成可用于 Spark ETL 作业的 Python 代码

上下文

尝试使用 Spark 的直接方法是使用 Spark UDF 或 Pandas UDF。Spark 主要是用 JVM 编写的,并通过 Py4J 与 Python 交互。从 Java RDD 转换为 Python UDF 可以处理的格式带来的转换开销相当高。

因此,解决这个问题的一种方法是使用 Apache Arrow。PySpark 已经使用 Arrow 在 JVM 和 Python 之间进行互操作,通过直接利用 Arrow 格式,我们可以跳过任何额外的开销,并在 Rust 函数中直接读取 Arrow 对象。

RDD -> Spark (Java) -> Arrow -> Pyspark -> Rust

如何做到这一点

我们能够通过 pyspark.sql.DataFrame.mapInArrow API 将 Arrow 对象直接加载到通过 Maturin 包装的 Python 函数中。

此 API 预期两个参数

  • 一个产生 Apache Arrow RecordBatch 项目的 Python 函数
  • 预期的模式

我们能够编写一个返回可以由 Python 函数产生的 arrow-rs RecordBatch 项的 Rust 函数。

我们还可以通过使用 serde_arrow(以及此包提供的宏)派生模式来传递预期的模式。

总结

此包提供了一些宏,使与 mapInArrow 一起使用更加容易,并提供了一些关于如何在 PySpark 中利用 Rust 的配方。

示例用法

  • 请参阅 tests 文件夹中的真实示例。但基本思想是,给定一个结构体,您可以在 PySpark DataFrame 的 map_in_arrow 上生成一个 Spark DDL。
use pyspark_arrow_rs::HasArrowSparkSchema;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, HasArrowSparkSchema)]
pub(crate) struct TestStruct {
    pub(crate) col: String,
    pub(crate) int_col: i64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_macro() {
        let _a = TestStruct {
            col: "bla".to_string(),
            int_col: 1000,
        };
        let ddl = TestStruct::get_spark_ddl();
        assert_eq!("`col` STRING, `int_col` BIGINT", ddl.unwrap());
    }
}

依赖项

~16–24MB
~363K SLoC