18 个版本 (10 个重大变更)

0.12.0 2024年2月7日
0.11.0 2023年2月28日
0.10.0 2022年11月21日
0.7.0 2022年5月16日
0.1.2 2019年7月9日

#1325数据库接口


3 crate 中使用

Apache-2.0

620KB
13K SLoC

Ballista:Apache Arrow DataFusion 的分布式调度器

Ballista 是一个主要用 Rust 实现的分布式计算平台,由 Apache Arrow 和 DataFusion 驱动。它基于一种架构,允许其他编程语言(如 Python、C++ 和 Java)作为一等公民支持,而无需为序列化成本付出代价。

Ballista 的基础技术包括

Ballista 可以作为独立的集群部署,也支持 Kubernetes。在任一情况下,都可以将调度器配置为使用 etcd 作为后端存储,以提供冗余,以防调度器失败。

Rust 版本兼容性

此 crate 使用最新稳定的 Rust 版本进行测试。我们目前不测试 Rust 编译器的其他旧版本。

启动集群

有多种方法可以启动 Ballista 集群,包括对 Docker 和 Kubernetes 的支持。有关完整文档,请参阅 Ballista 用户指南 中的部署部分

一种简单的方法是使用 cargo 安装调度器和执行器 crate 以进行测试。

cargo install --locked ballista-scheduler
cargo install --locked ballista-executor

安装这些 crate 后,现在可以启动调度器进程。

RUST_LOG=info ballista-scheduler

默认情况下,调度器将绑定到端口 50050。

接下来,在新的终端会话中启动具有指定并发级别的执行器进程。

RUST_LOG=info ballista-executor -c 4

默认情况下,执行器将绑定到端口 50051。可以通过手动指定绑定端口来启动更多执行器。例如

RUST_LOG=info ballista-executor --bind-port 50052 -c 4

执行查询

Ballista提供了BallistaContext作为创建查询的起点。可以通过调用read_csvread_parquetsql方法来创建DataFrame。

要构建一个简单的Ballista示例,请将以下依赖项添加到您的Cargo.toml文件中

[dependencies]
ballista = "0.11"
datafusion = "28.0.0"
tokio = "1.0"

以下示例在一个来自纽约出租车和豪华轿车委员会数据集的Parquet文件(yellow_tripdata_2022-01.parquet)上运行一个简单的聚合SQL查询。在运行示例之前,请下载该文件并将其添加到testdata文件夹中。

use ballista::prelude::*;
use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions};
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;

#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame trait
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(vec![col("passenger_count")], vec![min(col("fare_amount")), max(col("fare_amount")), avg(col("fare_amount")), sum(col("fare_amount"))])?
        .sort(vec![col("passenger_count").sort(true,true)])?;

    // this is equivalent to the following SQL
    // SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
    //     FROM tripdata
    //     GROUP BY passenger_count
    //     ORDER BY passenger_count

    // print the results
    df.show().await?;

    Ok(())
}

输出应类似于以下表格。

+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                 | -159.5                   | 285.2                    | 17.60577640099004        | 1258865.829999991        |
| 0               | -115                     | 500                      | 11.794859107585335       | 614052.1600000001        |
| 1               | -480                     | 401092.32                | 12.61028389876563        | 22623542.879999973       |
| 2               | -250                     | 640.5                    | 13.79501011585127        | 4732047.139999998        |
| 3               | -130                     | 480                      | 13.473184817311106       | 1139427.2400000002       |
| 4               | -250                     | 464                      | 14.232650547832726       | 502711.4499999997        |
| 5               | -52                      | 668                      | 12.160378472086954       | 624289.51                |
| 6               | -52                      | 252.5                    | 12.576583325529857       | 402916                   |
| 7               | 7                        | 79                       | 61.77777777777778        | 556                      |
| 8               | 8.3                      | 115                      | 79.9125                  | 639.3                    |
| 9               | 9.3                      | 96.5                     | 65.26666666666667        | 195.8                    |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+

更多示例可以在arrow-ballista存储库中找到。

依赖项

~69MB
~1.5M SLoC