Apache Spark Connect Client for Rust
This project contains a Spark Connect client for Apache Spark, written in Rust.
Current State of the Project
Currently, the Spark Connect client for Rust is highly experimental and **should not be used in any production setting**. This is a "proof of concept" to identify the methods of interacting with a Spark cluster from Rust.
spark-connect-rs aims to provide an entrypoint to Spark Connect, and to provide similar DataFrame API interactions.
Project Layout
├── core <- core implementation in Rust
│ └─ spark <- git submodule for apache/spark
├── rust <- shim for 'spark-connect-rs' from core
├── examples <- examples of using different aspects of the crate
├── datasets <- sample files from the main spark repo
The future state would be to have additional bindings for other languages alongside the top-level rust folder.
Getting Started
This section explains how to run Spark Connect Rust locally, starting from scratch.
Step 1: Install rust via rustup: https://rust-lang.net.cn/tools/install
Step 3: Run the following commands to clone the repo
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive
cargo build
Step 4: Set up the Spark Driver on localhost, either by downloading Spark or by using docker.
With local Spark:
- Download the Spark distribution (3.5.1 recommended) and unzip the package.
- Set your SPARK_HOME environment variable to the location where Spark was extracted.
- Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):
$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
With docker:
- Start the Spark Connect server by leveraging the docker-compose.yml created in this repo. This will start a Spark Connect server running on port 15002.
$ docker compose up --build -d
Step 5: Run an example from the repo under /examples
Features
The following section outlines some of the larger functionality that is not yet supported by this Spark Connect implementation:
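For a quick end-to-end check once the server is up, a minimal program along the lines of the sketch below should work. This is only a sketch: the remote connection string, and the build/sql/show signatures are assumptions based on the API tables later in this README, so prefer the files under /examples as the source of truth.

```rust
// Minimal sketch, not copied from the repo: the remote/build/sql/show
// signatures below are assumptions based on the API tables in this README.
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the Spark Connect server started in step 4 (port 15002).
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
            .build()
            .await?;

    // Run a SQL statement and print the resulting rows.
    let df = spark
        .sql("SELECT 'spark-connect-rs' AS client, 42 AS answer")
        .await?;
    df.show(Some(5), None, None).await?;

    Ok(())
}
```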
- TLS authentication & Databricks compatibility via the feature flag feature = 'tls'
- StreamingQueryManager
- UDFs, or any functionality that takes a closure (foreach, foreachBatch, etc.)
SparkSession
The Spark Session type object and its implemented traits.
| SparkSession | API | 注释 |
|---|---|---|
| active | | |
| addArtifact(s) | ||
| addTag | ||
| clearTags | ||
| copyFromLocalToFs | ||
| createDataFrame | | Partial. Only works for RecordBatch |
| getActiveSessions | ||
| getTags | ||
| interruptAll | | |
| interruptOperation | ||
| interruptTag | ||
| newSession | ||
| range | ||
| removeTag | ||
| sql | ||
| stop | ||
| table | ||
| catalog | | Catalog |
| client | | Unstable developer API, for testing only |
| conf | | Conf |
| read | | DataFrameReader |
| readStream | | DataStreamReader |
| streams | | Streams |
| udf | | Udf - may not be possible |
| udtf | | Udtf - may not be possible |
| version | | |
SparkSessionBuilder
| SparkSessionBuilder | API | Comment |
|---|---|---|
| appName | ||
| config | ||
| master | ||
| remote | | Validated using the Spark connection string |
StreamingQueryManager
| StreamingQueryManager | API | Comment |
|---|---|---|
| awaitAnyTermination | ||
| get | ||
| resetTerminated | ||
| active | | |
StreamingQuery
| StreamingQuery | API | Comment |
|---|---|---|
| awaitTermination | ||
| exception | ||
| explain | ||
| processAllAvailable | ||
| stop | ||
| id | ||
| isActive | ||
| lastProgress | ||
| name | ||
| recentProgress | ||
| runId | ||
| status |
DataStreamReader
| DataStreamReader | API | Comment |
|---|---|---|
| csv | ||
| format | ||
| json | ||
| load | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| schema | ||
| table | ||
| text |
DataFrameReader
| DataFrameReader | API | Comment |
|---|---|---|
| csv | ||
| format | ||
| json | ||
| load | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| schema | ||
| table | ||
| text |
DataStreamWriter
Starts a streaming job and returns a StreamingQuery object to handle the stream operations; a short usage sketch follows the table below.
| DataStreamWriter | API | Comment |
|---|---|---|
| foreach | ||
| foreachBatch | ||
| format | ||
| option | ||
| options | ||
| outputMode | | Uses an Enum for OutputMode |
| partitionBy | ||
| queryName | ||
| start | ||
| toTable | ||
| trigger | | Uses an Enum for TriggerMode |
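As referenced above, a hedged sketch of how a streaming job might be wired together. Method names mirror the PySpark-style names in the tables; the rate-source options, the exact Rust signatures, return types, and casing are assumptions.

```rust
// Sketch of a streaming job: read from the built-in `rate` source and write
// to the console sink. Signatures and casing are assumptions.
use spark_connect_rs::SparkSession;

async fn run_stream(spark: SparkSession) -> Result<(), Box<dyn std::error::Error>> {
    // Streaming source that emits `rowsPerSecond` rows per second.
    let df = spark
        .readStream()
        .format("rate")
        .option("rowsPerSecond", "5")
        .load(None)?;

    // `start` kicks off the job and hands back a StreamingQuery handle.
    let query = df
        .writeStream()
        .format("console")
        .queryName("rate_to_console")
        .start(None)
        .await?;

    // Block until the query stops (or surfaces an exception).
    query.awaitTermination(None).await?;
    Ok(())
}
```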
StreamingQueryListener
| StreamingQueryListener | API | Comment |
|---|---|---|
| onQueryIdle | ||
| onQueryProgress | ||
| onQueryStarted | ||
| onQueryTerminated |
UdfRegistration (may not be possible)
| UDFRegistration | API | Comment |
|---|---|---|
| register | ||
| registerJavaFunction | ||
| registerJavaUDAF |
UdtfRegistration (may not be possible)
| UDTFRegistration | API | Comment |
|---|---|---|
| register |
RuntimeConfig
| RuntimeConfig | API | Comment |
|---|---|---|
| get | ||
| isModifiable | ||
| set | ||
| unset |
Catalog
| Catalog | API | Comment |
|---|---|---|
| cacheTable | ||
| clearCache | ||
| createExternalTable | | |
| createTable | ||
| currentCatalog | ||
| currentDatabase | ||
| databaseExists | ||
| dropGlobalTempView | ||
| dropTempView | ||
| functionExists | ||
| getDatabase | ||
| getFunction | ||
| getTable | ||
| isCached | ||
| listCatalogs | ||
| listDatabases | ||
| listFunctions | ||
| listTables | ||
| recoverPartitions | ||
| refreshByPath | ||
| refreshTable | ||
| registerFunction | ||
| setCurrentCatalog | ||
| setCurrentDatabase | ||
| tableExists | ||
| uncacheTable |
DataFrame
The Spark DataFrame type object and its implemented traits.
| DataFrame | API | Comment |
|---|---|---|
| agg | ||
| alias | ||
| approxQuantile | ||
| cache | ||
| checkpoint | ||
| coalesce | ||
| colRegex | ||
| collect | ||
| columns | ||
| corr | ||
| count | ||
| cov | ||
| createGlobalTempView | ||
| createOrReplaceGlobalTempView | ||
| createOrReplaceTempView | ||
| createTempView | ||
| crossJoin | ||
| crosstab | ||
| cube | ||
| describe | ||
| distinct | ||
| drop | ||
| dropDuplicates | ||
| dropDuplicatesWithinWatermark | | Windowing functions are currently in progress |
| drop_duplicates | ||
| dropna | ||
| dtypes | ||
| exceptAll | ||
| explain | ||
| fillna | ||
| filter | ||
| first | ||
| foreach | ||
| foreachPartition | ||
| freqItems | ||
| groupBy | ||
| head | ||
| hint | ||
| inputFiles | ||
| intersect | ||
| intersectAll | ||
| isEmpty | ||
| isLocal | ||
| isStreaming | ||
| join | ||
| limit | ||
| localCheckpoint | ||
| mapInPandas | | TBD on the exact implementation |
| mapInArrow | | TBD on the exact implementation |
| melt | ||
| na | ||
| observe | ||
| offset | ||
| orderBy | ||
| persist | ||
| printSchema | ||
| randomSplit | ||
| registerTempTable | ||
| repartition | ||
| repartitionByRange | ||
| replace | ||
| rollup | ||
| sameSemantics | ||
| sample | ||
| sampleBy | ||
| schema | ||
| select | ||
| selectExpr | ||
| semanticHash | ||
| show | ||
| sort | ||
| sortWithinPartitions | ||
| sparkSession | ||
| stat | ||
| storageLevel | ||
| subtract | ||
| summary | ||
| tail | ||
| take | ||
| to | ||
| toDF | ||
| toJSON | | Does not return an RDD but a long JSON formatted String |
| toLocalIterator | | |
| new to_polars & toPolars | | Converts to a polars::frame::DataFrame |
| new to_datafusion & toDataFusion | | Converts to a datafusion::dataframe::DataFrame |
| transform | ||
| union | ||
| unionAll | ||
| unionByName | ||
| unpersist | ||
| unpivot | ||
| where | | Use filter instead, where is a keyword for rust |
| withColumn | ||
| withColumns | ||
| withColumnRenamed | ||
| withColumnsRenamed | ||
| withMetadata | ||
| withWatermark | ||
| write | ||
| writeStream | ||
| writeTo |
DataFrameWriter
Spark Connect should respect the format as long as your cluster supports the specified type and has the required jars; a hedged writer sketch follows the table below.
| DataFrameWriter | API | Comment |
|---|---|---|
| bucketBy | ||
| csv | ||
| format | ||
| insertInto | ||
| jdbc | ||
| json | ||
| mode | ||
| option | ||
| options | ||
| orc | ||
| parquet | ||
| partitionBy | ||
| save | ||
| saveAsTable | ||
| sortBy | ||
| text |
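The writer sketch referenced above. Whether mode takes a string or an enum, and the exact range/save signatures, are assumptions; only the general chain shape is taken from the table.

```rust
// Sketch only: writes a small DataFrame as parquet. `mode` may take an enum
// rather than a string; the `range`/`save` signatures are assumptions.
use spark_connect_rs::SparkSession;

async fn write_parquet(spark: SparkSession) -> Result<(), Box<dyn std::error::Error>> {
    // A tiny DataFrame of ids 0..100 in a single partition.
    let df = spark.range(None, 100, 1, Some(1));

    df.write()
        .format("parquet")
        .mode("overwrite")
        .save("/tmp/spark-connect-rs/range_parquet")
        .await?;

    Ok(())
}
```

Swapping format("parquet") for format("delta") should also work against the server from the getting started section, since that command loads the delta packages.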
DataFrameWriterV2
| DataFrameWriterV2 | API | Comment |
|---|---|---|
| append | ||
| create | ||
| createOrReplace | ||
| option | ||
| options | ||
| overwrite | ||
| overwritePartitions | ||
| partitionedBy | ||
| replace | ||
| tableProperty | ||
| using |
Column
The Spark Column type object and its implemented traits.
| Column | API | Comment |
|---|---|---|
| alias | ||
| asc | ||
| asc_nulls_first | ||
| asc_nulls_last | ||
| astype | ||
| between | ||
| cast | ||
| contains | ||
| desc | ||
| desc_nulls_first | ||
| desc_nulls_last | ||
| dropFields | ||
| endswith | ||
| eqNullSafe | ||
| getField | | Deprecated, but will need to be implemented |
| getItem | | Deprecated, but will need to be implemented |
| ilike | ||
| isNotNull | ||
| isNull | ||
| isin | ||
| like | ||
| name | ||
| otherwise | ||
| over | | Refer to Window for creating window specifications |
| rlike | ||
| startswith | ||
| substr | ||
| when | ||
| withField | ||
| eq == | | Rust does not like it when you try to overload == and return something other than a bool (see the sketch after this table) |
| addition + | | |
| subtraction - | | |
| multiplication * | | |
| division / | | |
| or \| | | |
| and & | | |
| xor ^ | | |
| negate ~ | | |
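A short sketch of what the operator rows above allow; col and lit come from the Functions section below, and the exact module path and eq signature are assumptions.

```rust
// Sketch only: composes columns with the overloaded arithmetic/logical
// operators listed above. `eq` stands in for `==` because Rust's PartialEq
// must return a bool rather than a Column.
use spark_connect_rs::functions::{col, lit};

fn build_predicates() {
    // Arithmetic composition via `*` and `+`.
    let total = (col("price") * col("quantity")) + lit(2.5_f64);

    // Equality goes through the explicit method; logical "and" through `&`.
    let in_stock_gadgets = col("category").eq(lit("gadget")) & col("in_stock");

    let _ = (total, in_stock_gadgets);
}
```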
Functions
Only a few of the functions are covered by unit tests.
| Functions | API | Comment |
|---|---|---|
| abs | ||
| acos | ||
| acosh | ||
| add_months | ||
| aggregate | ||
| approxCountDistinct | ||
| approx_count_distinct | ||
| array | ||
| array_append | ||
| array_compact | ||
| array_contains | ||
| array_distinct | ||
| array_except | ||
| array_insert | ||
| array_intersect | ||
| array_join | ||
| array_max | ||
| array_min | ||
| array_position | ||
| array_remove | ||
| array_repeat | ||
| array_sort | ||
| array_union | ||
| arrays_overlap | ||
| arrays_zip | ||
| asc | ||
| asc_nulls_first | ||
| asc_nulls_last | ||
| ascii | ||
| asin | ||
| asinh | ||
| assert_true | ||
| atan | ||
| atan2 | ||
| atanh | ||
| avg | ||
| base64 | ||
| bin | ||
| bit_length | ||
| bitwiseNOT | ||
| bitwise_not | ||
| broadcast | ||
| bround | ||
| bucket | ||
| call_udf | ||
| cbrt | ||
| ceil | ||
| coalesce | ||
| col | ||
| collect_list | ||
| collect_set | ||
| column | ||
| concat | ||
| concat_ws | ||
| conv | ||
| corr | ||
| cos | ||
| cosh | ||
| cot | ||
| count | ||
| countDistinct | ||
| count_distinct | ||
| covar_pop | ||
| covar_samp | ||
| crc32 | ||
| create_map | ||
| csc | ||
| cume_dist | ||
| current_date | ||
| current_timestamp | ||
| date_add | ||
| date_format | ||
| date_sub | ||
| date_trunc | ||
| datediff | ||
| dayofmonth | ||
| dayofweek | ||
| dayofyear | ||
| days | ||
| decode | ||
| degrees | ||
| dense_rank | ||
| desc | ||
| desc_nulls_first | ||
| desc_nulls_last | ||
| element_at | ||
| encode | ||
| exists | ||
| exp | ||
| explode | ||
| explode_outer | ||
| expm1 | ||
| expr | ||
| factorial | ||
| filter | ||
| first | ||
| flatten | ||
| floor | ||
| forall | ||
| format_number | ||
| format_string | ||
| from_csv | ||
| from_json | ||
| from_unixtime | ||
| from_utc_timestamp | ||
| functools | ||
| get | ||
| get_active_spark_context | ||
| get_json_object | ||
| greatest | ||
| grouping | ||
| grouping_id | ||
| has_numpy | ||
| hash | ||
| hex | ||
| hour | ||
| hours | ||
| hypot | ||
| initcap | ||
| inline | ||
| inline_outer | ||
| input_file_name | ||
| inspect | ||
| instr | ||
| isnan | ||
| isnull | ||
| json_tuple | ||
| kurtosis | ||
| lag | ||
| last | ||
| last_day | ||
| lead | ||
| least | ||
| length | ||
| levenshtein | ||
| lit | ||
| localtimestamp | ||
| locate | ||
| log | ||
| log10 | ||
| log1p | ||
| log2 | ||
| lower | ||
| lpad | ||
| ltrim | ||
| make_date | ||
| map_concat | ||
| map_contains_key | ||
| map_entries | ||
| map_filter | ||
| map_from_arrays | ||
| map_from_entries | ||
| map_keys | ||
| map_values | ||
| map_zip_with | ||
| max | ||
| max_by | ||
| md5 | ||
| mean | ||
| median | ||
| min | ||
| min_by | ||
| minute | ||
| mode | ||
| monotonically_increasing_id | ||
| month | ||
| months | ||
| months_between | ||
| nanvl | ||
| next_day | ||
| np | ||
| nth_value | ||
| ntile | ||
| octet_length | ||
| overlay | ||
| overload | ||
| pandas_udf | ||
| percent_rank | ||
| percentile_approx | ||
| pmod | ||
| posexplode | ||
| posexplode_outer | ||
| pow | ||
| product | ||
| quarter | | |
| radians | | |
| raise_error | ||
| rand | ||
| randn | ||
| rank | ||
| regexp_extract | ||
| regexp_replace | ||
| repeat | ||
| reverse | ||
| rint | ||
| round | ||
| row_number | ||
| rpad | ||
| rtrim | ||
| schema_of_csv | ||
| schema_of_json | ||
| sec | ||
| second | ||
| sentences | ||
| sequence | ||
| session_window | ||
| sha1 | ||
| sha2 | ||
| shiftLeft | ||
| shiftRight | ||
| shiftRightUnsigned | ||
| shiftleft | ||
| shiftright | ||
| shiftrightunsigned | ||
| shuffle | ||
| signum | ||
| sin | ||
| sinh | ||
| size | ||
| skewness | ||
| slice | ||
| sort_array | ||
| soundex | ||
| spark_partition_id | ||
| split | ||
| sqrt | ||
| stddev | ||
| stddev_pop | ||
| stddev_samp | ||
| struct | ||
| substring | ||
| substring_index | ||
| sum | ||
| sumDistinct | ||
| sum_distinct | ||
| sys | ||
| tan | ||
| tanh | ||
| timestamp_seconds | ||
| toDegrees | ||
| toRadians | ||
| to_csv | ||
| to_date | ||
| to_json | ||
| to_str | ||
| to_timestamp | ||
| to_utc_timestamp | ||
| transform | ||
| transform_keys | ||
| transform_values | ||
| translate | ||
| trim | ||
| trunc | ||
| try_remote_functions | ||
| udf | ||
| unbase64 | ||
| unhex | ||
| unix_timestamp | ||
| unwrap_udt | ||
| upper | ||
| var_pop | ||
| var_samp | ||
| variance | ||
| warnings | ||
| weekofyear | ||
| when | ||
| window | ||
| window_time | ||
| xxhash64 | ||
| year | ||
| years | ||
| zip_with |
Data Types
Data types are used for creating schemas and for casting columns to specific types (a casting sketch follows the table below).
| DataType | API | Comment |
|---|---|---|
| ArrayType | | |
| BinaryType | | |
| BooleanType | | |
| ByteType | | |
| DateType | | |
| DecimalType | | |
| DoubleType | | |
| FloatType | | |
| IntegerType | | |
| LongType | | |
| MapType | | |
| NullType | | |
| ShortType | | |
| StringType | | |
| CharType | | |
| VarcharType | | |
| StructField | | |
| StructType | | |
| TimestampType | | |
| TimestampNTZType | | |
| DayTimeIntervalType | | |
| YearMonthIntervalType | | |
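The casting sketch referenced above: the string form passed to cast and the printSchema signature are assumptions, and the typed variants listed in the table may be usable instead.

```rust
// Sketch only: casts string columns to typed columns. The string argument
// to `cast` and the `printSchema(None)` signature are assumptions.
use spark_connect_rs::functions::col;
use spark_connect_rs::SparkSession;

async fn cast_columns(spark: SparkSession) -> Result<(), Box<dyn std::error::Error>> {
    let df = spark
        .sql("SELECT '42' AS answer, '2024-07-03' AS day")
        .await?;

    // Cast to a long and a date before any downstream processing.
    let typed = df.select(vec![
        col("answer").cast("long"),
        col("day").cast("date"),
    ]);

    typed.printSchema(None).await?;
    Ok(())
}
```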
Literal Types
Create Spark literal types from these Rust types. E.g. lit(1_i64) would be represented as a LongType() in the schema.
An array can be created, e.g. lit([1_i16, 2_i16, 3_i16]) would result in an ArrayType(Short), since all values of the slice can be translated into a literal type. A short sketch follows the table below.
| Spark Literal Type | Rust Type | Status |
|---|---|---|
| Null | | |
| Binary | &[u8] | |
| Boolean | bool | |
| Byte | | |
| Short | i16 | |
| Integer | i32 | |
| Long | i64 | |
| Float | f32 | |
| Double | f64 | |
| Decimal | | |
| String | &str / String | |
| Date | chrono::NaiveDate | |
| Timestamp | chrono::DateTime<Tz> | |
| TimestampNtz | chrono::NaiveDateTime | |
| CalendarInterval | | |
| YearMonthInterval | | |
| DayTimeInterval | | |
| Array | slice / Vec | |
| Map | Create with the function create_map | |
| Struct | Create with the function struct_col or named_struct | |
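The literal sketch referenced above illustrates a few of these mappings; the alias calls and direct chrono::NaiveDate support are assumptions based on the table.

```rust
// Sketch only: Rust values lifted into Spark literals per the table above.
// The alias calls and chrono conversion are assumptions.
use spark_connect_rs::functions::lit;

fn literal_columns() {
    // i64 -> LongType in the resulting schema.
    let a_long = lit(1_i64).alias("a_long");

    // A slice of i16 -> ArrayType(Short), since every element converts to
    // the same literal type.
    let shorts = lit([1_i16, 2_i16, 3_i16]).alias("shorts");

    // chrono::NaiveDate -> Date literal.
    let a_date = lit(chrono::NaiveDate::from_ymd_opt(2024, 7, 3).unwrap()).alias("a_date");

    let _ = (a_long, shorts, a_date);
}
```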
Window & WindowSpec
For ease of use it's recommended to use Window to create the WindowSpec; a hedged sketch follows the table below.
| Window | API | Comment |
|---|---|---|
| currentRow | | |
| orderBy | ||
| partitionBy | ||
| rangeBetween | ||
| rowsBetween | ||
| unboundedFollowing | ||
| unboundedPreceding | ||
| WindowSpec.orderBy | ||
| WindowSpec.partitionBy | ||
| WindowSpec.rangeBetween | ||
| WindowSpec.rowsBetween |
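The window sketch referenced above: the Window::new builder, its module path, the argument shapes, and the camelCase casing are all assumptions drawn from the table.

```rust
// Sketch only: builds a WindowSpec through Window and applies it with
// Column::over. Builder names, casing, and argument types are assumptions.
use spark_connect_rs::functions::{col, row_number};
use spark_connect_rs::window::Window; // module path assumed

fn salary_rank_column() {
    // Partition by department, order by salary descending.
    let spec = Window::new()
        .partitionBy(col("department"))
        .orderBy(col("salary").desc());

    // Window functions such as row_number are applied through `over`.
    let _rank = row_number().over(spec).alias("salary_rank");
}
```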