7 versions

| Version | Date |
|---|---|
| 0.0.1-beta.5 | July 3, 2024 |
| 0.0.1-beta.4 | May 10, 2024 |
| 0.0.1-beta.3 | April 2, 2024 |
| 0.0.1-beta.2 | March 24, 2024 |
| 0.0.1-beta | September 20, 2023 |
#428 in Database interfaces
54 downloads per month
400KB
7K SLoC
Apache Spark Connect Client for Rust
This project houses the experimental client for Spark Connect for Apache Spark, written in Rust.
Current State of the Project
The Spark Connect client for Rust is currently highly experimental and **should not be used in any production setting**. It is a "proof of concept" to identify how to interact with a Spark cluster from Rust.
spark-connect-rs aims to provide an entry point to Spark Connect, and to offer similar DataFrame API interactions.
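A minimal sketch of what that entry point looks like, based on the crate's examples (the connection string and the `show` arguments are illustrative, and signatures may shift between beta releases):

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a local Spark Connect server (see Getting Started below)
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
            .build()
            .await?;

    // Issue a SQL query and display the result, DataFrame-style
    let df = spark.sql("SELECT 'spark' AS hello, 42 AS answer").await?;
    df.show(Some(5), None, None).await?;

    Ok(())
}
```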
Project Layout
├── core <- core implementation in Rust
│ └─ spark <- git submodule for apache/spark
├── rust <- shim for 'spark-connect-rs' from core
├── examples <- examples of using different aspects of the crate
├── datasets <- sample files from the main spark repo
The future state would be to provide additional bindings for other languages alongside the top-level rust folder.
Getting Started
This section explains how to run Spark Connect for Rust locally, starting from scratch.
Step 1: Install rust via rustup: https://www.rust-lang.org/tools/install
Step 2: Ensure you have cmake and protobuf installed on your machine
Step 3: Run the following commands to clone the repo
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive
cargo build
Step 4: Set up the Spark Driver on localhost, either by downloading Spark or by using docker.
With local spark:
- Download the Spark distribution (3.5.1 recommended) and unzip the package.
- Set your `SPARK_HOME` environment variable to the location where spark was extracted.
- Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):
$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
With docker:
- Start the Spark Connect server by leveraging the docker-compose.yml created in this repo. This will start a Spark Connect server running on port 15002:
$ docker compose up --build -d
Step 5: Run an example from the repo under /examples
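For instance (the example name `sql` is illustrative; check the /examples folder for the actual targets):

$ cargo run --example sql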
Features
The following section outlines some of the larger functionality that is not yet working with this Spark Connect implementation.
- TLS authentication & Databricks compatibility via the feature flag `feature = 'tls'`
- StreamingQueryManager
- UDFs, or any functionality that takes a closure (foreach, foreachBatch, etc.)
SparkSession
The Spark Session type object and its implemented traits.
| SparkSession | API | Notes |
|---|---|---|
| active | ||
| addArtifact(s) | ||
| addTag | ||
| clearTags | ||
| copyFromLocalToFs | ||
| createDataFrame | | Partial. Only works for `RecordBatch` (see the sketch after this table) |
| getActiveSessions | ||
| getTags | ||
| interruptAll | ||
| interruptOperation | ||
| interruptTag | ||
| newSession | ||
| range | ||
| removeTag | ||
| sql | ||
| stop | ||
| table | ||
| catalog | | Catalog |
| client | | Unstable developer API, for testing only |
| conf | | Conf |
| read | | DataFrameReader |
| readStream | | DataStreamReader |
| streams | | Streams |
| udf | | Udf - may not be possible |
| udtf | | Udtf - may not be possible |
| version | ||
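Since `createDataFrame` currently only accepts an Arrow `RecordBatch`, a sketch of that path could look like the following (assumes the `arrow` crate and an async context with a `spark` session already built; the exact `createDataFrame` signature is an assumption):

```rust
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Assemble a small Arrow RecordBatch in memory
let schema = Arc::new(Schema::new(vec![
    Field::new("name", DataType::Utf8, false),
    Field::new("age", DataType::Int64, false),
]));
let batch = RecordBatch::try_new(
    schema,
    vec![
        Arc::new(StringArray::from(vec!["alice", "bob"])),
        Arc::new(Int64Array::from(vec![34_i64, 29])),
    ],
)?;

// RecordBatch is currently the only supported input type
let df = spark.createDataFrame(&batch)?;
df.show(Some(2), None, None).await?;
```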
SparkSessionBuilder
| SparkSessionBuilder | API | Notes |
|---|---|---|
| appName | ||
| config | ||
| master | ||
| remote | | Validated using a Spark connection string |
StreamingQueryManager
| StreamingQueryManager | API | Notes |
|---|---|---|
| awaitAnyTermination | ||
| get | ||
| resetTerminated | ||
| active | ||
StreamingQuery
| StreamingQuery | API | Notes |
|---|---|---|
| awaitTermination | ||
| exception | ||
| explain | ||
| processAllAvailable | ||
| stop | ||
| id | ||
| isActive | ||
| lastProgress | ||
| name | ||
| recentProgress | ||
| runId | ||
| status |
DataStreamReader
| DataStreamReader | API | Notes |
|---|---|---|
| csv | ||
| format | ||
| json | ||
| load | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| schema | ||
| table | ||
| text |
DataFrameReader
| DataFrameReader | API | Notes |
|---|---|---|
| csv | ||
| format | ||
| json | ||
| load | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| schema | ||
| table | ||
| text |
DataStreamWriter
Starts a streaming job and returns a `StreamingQuery` object to handle the stream operations.
| DataStreamWriter | API | Notes |
|---|---|---|
| foreach | ||
| foreachBatch | ||
| format | ||
| option | ||
| options | ||
| outputMode | | Uses the enum `OutputMode` |
| partitionBy | ||
| queryName | ||
| start | ||
| toTable | ||
| trigger | | Uses the enum `TriggerMode` (see the streaming sketch after this table) |
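Putting the reader and writer together, a streaming job could look roughly like this sketch (the `load`/`start` arguments and the `OutputMode`/`TriggerMode` variants are assumptions based on the tables above):

```rust
use spark_connect_rs::streaming::{OutputMode, TriggerMode};

// Read a synthetic rate source as a stream
let df = spark
    .readStream()
    .format("rate")
    .option("rowsPerSecond", "5")
    .load(None)?;

// Start the streaming job; this returns a StreamingQuery handle
let query = df
    .writeStream()
    .format("console")
    .queryName("example_stream")
    .outputMode(OutputMode::Append)
    .trigger(TriggerMode::ProcessingTime("3 seconds".to_string())) // variant name assumed
    .start(None)
    .await?;

// The handle exposes status, lastProgress, stop, etc.
query.stop().await?;
```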
StreamingQueryListener
| StreamingQueryListener | API | Notes |
|---|---|---|
| onQueryIdle | ||
| onQueryProgress | ||
| onQueryStarted | ||
| onQueryTerminated |
UdfRegistration (may not be possible)
| UDFRegistration | API | Notes |
|---|---|---|
| register | ||
| registerJavaFunction | ||
| registerJavaUDAF |
UdtfRegistration (may not be possible)
| UDTFRegistration | API | Notes |
|---|---|---|
| register |
RuntimeConfig
| RuntimeConfig | API | Notes |
|---|---|---|
| get | ||
| isModifiable | ||
| set | ||
| unset |
Catalog
| Catalog | API | Notes |
|---|---|---|
| cacheTable | ||
| clearCache | ||
| createExternalTable | ||
| createTable | ||
| currentCatalog | ||
| currentDatabase | ||
| databaseExists | ||
| dropGlobalTempView | ||
| dropTempView | ||
| functionExists | ||
| getDatabase | ||
| getFunction | ||
| getTable | ||
| isCached | ||
| listCatalogs | ||
| listDatabases | ||
| listFunctions | ||
| listTables | ||
| recoverPartitions | ||
| refreshByPath | ||
| refreshTable | ||
| registerFunction | ||
| setCurrentCatalog | ||
| setCurrentDatabase | ||
| tableExists | ||
| uncacheTable |
DataFrame
The Spark DataFrame type object and its implemented traits.
| DataFrame | API | Notes |
|---|---|---|
| agg | ||
| alias | ||
| approxQuantile | ||
| cache | ||
| checkpoint | ||
| coalesce | ||
| colRegex | ||
| collect | ||
| columns | ||
| corr | ||
| count | ||
| cov | ||
| createGlobalTempView | ||
| createOrReplaceGlobalTempView | ||
| createOrReplaceTempView | ||
| createTempView | ||
| crossJoin | ||
| crosstab | ||
| cube | ||
| describe | ||
| distinct | ||
| drop | ||
| dropDuplicates | ||
| dropDuplicatesWithinWatermark | | Windowing functions are currently in progress |
| drop_duplicates | ||
| dropna | ||
| dtypes | ||
| exceptAll | ||
| explain | ||
| fillna | ||
| filter | ||
| first | ||
| foreach | ||
| foreachPartition | ||
| freqItems | ||
| groupBy | ||
| head | ||
| hint | ||
| inputFiles | ||
| intersect | ||
| intersectAll | ||
| isEmpty | ||
| isLocal | ||
| isStreaming | ||
| join | ||
| limit | ||
| localCheckpoint | ||
| mapInPandas | | TBD on the details of this exact implementation |
| mapInArrow | | TBD on the details of this exact implementation |
| melt | ||
| na | ||
| observe | ||
| offset | ||
| orderBy | ||
| persist | ||
| printSchema | ||
| randomSplit | ||
| registerTempTable | ||
| repartition | ||
| repartitionByRange | ||
| replace | ||
| rollup | ||
| sameSemantics | ||
| sample | ||
| sampleBy | ||
| schema | ||
| select | ||
| selectExpr | ||
| semanticHash | ||
| show | ||
| sort | ||
| sortWithinPartitions | ||
| sparkSession | ||
| stat | ||
| storageLevel | ||
| subtract | ||
| summary | ||
| tail | ||
| take | ||
| to | ||
| toDF | ||
| toJSON | | Does not return an `RDD` but a long JSON formatted `String` |
| toLocalIterator | ||
| new to_polars & toPolars | | Convert to a `polars::frame::DataFrame` |
| new to_datafusion & toDataFusion | | Convert to a `datafusion::dataframe::DataFrame` |
| transform | ||
| union | ||
| unionAll | ||
| unionByName | ||
| unpersist | ||
| unpivot | ||
| where | | Use `filter` instead; `where` is a keyword in Rust |
| withColumn | ||
| withColumns | ||
| withColumnRenamed | ||
| withColumnsRenamed | ||
| withMetadata | ||
| withWatermark | ||
| write | ||
| writeStream | ||
| writeTo |
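A short sketch of the DataFrame API in use (method signatures mirror the crate's examples but should be treated as illustrative):

```rust
use spark_connect_rs::functions::{col, lit};

// Build a 10-row range, derive a column, filter, and sort
let df = spark
    .range(None, 10, 1, Some(1))
    .select((col("id") * lit(2)).alias("id_doubled"))
    .filter("id_doubled > 10")
    .sort(vec![col("id_doubled").desc()]);

df.show(Some(10), None, None).await?;
```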
DataFrameWriter
Spark Connect *should* respect the format as long as your cluster supports the specified type and has the required jars.
| DataFrameWriter | API | Notes |
|---|---|---|
| bucketBy | ||
| csv | ||
| format | ||
| insertInto | ||
| jdbc | ||
| json | ||
| mode | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| partitionBy | ||
| save | ||
| saveAsTable | ||
| sortBy | ||
| text |
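For example, a parquet write could look like this sketch (the `SaveMode` import path and the output location are assumptions):

```rust
use spark_connect_rs::dataframe::SaveMode;

// Write out as parquet; format support depends on the cluster's jars
df.write()
    .format("parquet")
    .mode(SaveMode::Overwrite)
    .option("compression", "snappy")
    .save("/tmp/spark_connect_rs_demo")
    .await?;
```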
DataFrameWriterV2
| DataFrameWriterV2 | API | Notes |
|---|---|---|
| append | ||
| create | ||
| createOrReplace | ||
| option | ||
| options | ||
| overwrite | ||
| overwritePartitions | ||
| partitionedBy | ||
| replace | ||
| tableProperty | ||
| using |
Column
The Spark Column type object and its implemented traits.
| Column | API | Notes |
|---|---|---|
| alias | ||
| asc | ||
| asc_nulls_first | ||
| asc_nulls_last | ||
| astype | ||
| between | ||
| cast | ||
| contains | ||
| desc | ||
| desc_nulls_first | ||
| desc_nulls_last | ||
| dropFields | ||
| endswith | ||
| eqNullSafe | ||
| getField | | Deprecated, but will need to be implemented |
| getItem | | Deprecated, but will need to be implemented |
| ilike | ||
| isNotNull | ||
| isNull | ||
| isin | ||
| like | ||
| name | ||
| otherwise | ||
| over | | Refer to Window for creating window specs |
| rlike | ||
| startswith | ||
| substr | ||
| when | ||
| withField | ||
| eq `==` | | Rust does not allow overloading `==` to return anything other than a `bool`; currently implemented as the `eq` method (see the sketch after this table) |
| addition `+` | ||
| subtraction `-` | ||
| multiplication `*` | ||
| division `/` | ||
| OR `\|` | ||
| AND `&` | ||
| XOR `^` | ||
| Negate `~` | ||
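A sketch of how the overloaded operators and the `eq` method combine in practice (column names are placeholders):

```rust
use spark_connect_rs::functions::{col, lit};

// Arithmetic operators are overloaded directly on Column
let total = (col("price") * col("qty")) + lit(1.5);

// `==` cannot return a Column in Rust, so equality is the `eq` method
let is_books = col("category").eq(lit("books"));

// The bitwise overloads double as boolean logic between columns
let keep = is_books & col("in_stock");

let result = df.select(total.alias("total")).filter(keep);
```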
Functions
Only a few of the functions are covered by unit tests.
| Functions | API | Notes |
|---|---|---|
| abs | ||
| acos | ||
| acosh | ||
| add_months | ||
| aggregate | ||
| approxCountDistinct | ||
| approx_count_distinct | ||
| array | ||
| array_append | ||
| array_compact | ||
| array_contains | ||
| array_distinct | ||
| array_except | ||
| array_insert | ||
| array_intersect | ||
| array_join | ||
| array_max | ||
| array_min | ||
| array_position | ||
| array_remove | ||
| array_repeat | ||
| array_sort | ||
| array_union | ||
| arrays_overlap | ||
| arrays_zip | ||
| asc | ||
| asc_nulls_first | ||
| asc_nulls_last | ||
| ascii | ||
| asin | ||
| asinh | ||
| assert_true | ||
| atan | ||
| atan2 | ||
| atanh | ||
| avg | ||
| base64 | ||
| bin | ||
| bit_length | ||
| bitwiseNOT | ||
| bitwise_not | ||
| broadcast | ||
| bround | ||
| bucket | ||
| call_udf | ||
| cbrt | ||
| ceil | ||
| coalesce | ||
| col | ||
| collect_list | ||
| collect_set | ||
| column | ||
| concat | ||
| concat_ws | ||
| conv | ||
| corr | ||
| cos | ||
| cosh | ||
| cot | ||
| count | ||
| countDistinct | ||
| count_distinct | ||
| covar_pop | ||
| covar_samp | ||
| crc32 | ||
| create_map | ||
| csc | ||
| cume_dist | ||
| current_date | ||
| current_timestamp | ||
| date_add | ||
| date_format | ||
| date_sub | ||
| date_trunc | ||
| datediff | ||
| dayofmonth | ||
| dayofweek | ||
| dayofyear | ||
| days | ||
| decode | ||
| degrees | ||
| dense_rank | ||
| desc | ||
| desc_nulls_first | ||
| desc_nulls_last | ||
| element_at | ||
| encode | ||
| exists | ||
| exp | ||
| explode | ||
| explode_outer | ||
| expm1 | ||
| expr | ||
| factorial | ||
| filter | ||
| first | ||
| flatten | ||
| floor | ||
| forall | ||
| format_number | ||
| format_string | ||
| from_csv | ||
| from_json | ||
| from_unixtime | ||
| from_utc_timestamp | ||
| functools | ||
| get | ||
| get_active_spark_context | ||
| get_json_object | ||
| greatest | ||
| grouping | ||
| grouping_id | ||
| has_numpy | ||
| hash | ||
| hex | ||
| hour | ||
| hours | ||
| hypot | ||
| initcap | ||
| inline | ||
| inline_outer | ||
| input_file_name | ||
| inspect | ||
| instr | ||
| isnan | ||
| isnull | ||
| json_tuple | ||
| kurtosis | ||
| lag | ||
| last | ||
| last_day | ||
| lead | ||
| least | ||
| length | ||
| levenshtein | ||
| lit | ||
| localtimestamp | ||
| locate | ||
| log | ||
| log10 | ||
| log1p | ||
| log2 | ||
| lower | ||
| lpad | ||
| ltrim | ||
| make_date | ||
| map_concat | ||
| map_contains_key | ||
| map_entries | ||
| map_filter | ||
| map_from_arrays | ||
| map_from_entries | ||
| map_keys | ||
| map_values | ||
| map_zip_with | ||
| max | ||
| max_by | ||
| md5 | ||
| mean | ||
| median | ||
| min | ||
| min_by | ||
| minute | ||
| mode | ||
| monotonically_increasing_id | ||
| month | ||
| months | ||
| months_between | ||
| nanvl | ||
| next_day | ||
| np | ||
| nth_value | ||
| ntile | ||
| octet_length | ||
| overlay | ||
| overload | ||
| pandas_udf | ||
| percent_rank | ||
| percentile_approx | ||
| pmod | ||
| posexplode | ||
| posexplode_outer | ||
| pow | ||
| product | ||
| quarter | ||
| radians | ||
| raise_error | ||
| rand | ||
| randn | ||
| rank | ||
| regexp_extract | ||
| regexp_replace | ||
| repeat | ||
| reverse | ||
| rint | ||
| round | ||
| row_number | ||
| rpad | ||
| rtrim | ||
| schema_of_csv | ||
| schema_of_json | ||
| sec | ||
| second | ||
| sentences | ||
| sequence | ||
| session_window | ||
| sha1 | ||
| sha2 | ||
| shiftLeft | ||
| shiftRight | ||
| shiftRightUnsigned | ||
| shiftleft | ||
| shiftright | ||
| shiftrightunsigned | ||
| shuffle | ||
| signum | ||
| sin | ||
| sinh | ||
| size | ||
| skewness | ||
| slice | ||
| sort_array | ||
| soundex | ||
| spark_partition_id | ||
| split | ||
| sqrt | ||
| stddev | ||
| stddev_pop | ||
| stddev_samp | ||
| struct | ||
| substring | ||
| substring_index | ||
| sum | ||
| sumDistinct | ||
| sum_distinct | ||
| sys | ||
| tan | ||
| tanh | ||
| timestamp_seconds | ||
| toDegrees | ||
| toRadians | ||
| to_csv | ||
| to_date | ||
| to_json | ||
| to_str | ||
| to_timestamp | ||
| to_utc_timestamp | ||
| transform | ||
| transform_keys | ||
| transform_values | ||
| translate | ||
| trim | ||
| trunc | ||
| try_remote_functions | ||
| udf | ||
| unbase64 | ||
| unhex | ||
| unix_timestamp | ||
| unwrap_udt | ||
| upper | ||
| var_pop | ||
| var_samp | ||
| variance | ||
| warnings | ||
| weekofyear | ||
| when | ||
| window | ||
| window_time | ||
| xxhash64 | ||
| year | ||
| years | ||
| zip_with |
Data Types
Data types are used for creating schemas and for casting columns to specific types.
| DataType | API | Notes |
|---|---|---|
| ArrayType | ||
| BinaryType | ||
| BooleanType | ||
| ByteType | ||
| DateType | ||
| DecimalType | ||
| DoubleType | ||
| FloatType | ||
| IntegerType | ||
| LongType | ||
| MapType | ||
| NullType | ||
| ShortType | ||
| StringType | ||
| CharType | ||
| VarcharType | ||
| StructField | ||
| StructType | ||
| TimestampType | ||
| TimestampNTZType | ||
| DayTimeIntervalType | ||
| YearMonthIntervalType | ||
Literal Types
Create Spark literal types from these Rust types. E.g. `lit(1_i64)` would be a `LongType()` in the schema.
An array can be created like `lit([1_i16, 2_i16, 3_i16])` and would result in an `ArrayType(Short)`, since all values of the slice can be translated into a literal type.
| Spark Literal Type | Rust Type | Status |
|---|---|---|
| Null | | |
| Binary | `&[u8]` | |
| Boolean | `bool` | |
| Byte | | |
| Short | `i16` | |
| Integer | `i32` | |
| Long | `i64` | |
| Float | `f32` | |
| Double | `f64` | |
| Decimal | | |
| String | `&str` / `String` | |
| Date | `chrono::NaiveDate` | |
| Timestamp | `chrono::DateTime<Tz>` | |
| TimestampNtz | `chrono::NaiveDateTime` | |
| CalendarInterval | | |
| YearMonthInterval | | |
| DayTimeInterval | | |
| Array | slice / `Vec` | |
| Map | Create with the function `create_map` | |
| Struct | Create with the function `struct_col` or `named_struct` | |
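A sketch of literal creation following the rules above (the `printSchema` argument is an assumption):

```rust
use spark_connect_rs::functions::lit;

// lit(1_i64) surfaces as LongType; a slice of i16 as ArrayType(Short)
let a_long = lit(1_i64).alias("a_long");
let shorts = lit([1_i16, 2_i16, 3_i16]).alias("shorts");

let df = spark.range(None, 1, 1, None).select(vec![a_long, shorts]);
df.printSchema(None).await?;
```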
Window & WindowSpec
For ease of use, it is recommended to use `Window` to create the `WindowSpec`.
| Window | API | Notes |
|---|---|---|
| currentRow | ||
| orderBy | ||
| partitionBy | ||
| rangeBetween | ||
| rowsBetween | ||
| unboundedFollowing | ||
| unboundedPreceding | ||
| WindowSpec.orderBy | ||
| WindowSpec.partitionBy | ||
| WindowSpec.rangeBetween | ||
| WindowSpec.rowsBetween |
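A sketch of building a spec through `Window` and applying it with `Column::over` (the module path and builder-method shapes are assumptions based on the table above):

```rust
use spark_connect_rs::expressions::window::Window;
use spark_connect_rs::functions::{col, row_number};

// Partition by department, order by salary descending
let spec = Window::new()
    .partitionBy(col("department"))
    .orderBy(vec![col("salary").desc()]);

// Rank rows within each partition
let ranked = df.withColumn("salary_rank", row_number().over(spec));
ranked.show(Some(10), None, None).await?;
```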
Dependencies
~23–35MB
~568K SLoC