#arrow #macro #recipe #proc #proc-macro #pyspark #pyspark-arrow-rs

pyspark-arrow-rs-impl

pyspark-arrow-rs的内部过程宏实现

1个不稳定版本

0.2.0 2024年7月5日

#41#recipe

Download history 100/week @ 2024-07-02 11/week @ 2024-07-09

111 每月下载量
pyspark-arrow-rs 中使用

MIT/Apache

3KB

Pyspark Arrow Rust (pyspark-arrow-rs)

目的是与Pyo3 (maturin)、arrow-rs、serde_arrow一起使用,以生成可用于Spark ETL作业的Python代码

上下文

尝试使用Spark的原始方法是使用Spark UDF或Pandas UDF。Spark是原生用JVM编写的,通过Py4J与Python交互。将Java RDD转换为Python UDF可以处理的东西的开销相当高。

因此,绕过这个问题的方法是使用Apache Arrow。PySpark已经使用Arrow在JVM和Python之间进行交互,通过直接利用Arrow格式,我们能够跳过任何额外的开销,并在Rust函数中直接读取Arrow对象。

RDD -> Spark (Java) -> Arrow -> Pyspark -> Rust

如何做到这一点

我们能够通过pyspark.sql.DataFrame.mapInArrow API将Arrow对象直接加载到由Python(通过Maturin)包装的Rust函数中。

此API期望两个参数

  • 一个生成Apache Arrow RecordBatch项的Python函数
  • 预期的模式

我们可以编写一个Rust函数,该函数返回一个arrow-rs RecordBatch项,可以被Python函数产生。

我们还可以通过使用serde_arrow(以及此包提供的宏)来推导模式,传递预期的模式。

TL;DR

此包提供了一些宏,使得与mapInArrow一起使用更加容易,并提供了一些关于如何在Pyspark中利用Rust的食谱。

示例用法

use pyspark_arrow_rs::HasArrowSparkSchema;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, HasArrowSparkSchema)]
pub(crate) struct TestStruct {
    pub(crate) col: String,
    pub(crate) int_col: i64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_macro() {
        let _a = TestStruct {
            col: "bla".to_string(),
            int_col: 1000,
        };
        let ddl = TestStruct::get_spark_ddl();
        assert_eq!("`col` STRING, `int_col` BIGINT", ddl.unwrap());
    }
}

依赖关系

~1.5MB
~35K SLoC