#spark #connect #client #apache #dataframe #udf #spark-connect

spark-connect-core

Apache Spark Connect Client for Rust

2个版本

0.0.1-beta.52024年7月3日
0.0.1-beta.42024年5月10日

#1204数据库接口


用于 spark-connect-rs

Apache-2.0

350KB
7K SLoC

Apache Spark Connect Client for Rust

本项目包含Apache Spark的Spark Connect客户端,采用Rust语言编写。

项目当前状态

目前,Rust版本的Spark Connect客户端处于高度实验性阶段,**不建议在任何生产环境中使用**。这是一个“概念验证”,用于确定从Rust与Spark集群交互的方法。

spark-connect-rs旨在提供Spark Connect的入口点,并提供类似的DataFrame API交互。

项目结构

├── core       <- core implementation in Rust
│   └─ spark   <- git submodule for apache/spark
├── rust       <- shim for 'spark-connect-rs' from core
├── examples   <- examples of using different aspects of the crate
├── datasets   <- sample files from the main spark repo

未来的状态将是,在顶层rust文件夹旁边,为其他语言提供额外的绑定。

入门指南

本节解释了如何从0开始在本地上运行Spark Connect Rust。

步骤 1:通过rustup安装rust: https://www.rust-lang.net.cn/tools/install

步骤 2:确保你的机器上安装了cmakeprotobuf

步骤 3:运行以下命令以克隆仓库

git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive

cargo build

步骤 4:在本地主机上设置Spark驱动程序,可以通过下载Spark或使用docker进行。

使用本地Spark

  1. 下载Spark发行版 (推荐3.5.1),解压缩包。

  2. 将你的SPARK_HOME环境变量设置为Spark解压到的位置,

  3. 使用以下命令启动Spark Connect服务器(确保使用与你的Spark发行版匹配的包版本)

$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
      --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

使用docker

  1. 通过在这个仓库中创建的docker-compose.yml启动Spark Connect服务器。这将启动一个在15002端口运行的Spark Connect服务器
$ docker compose up --build -d

步骤 5:在 /examples 下运行仓库中的示例

特性

以下部分概述了Spark Connect实现中尚未支持的一些较大功能。

  • done 通过功能标志 feature = 'tls' 实现TLS身份验证和Databricks兼容性
  • open StreamingQueryManager
  • open UDFs或任何需要闭包的功能(foreach、foreachBatch等)

SparkSession

Spark Session 类型对象及其实现特征

SparkSession API 注释
激活的 open
addArtifact(s) open
addTag done
clearTags done
copyFromLocalToFs open
createDataFrame partial 部分。仅适用于 RecordBatch
getActiveSessions open
getTags done
interruptAll done splitn
interruptOperation done
interruptTag done
newSession open
range done
removeTag done
sql done
stop open
table done
catalog done 目录
client done 仅用于测试的不可稳定开发API
conf done Conf
read done DataFrameReader
readStream done DataStreamReader
streams open
udf open Udf - 可能不可行
udtf open Udtf - 可能不可行
版本 done

SparkSessionBuilder

SparkSessionBuilder API 注释
appName done
config done
master open
remote partial 使用 spark连接字符串 进行验证

StreamingQueryManager

StreamingQueryManager API 注释
awaitAnyTermination open
get open
resetTerminated open
激活的 open

StreamingQuery

StreamingQuery API 注释
awaitTermination done
exception done
explain done
processAllAvailable done
stop done
id done
isActive done
lastProgress done
name done
recentProgress done
runId done
status done

DataStreamReader

DataStreamReader API 注释
csv open
format done
json open
load done
option done
options done
orc open
parquet open
schema done
table open
text open

DataFrameReader

DataFrameReader API 注释
csv open
format done
json open
load done
option done
options done
orc open
parquet open
schema done
table done
text open

DataStreamWriter

启动流作业并返回一个 StreamingQuery 对象以处理流操作。

DataStreamWriter API 注释
foreach
foreachBatch
format done
option done
options done
outputMode done 使用Enum为 OutputMode
partitionBy done
queryName done
start done
toTable done
trigger done 使用Enum为 TriggerMode

StreamingQueryListener

StreamingQueryListener API 注释
onQueryIdle open
onQueryProgress open
onQueryStarted open
onQueryTerminated open

UdfRegistration (可能不可行)

UDFRegistration API 注释
register open
registerJavaFunction open
registerJavaUDAF open

UdtfRegistration (可能不可行)

UDTFRegistration API 注释
register open

RuntimeConfig

RuntimeConfig API 注释
get done
isModifiable done
set done
unset done

目录

目录 API 注释
cacheTable done
clearCache done
createExternalTale open
createTable open
currentCatalog done
currentDatabase done
databaseExists done
dropGlobalTempView done
dropTempView done
functionExists done
getDatabase done
getFunction done
getTable done
isCached done
listCatalogs done
listDatabases done
listFunctions done
listTables done
recoverPartitions done
refreshByPath done
refreshTable done
registerFunction open
setCurrentCatalog done
setCurrentDatabase done
tableExists done
uncacheTable done

DataFrame

Spark DataFrame 类型对象及其实现的特性。

DataFrame API 注释
agg done
alias done
approxQuantile open
cache done
checkpoint open
coalesce done
colRegex done
collect done
columns done
corr done
count done
cov done
createGlobalTempView done
createOrReplaceGlobalTempView done
createOrReplaceTempView done
createTempView done
crossJoin done
crosstab done
cube done
describe done
distinct done
drop done
dropDuplicates done
dropDuplicatesWithinWatermark open 窗口函数目前正在开发中
drop_duplicates done
dropna done
dtypes done
exceptAll done
explain done
fillna open
filter done
first done
foreach open
foreachPartition open
freqItems done
groupBy done
head done
hint done
inputFiles done
intersect done
intersectAll done
isEmpty done
isLocal open
isStreaming done
join done
limit done
localCheckpoint open
mapInPandas open 此确切实现的待定
mapInArrow open 此确切实现的待定
melt done
na open
observe open
offset done
orderBy done
persist done
printSchema done
randomSplit open
registerTempTable open
repartition done
repartitionByRange open
replace open
rollup done
sameSemantics done
sample done
sampleBy open
schema done
select done
selectExpr done
semanticHash done
show done
sort done
sortWithinPartitions done
sparkSession done
stat done
storageLevel done
subtract done
summary open
tail done
take done
to done
toDF done
toJSON partial 不返回一个 RDD,而是一个长的 JSON 格式化的 String
toLocalIterator open
toPandas to_polars & toPolars partial 转换为 polars::frame::DataFrame
new to_datafusion & toDataFusion done 转换为 datafusion::dataframe::DataFrame
transform done
union done
unionAll done
unionByName done
unpersist done
unpivot done
where done 使用 filter 代替,where 是 rust 的关键字
withColumn done
withColumns done
withColumnRenamed open
withColumnsRenamed done
withMetadata open
withWatermark open
write done
writeStream done
writeTo done

DataFrameWriter

Spark Connect 应当尊重格式,只要您的集群支持指定的类型并且具有所需的 jars

DataFrameWriter API 注释
bucketBy done
csv
format done
insertInto done
jdbc
json
mode done
option done
options done
orc
parquet
partitionBy
save done
saveAsTable done
sortBy done
text

DataFrameWriterV2

DataFrameWriterV2 API 注释
append done
create done
createOrReplace done
option done
options done
overwrite done
overwritePartitions done
partitionedBy done
replace done
tableProperty done
using done

Column

Spark Column 类型对象及其实现的特性

Column API 注释
alias done
asc done
asc_nulls_first done
asc_nulls_last done
astype open
between open
cast done
contains done
desc done
desc_nulls_first done
desc_nulls_last done
dropFields done
endswith done
eqNullSafe open
getField open 这已被弃用但需要实现
getItem open 这已被弃用但需要实现
ilike done
isNotNull done
isNull done
isin done
like done
name done
otherwise open
over done 请参阅 Window 以创建窗口规范
rlike done
startswith done
substr done
when open
withField done
eq == done Rust 不喜欢当您尝试重载 == 并返回除了 bool 之外的内容时。目前实现的列等价性如 col('name').eq(col('id'))。不是最好的,但目前可行。
加法 + done
减法 - done
乘法 * done
除法 / done
| done
& done
异或 ^ done
取反 ~ done

函数

只有少数函数被单元测试所覆盖。

函数 API 注释
abs done
acos done
acosh done
add_months done
aggregate open
approxCountDistinct open
approx_count_distinct done
array done
array_append done
array_compact done
array_contains open
array_distinct done
array_except done
array_insert open
array_intersect done
array_join open
array_max done
array_min done
array_position done
array_remove done
array_repeat done
array_sort open
array_union done
arrays_overlap open
arrays_zip done
asc done
asc_nulls_first done
asc_nulls_last done
ascii done
asin done
asinh done
assert_true open
atan done
atan2 done
atanh done
avg done
base64 done
bin done
bit_length done
bitwiseNOT open
bitwise_not done
broadcast open
bround open
bucket open
call_udf open
cbrt done
ceil done
coalesce done
col done
collect_list done
collect_set done
column done
concat done
concat_ws open
conv open
corr open
cos open
cosh open
cot open
count open
countDistinct open
count_distinct open
covar_pop done
covar_samp done
crc32 done
create_map done
csc done
cume_dist done
current_date done
current_timestamp done
date_add done
date_format open
date_sub done
date_trunc open
datediff done
dayofmonth done
dayofweek done
dayofyear done
days done
decode open
degrees done
dense_rank done
desc done
desc_nulls_first done
desc_nulls_last done
element_at open
encode open
exists open
exp done
explode done
explode_outer done
expm1 done
expr done
factorial done
filter open
first open
flatten done
floor done
forall open
format_number open
format_string open
from_csv open
from_json open
from_unixtime open
from_utc_timestamp open
functools open
get open
get_active_spark_context open
get_json_object open
greatest done
grouping done
grouping_id open
has_numpy open
hash done
hex done
hour done
hours done
hypot open
initcap done
inline done
inline_outer done
input_file_name done
inspect open
instr open
isnan done
isnull done
json_tuple open
kurtosis done
lag open
last open
last_day open
lead open
least done
length done
levenshtein open
lit done
localtimestamp done
locate open
log done
log10 done
log1p done
log2 done
lower done
lpad open
ltrim done
make_date open
map_concat done
map_contains_key open
map_entries done
map_filter open
map_from_arrays open
map_from_entries done
map_keys done
map_values done
map_zip_with open
max done
max_by open
md5 done
mean done
median done
min done
min_by open
minute done
mode open
monotonically_increasing_id done
month done
months done
months_between open
nanvl done
next_day open
np open
nth_value open
ntile done
octet_length done
overlay open
overload open
pandas_udf open
percent_rank done
percentile_approx open
pmod open
posexplode done
posexplode_outer done
pow done
product done
季度 done
弧度 done
raise_error open
rand done
randn done
rank done
regexp_extract open
regexp_replace open
repeat open
reverse done
rint done
round done
row_number done
rpad open
rtrim done
schema_of_csv open
schema_of_json open
sec done
second done
sentences open
sequence open
session_window open
sha1 done
sha2 open
shiftLeft open
shiftRight open
shiftRightUnsigned open
shiftleft open
shiftright open
shiftrightunsigned open
shuffle done
signum done
sin done
sinh done
size done
skewness done
slice open
sort_array open
soundex done
spark_partition_id done
split open
sqrt done
stddev done
stddev_pop done
stddev_samp done
struct open
substring open
substring_index open
sum done
sumDistinct open
sum_distinct open
sys open
tan done
tanh done
timestamp_seconds done
toDegrees open
toRadians open
to_csv open
to_date open
to_json open
to_str open
to_timestamp open
to_utc_timestamp open
transform open
transform_keys open
transform_values open
translate open
trim done
trunc open
try_remote_functions open
udf open
unbase64 done
unhex done
unix_timestamp open
unwrap_udt open
upper done
var_pop done
var_samp done
variance done
warnings open
weekofyear done
when open
window open
window_time open
xxhash64 done
year done
years done
zip_with open

数据类型

数据类型用于创建模式并将列转换为特定类型

Column API 注释
数组类型 done
二进制类型 done
布尔类型 done
字节类型 done
日期类型 done
十进制类型 done
双精度类型 done
单精度类型 done
整数类型 done
长整数类型 done
映射类型 done
空类型 done
短整数类型 done
字符串类型 done
字符类型 done
可变长度字符串类型 done
结构字段 done
结构类型 done
时间戳类型 done
无时区时间戳类型 done
天时间间隔类型 done
年月间隔类型 done

文字类型

从这些Rust类型创建Spark文字类型。例如,lit(1_i64) 将在模式中表示为 LongType()

可以创建数组,例如 lit([1_i16,2_i16,3_i16]) 将产生一个 ArrayType(Short),因为切片中的所有值都可以转换为文字类型。

Spark文字类型 Rust类型 状态
空值 open
二进制 &[u8] done
布尔 bool done
字节 open
短整数 i16 done
整数 i32 done
长整数 i64 done
浮点数 f32 done
双精度浮点数 f64 done
十进制 open
字符串 &str / String done
日期 chrono::NaiveDate done
Timestamp chrono::DateTime<Tz> done
TimestampNtz chrono::NaiveDateTime done
CalendarInterval open
YearMonthInterval open
DayTimeInterval open
数组 slice / Vec done
映射 使用函数 create_map 创建 done
结构 使用函数 struct_colnamed_struct 创建 done

窗口 & 窗口规范

为了方便使用,建议使用 Window 创建 WindowSpec

窗口 API 注释
当前行 done
orderBy done
partitionBy done
rangeBetween done
rowsBetween done
unboundedFollowing done
unboundedPreceding done
WindowSpec.orderBy done
WindowSpec.partitionBy done
WindowSpec.rangeBetween done
WindowSpec.rowsBetween done

Dependencies

~23–43MB
~719K SLoC