
Apache Spark Connect Client for Rust

This project houses an experimental client for Apache Spark Connect, written in Rust.

Current State of the Project

Currently, the Spark Connect client for Rust is highly experimental and **should not be used in any production setting**. It is a "proof of concept" to identify how to interact with a Spark cluster from Rust.

spark-connect-rs aims to provide an entrypoint to Spark Connect and a DataFrame API-like interaction.
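As a quick, hedged sketch of what that DataFrame-style entrypoint looks like (it assumes a Spark Connect server listening on localhost:15002 and the tokio async runtime; exact method names and signatures, such as the arguments to `show`, may differ between beta releases):

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a locally running Spark Connect server (see "Getting Started" below).
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Run a SQL statement and display the result; the `show` argument types are an
    // assumption based on the current beta API.
    let df = spark.sql("SELECT 1 AS id, 'hello' AS greeting").await?;
    df.show(Some(5), None, None).await?;

    Ok(())
}
```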

Project Layout

├── core       <- core implementation in Rust
│   └─ spark   <- git submodule for apache/spark
├── rust       <- shim for 'spark-connect-rs' from core
├── examples   <- examples of using different aspects of the crate
├── datasets   <- sample files from the main spark repo

The future state would be to provide additional bindings for other languages alongside the top-level rust folder.

Getting Started

This section explains how to run Spark Connect for Rust locally, starting from scratch.

Step 1: Install rust via rustup: https://www.rust-lang.net.cn/tools/install

Step 2: Make sure you have both cmake and protobuf installed on your machine.

Step 3: Run the following commands to clone the repo and build the project

git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive

cargo build

Step 4: Set up the Spark Driver on localhost, either by downloading spark or by using docker.

With local spark

  1. Download the Spark distribution (3.5.1 recommended) and unzip the package.

  2. Set your SPARK_HOME environment variable to the location where spark was extracted.

  3. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution)

$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
      --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

With docker

  1. Start the Spark Connect server by leveraging the docker-compose.yml created in this repo. This will start a Spark Connect server running on port 15002
$ docker compose up --build -d

Step 5: Run an example from the repo under /examples
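For instance, any of the bundled examples can be run with cargo against the local server; the example name `sql` below is an assumption, so check /examples for the actual targets:

$ cargo run --example sql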

Features

The following outlines some of the larger functionality that is not yet working with this Spark Connect implementation.

  • done TLS authentication & Databricks compatibility via the feature flag feature = 'tls'
  • open StreamingQueryManager
  • open UDFs or anything that takes a closure (foreach, foreachBatch, etc.)

SparkSession

Spark Session type object and its implemented traits.

SparkSession API Notes
active open
addArtifact(s) open
addTag done
clearTags done
copyFromLocalToFs open
createDataFrame partial Only works for a RecordBatch
getActiveSessions open
getTags done
interruptAll done
interruptOperation done
interruptTag done
newSession open
range done
removeTag done
sql done
stop open
table done
catalog done Catalog
client done Unstable developer API, used for testing only
conf done Conf
read done DataFrameReader
readStream done DataStreamReader
streams open
udf open Udf - may not be possible
udtf open Udtf - may not be possible
version done

SparkSessionBuilder

SparkSessionBuilder API Notes
appName done
config done
master open
remote partial Validated using a spark connection string

StreamingQueryManager

StreamingQueryManager API Notes
awaitAnyTermination open
get open
resetTerminated open
active open

StreamingQuery

StreamingQuery API Notes
awaitTermination done
exception done
explain done
processAllAvailable done
stop done
id done
isActive done
lastProgress done
name done
recentProgress done
runId done
status done

DataStreamReader

DataStreamReader API Notes
csv open
format done
json open
load done
option done
options done
orc open
parquet open
schema done
table open
text open

DataFrameReader

DataFrameReader API Notes
csv open
format done
json open
load done
option done
options done
orc open
parquet open
schema done
table done
text open

DataStreamWriter

Start a streaming job and return a StreamingQuery object to handle the stream operations.

DataStreamWriter API Notes
foreach
foreachBatch
format done
option done
options done
outputMode done Uses the enum OutputMode
partitionBy done
queryName done
start done
toTable done
trigger done Uses the enum TriggerMode
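As a rough sketch of starting a streaming job and getting back a StreamingQuery handle, the snippet below uses hypothetical snake_case method names (`read_stream`, `write_stream`, `output_mode`, `query_name`, `start`) mirroring the camelCase entries in the tables, and an assumed module path for OutputMode; check the crate docs for the exact signatures.

```rust
use spark_connect_rs::SparkSession;
use spark_connect_rs::streaming::OutputMode; // module path is an assumption

async fn run_stream(spark: SparkSession) -> Result<(), Box<dyn std::error::Error>> {
    // build a streaming DataFrame from a source format
    let df = spark.read_stream().format("rate").load(None).await?;

    // start the streaming job; this returns a StreamingQuery object
    let query = df
        .write_stream()
        .format("memory")
        .query_name("example_stream")
        .output_mode(OutputMode::Append)
        .start(None)
        .await?;

    // the StreamingQuery handle (stop, status, etc.) is covered in the table above
    query.stop().await?;
    Ok(())
}
```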

StreamingQueryListener

StreamingQueryListener API Notes
onQueryIdle open
onQueryProgress open
onQueryStarted open
onQueryTerminated open

UdfRegistration (may not be possible)

UDFRegistration API Notes
register open
registerJavaFunction open
registerJavaUDAF open

UdtfRegistration (may not be possible)

UDTFRegistration API Notes
register open

RuntimeConfig

RuntimeConfig API Notes
get done
isModifiable done
set done
unset done

Catalog

Catalog API Notes
cacheTable done
clearCache done
createExternalTable open
createTable open
currentCatalog done
currentDatabase done
databaseExists done
dropGlobalTempView done
dropTempView done
functionExists done
getDatabase done
getFunction done
getTable done
isCached done
listCatalogs done
listDatabases done
listFunctions done
listTables done
recoverPartitions done
refreshByPath done
refreshTable done
registerFunction open
setCurrentCatalog done
setCurrentDatabase done
tableExists done
uncacheTable done

DataFrame

Spark DataFrame type object and its implemented traits.

DataFrame API Notes
agg done
alias done
approxQuantile open
cache done
checkpoint open
coalesce done
colRegex done
collect done
columns done
corr done
count done
cov done
createGlobalTempView done
createOrReplaceGlobalTempView done
createOrReplaceTempView done
createTempView done
crossJoin done
crosstab done
cube done
describe done
distinct done
drop done
dropDuplicates done
dropDuplicatesWithinWatermark open Windowing functions are currently in progress
drop_duplicates done
dropna done
dtypes done
exceptAll done
explain done
fillna open
filter done
first done
foreach open
foreachPartition open
freqItems done
groupBy done
head done
hint done
inputFiles done
intersect done
intersectAll done
isEmpty done
isLocal open
isStreaming done
join done
limit done
localCheckpoint open
mapInPandas open TBD on this exact implementation
mapInArrow open TBD on this exact implementation
melt done
na open
observe open
offset done
orderBy done
persist done
printSchema done
randomSplit open
registerTempTable open
repartition done
repartitionByRange open
replace open
rollup done
sameSemantics done
sample done
sampleBy open
schema done
select done
selectExpr done
semanticHash done
show done
sort done
sortWithinPartitions done
sparkSession done
stat done
storageLevel done
subtract done
summary open
tail done
take done
to done
toDF done
toJSON partial Does not return an RDD, but a long JSON-formatted String
toLocalIterator open
toPandas to_polars & toPolars partial Converts to a polars::frame::DataFrame
new to_datafusion & toDataFusion done Converts to a datafusion::dataframe::DataFrame
transform done
union done
unionAll done
unionByName done
unpersist done
unpivot done
where done Use filter instead; where is a keyword in rust
withColumn done
withColumns done
withColumnRenamed open
withColumnsRenamed done
withMetadata open
withWatermark open
write done
writeStream done
writeTo done

DataFrameWriter

Spark Connect should respect the format as long as your cluster supports the specified type and has the required jars.

DataFrameWriter API Notes
bucketBy done
csv
format done
insertInto done
jdbc
json
mode done
option done
options done
orc
parquet
partitionBy
save done
saveAsTable done
sortBy done
text

DataFrameWriterV2

DataFrameWriterV2 API Notes
append done
create done
createOrReplace done
option done
options done
overwrite done
overwritePartitions done
partitionedBy done
replace done
tableProperty done
using done

Column

Spark Column type object and its implemented traits.

Column API Notes
alias done
asc done
asc_nulls_first done
asc_nulls_last done
astype open
between open
cast done
contains done
desc done
desc_nulls_first done
desc_nulls_last done
dropFields done
endswith done
eqNullSafe open
getField open This is deprecated but will need to be implemented
getItem open This is deprecated but will need to be implemented
ilike done
isNotNull done
isNull done
isin done
like done
name done
otherwise open
over done Refer to Window for creating a WindowSpec
rlike done
startswith done
substr done
when open
withField done
eq == done Rust does not like it when you try to overload == and return something other than a bool. Currently, column equality is implemented as col('name').eq(col('id')). Not the best, but it works for now.
addition + done
subtraction - done
multiplication * done
division / done
or | done
and & done
xor ^ done
not ~ done
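A short sketch of how these column expressions compose; `eq`, `alias`, and the overloaded arithmetic operators come from the table above, while the `functions` module path is an assumption:

```rust
use spark_connect_rs::functions::{col, lit};

fn column_examples() {
    // equality uses the explicit `eq` method rather than `==`
    let _matches = col("name").eq(col("id"));

    // arithmetic operators are overloaded directly on Column,
    // and `alias` renames the resulting expression
    let _adjusted = (col("salary") * lit(2)).alias("double_salary");
}
```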

Functions

Only a few of the functions are covered by unit tests.

Functions API Notes
abs done
acos done
acosh done
add_months done
aggregate open
approxCountDistinct open
approx_count_distinct done
array done
array_append done
array_compact done
array_contains open
array_distinct done
array_except done
array_insert open
array_intersect done
array_join open
array_max done
array_min done
array_position done
array_remove done
array_repeat done
array_sort open
array_union done
arrays_overlap open
arrays_zip done
asc done
asc_nulls_first done
asc_nulls_last done
ascii done
asin done
asinh done
assert_true open
atan done
atan2 done
atanh done
avg done
base64 done
bin done
bit_length done
bitwiseNOT open
bitwise_not done
broadcast open
bround open
bucket open
call_udf open
cbrt done
ceil done
coalesce done
col done
collect_list done
collect_set done
column done
concat done
concat_ws open
conv open
corr open
cos open
cosh open
cot open
count open
countDistinct open
count_distinct open
covar_pop done
covar_samp done
crc32 done
create_map done
csc done
cume_dist done
current_date done
current_timestamp done
date_add done
date_format open
date_sub done
date_trunc open
datediff done
dayofmonth done
dayofweek done
dayofyear done
days done
decode open
degrees done
dense_rank done
desc done
desc_nulls_first done
desc_nulls_last done
element_at open
encode open
exists open
exp done
explode done
explode_outer done
expm1 done
expr done
factorial done
filter open
first open
flatten done
floor done
forall open
format_number open
format_string open
from_csv open
from_json open
from_unixtime open
from_utc_timestamp open
functools open
get open
get_active_spark_context open
get_json_object open
greatest done
grouping done
grouping_id open
has_numpy open
hash done
hex done
hour done
hours done
hypot open
initcap done
inline done
inline_outer done
input_file_name done
inspect open
instr open
isnan done
isnull done
json_tuple open
kurtosis done
lag open
last open
last_day open
lead open
least done
length done
levenshtein open
lit done
localtimestamp done
locate open
log done
log10 done
log1p done
log2 done
lower done
lpad open
ltrim done
make_date open
map_concat done
map_contains_key open
map_entries done
map_filter open
map_from_arrays open
map_from_entries done
map_keys done
map_values done
map_zip_with open
max done
max_by open
md5 done
mean done
median done
min done
min_by open
minute done
mode open
monotonically_increasing_id done
month done
months done
months_between open
nanvl done
next_day open
np open
nth_value open
ntile done
octet_length done
overlay open
overload open
pandas_udf open
percent_rank done
percentile_approx open
pmod open
posexplode done
posexplode_outer done
pow done
product done
quarter done
radians done
raise_error open
rand done
randn done
rank done
regexp_extract open
regexp_replace open
repeat open
reverse done
rint done
round done
row_number done
rpad open
rtrim done
schema_of_csv open
schema_of_json open
sec done
second done
sentences open
sequence open
session_window open
sha1 done
sha2 open
shiftLeft open
shiftRight open
shiftRightUnsigned open
shiftleft open
shiftright open
shiftrightunsigned open
shuffle done
signum done
sin done
sinh done
size done
skewness done
slice open
sort_array open
soundex done
spark_partition_id done
split open
sqrt done
stddev done
stddev_pop done
stddev_samp done
struct open
substring open
substring_index open
sum done
sumDistinct open
sum_distinct open
sys open
tan done
tanh done
timestamp_seconds done
toDegrees open
toRadians open
to_csv open
to_date open
to_json open
to_str open
to_timestamp open
to_utc_timestamp open
transform open
transform_keys open
transform_values open
translate open
trim done
trunc open
try_remote_functions open
udf open
unbase64 done
unhex done
unix_timestamp open
unwrap_udt open
upper done
var_pop done
var_samp done
variance done
warnings open
weekofyear done
when open
window open
window_time open
xxhash64 done
year done
years done
zip_with open

Data Types

Data types are used for creating schemas and casting columns to specific types.

DataType API Notes
ArrayType done
BinaryType done
BooleanType done
ByteType done
DateType done
DecimalType done
DoubleType done
FloatType done
IntegerType done
LongType done
MapType done
NullType done
ShortType done
StringType done
CharType done
VarcharType done
StructField done
StructType done
TimestampType done
TimestampNTZType done
DayTimeIntervalType done
YearMonthIntervalType done
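A small, hedged example of casting a column to one of these types; whether `cast` accepts a type-name string or a DataType value is an assumption here, and the `functions` module path is likewise assumed:

```rust
use spark_connect_rs::functions::col;

fn cast_example() {
    // cast the "age" column to an integer type; the string form of the type
    // name is an assumption -- the crate may instead expect a DataType value
    let _age_as_int = col("age").cast("int");
}
```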

Literal Types

Create Spark literal types from these Rust types. For example, `lit(1_i64)` would be represented as a `LongType()` in the schema.

An array can be created like `lit([1_i16, 2_i16, 3_i16])`, which would result in an `ArrayType(Short)`, since all the values of the slice can be translated into a literal type.

Spark Literal Type Rust Type Status
open
Binary &[u8] done
Boolean bool done
Byte open
Short i16 done
Integer i32 done
Long i64 done
Float f32 done
Double f64 done
Decimal open
String &str / String done
Date chrono::NaiveDate done
Timestamp chrono::DateTime<Tz> done
TimestampNtz chrono::NaiveDateTime done
CalendarInterval open
YearMonthInterval open
DayTimeInterval open
Array slice / Vec done
Map Create with the function `create_map` done
Struct Create with the functions `struct_col` or `named_struct` done
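To make the mapping concrete, a few literal constructions based on the `lit` examples above (the trailing comments show the Spark type each value is assumed to map to):

```rust
use spark_connect_rs::functions::lit;

fn literal_examples() {
    let _long_val  = lit(1_i64);                  // LongType
    let _short_arr = lit([1_i16, 2_i16, 3_i16]);  // ArrayType(Short)
    let _text_val  = lit("hello");                // StringType (&str)
    let _flag      = lit(true);                   // BooleanType
}
```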

Window & WindowSpec

For ease of use, it is recommended to use Window to create a WindowSpec.

Window API Notes
currentRow done
orderBy done
partitionBy done
rangeBetween done
rowsBetween done
unboundedFollowing done
unboundedPreceding done
WindowSpec.orderBy done
WindowSpec.partitionBy done
WindowSpec.rangeBetween done
WindowSpec.rowsBetween done
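A hedged sketch of building a WindowSpec with Window and applying it via `over` on a column expression; the snake_case method names, the `Window::new()` constructor, and the module path are assumptions based on the entries above:

```rust
use spark_connect_rs::functions::{col, row_number};
use spark_connect_rs::window::Window; // module path is an assumption

fn window_example() {
    // partitionBy/orderBy from the table, assumed to be snake_case in Rust
    let spec = Window::new()
        .partition_by(col("department"))
        .order_by([col("salary")]);

    // `over` on a Column takes the WindowSpec (see the Column table above)
    let _ranked = row_number().over(spec);
}
```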
