1个不稳定版本
0.2.0 | 2024年7月5日 |
---|
#41 在 #recipe
111 每月下载量
在 pyspark-arrow-rs 中使用
3KB
Pyspark Arrow Rust (pyspark-arrow-rs)
目的是与Pyo3 (maturin)、arrow-rs、serde_arrow一起使用,以生成可用于Spark ETL作业的Python代码
上下文
尝试使用Spark的原始方法是使用Spark UDF或Pandas UDF。Spark是原生用JVM编写的,通过Py4J与Python交互。将Java RDD转换为Python UDF可以处理的东西的开销相当高。
因此,绕过这个问题的方法是使用Apache Arrow。PySpark已经使用Arrow在JVM和Python之间进行交互,通过直接利用Arrow格式,我们能够跳过任何额外的开销,并在Rust函数中直接读取Arrow对象。
RDD -> Spark (Java) -> Arrow -> Pyspark -> Rust
如何做到这一点
我们能够通过pyspark.sql.DataFrame.mapInArrow
API将Arrow对象直接加载到由Python(通过Maturin)包装的Rust函数中。
此API期望两个参数
- 一个生成Apache Arrow RecordBatch项的Python函数
- 预期的模式
我们可以编写一个Rust函数,该函数返回一个arrow-rs RecordBatch项,可以被Python函数产生。
我们还可以通过使用serde_arrow(以及此包提供的宏)来推导模式,传递预期的模式。
TL;DR
此包提供了一些宏,使得与mapInArrow
一起使用更加容易,并提供了一些关于如何在Pyspark中利用Rust的食谱。
示例用法
use pyspark_arrow_rs::HasArrowSparkSchema;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, HasArrowSparkSchema)]
pub(crate) struct TestStruct {
pub(crate) col: String,
pub(crate) int_col: i64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_macro() {
let _a = TestStruct {
col: "bla".to_string(),
int_col: 1000,
};
let ddl = TestStruct::get_spark_ddl();
assert_eq!("`col` STRING, `int_col` BIGINT", ddl.unwrap());
}
}
依赖关系
~1.5MB
~35K SLoC