2个版本
0.0.1-beta.5 | 2024年7月3日 |
---|---|
0.0.1-beta.4 | 2024年5月10日 |
#1204 在 数据库接口 中
350KB
7K SLoC
Apache Spark Connect Client for Rust
本项目包含Apache Spark的Spark Connect客户端,采用Rust语言编写。
项目当前状态
目前,Rust版本的Spark Connect客户端处于高度实验性阶段,**不建议在任何生产环境中使用**。这是一个“概念验证”,用于确定从Rust与Spark集群交互的方法。
spark-connect-rs
旨在提供Spark Connect的入口点,并提供类似的DataFrame API交互。
项目结构
├── core <- core implementation in Rust
│ └─ spark <- git submodule for apache/spark
├── rust <- shim for 'spark-connect-rs' from core
├── examples <- examples of using different aspects of the crate
├── datasets <- sample files from the main spark repo
未来的状态将是,在顶层rust
文件夹旁边,为其他语言提供额外的绑定。
入门指南
本节解释了如何从0开始在本地上运行Spark Connect Rust。
步骤 1:通过rustup安装rust: https://rust-lang.net.cn/tools/install
步骤 3:运行以下命令以克隆仓库
git clone https://github.com/sjrusso8/spark-connect-rs.git
git submodule update --init --recursive
cargo build
步骤 4:在本地主机上设置Spark驱动程序,可以通过下载Spark或使用docker进行。
使用本地Spark
-
下载Spark发行版 (推荐3.5.1),解压缩包。
-
将你的
SPARK_HOME
环境变量设置为Spark解压到的位置, -
使用以下命令启动Spark Connect服务器(确保使用与你的Spark发行版匹配的包版本)
$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
使用docker
- 通过在这个仓库中创建的
docker-compose.yml
启动Spark Connect服务器。这将启动一个在15002端口运行的Spark Connect服务器
$ docker compose up --build -d
步骤 5:在 /examples 下运行仓库中的示例
特性
以下部分概述了Spark Connect实现中尚未支持的一些较大功能。
- 通过功能标志
feature = 'tls'
实现TLS身份验证和Databricks兼容性 - StreamingQueryManager
- UDFs或任何需要闭包的功能(foreach、foreachBatch等)
SparkSession
Spark Session 类型对象及其实现特征
SparkSession | API | 注释 |
---|---|---|
激活的 | ||
addArtifact(s) | ||
addTag | ||
clearTags | ||
copyFromLocalToFs | ||
createDataFrame | 部分。仅适用于 RecordBatch |
|
getActiveSessions | ||
getTags | ||
interruptAll | splitn | |
interruptOperation | ||
interruptTag | ||
newSession | ||
range | ||
removeTag | ||
sql | ||
stop | ||
table | ||
catalog | 目录 | |
client | 仅用于测试的不可稳定开发API | |
conf | Conf | |
read | DataFrameReader | |
readStream | DataStreamReader | |
streams | 流 | |
udf | Udf - 可能不可行 | |
udtf | Udtf - 可能不可行 | |
版本 |
SparkSessionBuilder
SparkSessionBuilder | API | 注释 |
---|---|---|
appName | ||
config | ||
master | ||
remote | 使用 spark连接字符串 进行验证 |
StreamingQueryManager
StreamingQueryManager | API | 注释 |
---|---|---|
awaitAnyTermination | ||
get | ||
resetTerminated | ||
激活的 |
StreamingQuery
StreamingQuery | API | 注释 |
---|---|---|
awaitTermination | ||
exception | ||
explain | ||
processAllAvailable | ||
stop | ||
id | ||
isActive | ||
lastProgress | ||
name | ||
recentProgress | ||
runId | ||
status |
DataStreamReader
DataStreamReader | API | 注释 |
---|---|---|
csv | ||
format | ||
json | ||
load | ||
option | ||
options | ||
orc | ||
parquet | ||
schema | ||
table | ||
text |
DataFrameReader
DataFrameReader | API | 注释 |
---|---|---|
csv | ||
format | ||
json | ||
load | ||
option | ||
options | ||
orc | ||
parquet | ||
schema | ||
table | ||
text |
DataStreamWriter
启动流作业并返回一个 StreamingQuery
对象以处理流操作。
DataStreamWriter | API | 注释 |
---|---|---|
foreach | ||
foreachBatch | ||
format | ||
option | ||
options | ||
outputMode | 使用Enum为 OutputMode |
|
partitionBy | ||
queryName | ||
start | ||
toTable | ||
trigger | 使用Enum为 TriggerMode |
StreamingQueryListener
StreamingQueryListener | API | 注释 |
---|---|---|
onQueryIdle | ||
onQueryProgress | ||
onQueryStarted | ||
onQueryTerminated |
UdfRegistration (可能不可行)
UDFRegistration | API | 注释 |
---|---|---|
register | ||
registerJavaFunction | ||
registerJavaUDAF |
UdtfRegistration (可能不可行)
UDTFRegistration | API | 注释 |
---|---|---|
register |
RuntimeConfig
RuntimeConfig | API | 注释 |
---|---|---|
get | ||
isModifiable | ||
set | ||
unset |
目录
目录 | API | 注释 |
---|---|---|
cacheTable | ||
clearCache | ||
createExternalTale | ||
createTable | ||
currentCatalog | ||
currentDatabase | ||
databaseExists | ||
dropGlobalTempView | ||
dropTempView | ||
functionExists | ||
getDatabase | ||
getFunction | ||
getTable | ||
isCached | ||
listCatalogs | ||
listDatabases | ||
listFunctions | ||
listTables | ||
recoverPartitions | ||
refreshByPath | ||
refreshTable | ||
registerFunction | ||
setCurrentCatalog | ||
setCurrentDatabase | ||
tableExists | ||
uncacheTable |
DataFrame
Spark DataFrame 类型对象及其实现的特性。
DataFrame | API | 注释 |
---|---|---|
agg | ||
alias | ||
approxQuantile | ||
cache | ||
checkpoint | ||
coalesce | ||
colRegex | ||
collect | ||
columns | ||
corr | ||
count | ||
cov | ||
createGlobalTempView | ||
createOrReplaceGlobalTempView | ||
createOrReplaceTempView | ||
createTempView | ||
crossJoin | ||
crosstab | ||
cube | ||
describe | ||
distinct | ||
drop | ||
dropDuplicates | ||
dropDuplicatesWithinWatermark | 窗口函数目前正在开发中 | |
drop_duplicates | ||
dropna | ||
dtypes | ||
exceptAll | ||
explain | ||
fillna | ||
filter | ||
first | ||
foreach | ||
foreachPartition | ||
freqItems | ||
groupBy | ||
head | ||
hint | ||
inputFiles | ||
intersect | ||
intersectAll | ||
isEmpty | ||
isLocal | ||
isStreaming | ||
join | ||
limit | ||
localCheckpoint | ||
mapInPandas | 此确切实现的待定 | |
mapInArrow | 此确切实现的待定 | |
melt | ||
na | ||
observe | ||
offset | ||
orderBy | ||
persist | ||
printSchema | ||
randomSplit | ||
registerTempTable | ||
repartition | ||
repartitionByRange | ||
replace | ||
rollup | ||
sameSemantics | ||
sample | ||
sampleBy | ||
schema | ||
select | ||
selectExpr | ||
semanticHash | ||
show | ||
sort | ||
sortWithinPartitions | ||
sparkSession | ||
stat | ||
storageLevel | ||
subtract | ||
summary | ||
tail | ||
take | ||
to | ||
toDF | ||
toJSON | 不返回一个 RDD ,而是一个长的 JSON 格式化的 String |
|
toLocalIterator | ||
转换为 polars::frame::DataFrame |
||
new to_datafusion & toDataFusion | 转换为 datafusion::dataframe::DataFrame |
|
transform | ||
union | ||
unionAll | ||
unionByName | ||
unpersist | ||
unpivot | ||
where | 使用 filter 代替,where 是 rust 的关键字 |
|
withColumn | ||
withColumns | ||
withColumnRenamed | ||
withColumnsRenamed | ||
withMetadata | ||
withWatermark | ||
write | ||
writeStream | ||
writeTo |
DataFrameWriter
Spark Connect 应当尊重格式,只要您的集群支持指定的类型并且具有所需的 jars
DataFrameWriter | API | 注释 |
---|---|---|
bucketBy | ||
csv | ||
format | ||
insertInto | ||
jdbc | ||
json | ||
mode | ||
option | ||
options | ||
orc | ||
parquet | ||
partitionBy | ||
save | ||
saveAsTable | ||
sortBy | ||
text |
DataFrameWriterV2
DataFrameWriterV2 | API | 注释 |
---|---|---|
append | ||
create | ||
createOrReplace | ||
option | ||
options | ||
overwrite | ||
overwritePartitions | ||
partitionedBy | ||
replace | ||
tableProperty | ||
using |
Column
Spark Column 类型对象及其实现的特性
Column | API | 注释 |
---|---|---|
alias | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
astype | ||
between | ||
cast | ||
contains | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
dropFields | ||
endswith | ||
eqNullSafe | ||
getField | 这已被弃用但需要实现 | |
getItem | 这已被弃用但需要实现 | |
ilike | ||
isNotNull | ||
isNull | ||
isin | ||
like | ||
name | ||
otherwise | ||
over | 请参阅 Window 以创建窗口规范 | |
rlike | ||
startswith | ||
substr | ||
when | ||
withField | ||
eq == |
Rust 不喜欢当您尝试重载 == 并返回除了 |
|
加法 + |
||
减法 - |
||
乘法 * |
||
除法 / |
||
或 | |
||
与 & |
||
异或 ^ |
||
取反 ~ |
函数
只有少数函数被单元测试所覆盖。
函数 | API | 注释 |
---|---|---|
abs | ||
acos | ||
acosh | ||
add_months | ||
aggregate | ||
approxCountDistinct | ||
approx_count_distinct | ||
array | ||
array_append | ||
array_compact | ||
array_contains | ||
array_distinct | ||
array_except | ||
array_insert | ||
array_intersect | ||
array_join | ||
array_max | ||
array_min | ||
array_position | ||
array_remove | ||
array_repeat | ||
array_sort | ||
array_union | ||
arrays_overlap | ||
arrays_zip | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
ascii | ||
asin | ||
asinh | ||
assert_true | ||
atan | ||
atan2 | ||
atanh | ||
avg | ||
base64 | ||
bin | ||
bit_length | ||
bitwiseNOT | ||
bitwise_not | ||
broadcast | ||
bround | ||
bucket | ||
call_udf | ||
cbrt | ||
ceil | ||
coalesce | ||
col | ||
collect_list | ||
collect_set | ||
column | ||
concat | ||
concat_ws | ||
conv | ||
corr | ||
cos | ||
cosh | ||
cot | ||
count | ||
countDistinct | ||
count_distinct | ||
covar_pop | ||
covar_samp | ||
crc32 | ||
create_map | ||
csc | ||
cume_dist | ||
current_date | ||
current_timestamp | ||
date_add | ||
date_format | ||
date_sub | ||
date_trunc | ||
datediff | ||
dayofmonth | ||
dayofweek | ||
dayofyear | ||
days | ||
decode | ||
degrees | ||
dense_rank | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
element_at | ||
encode | ||
exists | ||
exp | ||
explode | ||
explode_outer | ||
expm1 | ||
expr | ||
factorial | ||
filter | ||
first | ||
flatten | ||
floor | ||
forall | ||
format_number | ||
format_string | ||
from_csv | ||
from_json | ||
from_unixtime | ||
from_utc_timestamp | ||
functools | ||
get | ||
get_active_spark_context | ||
get_json_object | ||
greatest | ||
grouping | ||
grouping_id | ||
has_numpy | ||
hash | ||
hex | ||
hour | ||
hours | ||
hypot | ||
initcap | ||
inline | ||
inline_outer | ||
input_file_name | ||
inspect | ||
instr | ||
isnan | ||
isnull | ||
json_tuple | ||
kurtosis | ||
lag | ||
last | ||
last_day | ||
lead | ||
least | ||
length | ||
levenshtein | ||
lit | ||
localtimestamp | ||
locate | ||
log | ||
log10 | ||
log1p | ||
log2 | ||
lower | ||
lpad | ||
ltrim | ||
make_date | ||
map_concat | ||
map_contains_key | ||
map_entries | ||
map_filter | ||
map_from_arrays | ||
map_from_entries | ||
map_keys | ||
map_values | ||
map_zip_with | ||
max | ||
max_by | ||
md5 | ||
mean | ||
median | ||
min | ||
min_by | ||
minute | ||
mode | ||
monotonically_increasing_id | ||
month | ||
months | ||
months_between | ||
nanvl | ||
next_day | ||
np | ||
nth_value | ||
ntile | ||
octet_length | ||
overlay | ||
overload | ||
pandas_udf | ||
percent_rank | ||
percentile_approx | ||
pmod | ||
posexplode | ||
posexplode_outer | ||
pow | ||
product | ||
季度 | ||
弧度 | ||
raise_error | ||
rand | ||
randn | ||
rank | ||
regexp_extract | ||
regexp_replace | ||
repeat | ||
reverse | ||
rint | ||
round | ||
row_number | ||
rpad | ||
rtrim | ||
schema_of_csv | ||
schema_of_json | ||
sec | ||
second | ||
sentences | ||
sequence | ||
session_window | ||
sha1 | ||
sha2 | ||
shiftLeft | ||
shiftRight | ||
shiftRightUnsigned | ||
shiftleft | ||
shiftright | ||
shiftrightunsigned | ||
shuffle | ||
signum | ||
sin | ||
sinh | ||
size | ||
skewness | ||
slice | ||
sort_array | ||
soundex | ||
spark_partition_id | ||
split | ||
sqrt | ||
stddev | ||
stddev_pop | ||
stddev_samp | ||
struct | ||
substring | ||
substring_index | ||
sum | ||
sumDistinct | ||
sum_distinct | ||
sys | ||
tan | ||
tanh | ||
timestamp_seconds | ||
toDegrees | ||
toRadians | ||
to_csv | ||
to_date | ||
to_json | ||
to_str | ||
to_timestamp | ||
to_utc_timestamp | ||
transform | ||
transform_keys | ||
transform_values | ||
translate | ||
trim | ||
trunc | ||
try_remote_functions | ||
udf | ||
unbase64 | ||
unhex | ||
unix_timestamp | ||
unwrap_udt | ||
upper | ||
var_pop | ||
var_samp | ||
variance | ||
warnings | ||
weekofyear | ||
when | ||
window | ||
window_time | ||
xxhash64 | ||
year | ||
years | ||
zip_with |
数据类型
数据类型用于创建模式并将列转换为特定类型
Column | API | 注释 |
---|---|---|
数组类型 | ||
二进制类型 | ||
布尔类型 | ||
字节类型 | ||
日期类型 | ||
十进制类型 | ||
双精度类型 | ||
单精度类型 | ||
整数类型 | ||
长整数类型 | ||
映射类型 | ||
空类型 | ||
短整数类型 | ||
字符串类型 | ||
字符类型 | ||
可变长度字符串类型 | ||
结构字段 | ||
结构类型 | ||
时间戳类型 | ||
无时区时间戳类型 | ||
天时间间隔类型 | ||
年月间隔类型 |
文字类型
从这些Rust类型创建Spark文字类型。例如,lit(1_i64)
将在模式中表示为 LongType()
。
可以创建数组,例如 lit([1_i16,2_i16,3_i16])
将产生一个 ArrayType(Short)
,因为切片中的所有值都可以转换为文字类型。
Spark文字类型 | Rust类型 | 状态 |
---|---|---|
空值 | ||
二进制 | &[u8] |
|
布尔 | bool |
|
字节 | ||
短整数 | i16 |
|
整数 | i32 |
|
长整数 | i64 |
|
浮点数 | f32 |
|
双精度浮点数 | f64 |
|
十进制 | ||
字符串 | &str / String |
|
日期 | chrono::NaiveDate |
|
Timestamp | chrono::DateTime<Tz> |
|
TimestampNtz | chrono::NaiveDateTime |
|
CalendarInterval | ||
YearMonthInterval | ||
DayTimeInterval | ||
数组 | slice / Vec |
|
映射 | 使用函数 create_map 创建 |
|
结构 | 使用函数 struct_col 或 named_struct 创建 |
窗口 & 窗口规范
为了方便使用,建议使用 Window
创建 WindowSpec
。
窗口 | API | 注释 |
---|---|---|
当前行 | ||
orderBy | ||
partitionBy | ||
rangeBetween | ||
rowsBetween | ||
unboundedFollowing | ||
unboundedPreceding | ||
WindowSpec.orderBy | ||
WindowSpec.partitionBy | ||
WindowSpec.rangeBetween | ||
WindowSpec.rowsBetween |
Dependencies
~23–43MB
~719K SLoC