18 个版本 (10 个重大变更)
0.12.0 | 2024年2月7日 |
---|---|
0.11.0 | 2023年2月28日 |
0.10.0 | 2022年11月21日 |
0.7.0 | 2022年5月16日 |
0.1.2 | 2019年7月9日 |
#1325 在 数据库接口
在 3 crate 中使用
620KB
13K SLoC
Ballista:Apache Arrow DataFusion 的分布式调度器
Ballista 是一个主要用 Rust 实现的分布式计算平台,由 Apache Arrow 和 DataFusion 驱动。它基于一种架构,允许其他编程语言(如 Python、C++ 和 Java)作为一等公民支持,而无需为序列化成本付出代价。
Ballista 的基础技术包括
- Apache Arrow 内存模型和计算内核,用于高效处理数据。
- Apache Arrow Flight 协议,用于进程间的数据高效传输。
- Google Protocol Buffers 用于序列化查询计划。
- Docker 用于打包执行器以及用户定义的代码。
Ballista 可以作为独立的集群部署,也支持 Kubernetes。在任一情况下,都可以将调度器配置为使用 etcd 作为后端存储,以提供冗余,以防调度器失败。
Rust 版本兼容性
此 crate 使用最新稳定的 Rust 版本进行测试。我们目前不测试 Rust 编译器的其他旧版本。
启动集群
有多种方法可以启动 Ballista 集群,包括对 Docker 和 Kubernetes 的支持。有关完整文档,请参阅 Ballista 用户指南 中的部署部分
一种简单的方法是使用 cargo 安装调度器和执行器 crate 以进行测试。
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
安装这些 crate 后,现在可以启动调度器进程。
RUST_LOG=info ballista-scheduler
默认情况下,调度器将绑定到端口 50050。
接下来,在新的终端会话中启动具有指定并发级别的执行器进程。
RUST_LOG=info ballista-executor -c 4
默认情况下,执行器将绑定到端口 50051。可以通过手动指定绑定端口来启动更多执行器。例如
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
执行查询
Ballista提供了BallistaContext
作为创建查询的起点。可以通过调用read_csv
、read_parquet
和sql
方法来创建DataFrame。
要构建一个简单的Ballista示例,请将以下依赖项添加到您的Cargo.toml
文件中
[dependencies]
ballista = "0.11"
datafusion = "28.0.0"
tokio = "1.0"
以下示例在一个来自纽约出租车和豪华轿车委员会数据集的Parquet文件(yellow_tripdata_2022-01.parquet
)上运行一个简单的聚合SQL查询。在运行示例之前,请下载该文件并将其添加到testdata
文件夹中。
use ballista::prelude::*;
use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions};
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;
#[tokio::main]
async fn main() -> Result<()> {
// create configuration
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
let filename = "testdata/yellow_tripdata_2022-01.parquet";
// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["passenger_count", "fare_amount"])?
.aggregate(vec![col("passenger_count")], vec![min(col("fare_amount")), max(col("fare_amount")), avg(col("fare_amount")), sum(col("fare_amount"))])?
.sort(vec![col("passenger_count").sort(true,true)])?;
// this is equivalent to the following SQL
// SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
// FROM tripdata
// GROUP BY passenger_count
// ORDER BY passenger_count
// print the results
df.show().await?;
Ok(())
}
输出应类似于以下表格。
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 |
| 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 |
| 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 |
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
更多示例可以在arrow-ballista存储库中找到。
依赖项
~69MB
~1.5M SLoC