9 个不稳定版本 (3 个破坏性更新)

0.4.0 2024年7月28日
0.3.2 2024年7月6日
0.3.0 2024年6月24日
0.2.2 2024年5月27日
0.1.2 2024年5月22日

#11 in #etl

Download history 273/week @ 2024-05-18 458/week @ 2024-05-25 34/week @ 2024-06-01 8/week @ 2024-06-08 1/week @ 2024-06-15 181/week @ 2024-06-22 32/week @ 2024-06-29 244/week @ 2024-07-06 9/week @ 2024-07-13 20/week @ 2024-07-20 267/week @ 2024-07-27 5/week @ 2024-08-03

每月310次下载
用于 2 个 crate (通过 aqueducts)

自定义许可

11KB
102

Aqueducts

Build status Crates.io Documentation

Aqueducts 是一个用于声明式编写和执行 ETL 数据管道的框架。

特性

  • 使用 YAML 定义 ETL 管道
  • 从 csv 文件、JSONL、parquet 文件或 delta 表中提取数据
  • 使用 SQL 处理数据
  • 将数据加载到对象存储中作为 csv/parquet 或 delta 表
  • 支持文件和 delta 表分区
  • 支持 delta 表上的 Upsert/Replace/Append 操作
  • 支持 Local、S3、GCS 和 Azure Blob 存储
  • 实验性 支持 ODBC 源和目标

此框架建立在 arrow-rs、datafusion、delta-rs 等项目所做出色工作的基础上

请为这些项目提供一些支持 ❤️!

文档

您可以在 https://vigimite.github.io/aqueducts 找到文档

变更日志: CHANGELOG

快速入门

要定义和执行 Aqueduct 管道,有以下几种选择

  • 使用 yaml 配置文件
  • 使用 json 配置文件
  • 在代码中手动

您可以在 examples 目录中查看一些示例。以下是一个使用 yaml 配置格式定义 Aqueduct 管道的简单示例 链接

sources:
  # Register a local file source containing temperature readings for various cities
  - type: File
    name: temp_readings
    file_type:
      type: Csv
      options: {}
    # use built-in templating functionality
    location: ./examples/temp_readings_${month}_${year}.csv

  #Register a local file source containing a mapping between location_ids and location names
  - type: File
    name: locations
    file_type:
      type: Csv
      options: {}
    location: ./examples/location_dict.csv

stages:
  # Query to aggregate temperature data by date and location
  - - name: aggregated
      query: >
          SELECT
            cast(timestamp as date) date,
            location_id,
            round(min(temperature_c),2) min_temp_c,
            round(min(humidity),2) min_humidity,
            round(max(temperature_c),2) max_temp_c,
            round(max(humidity),2) max_humidity,
            round(avg(temperature_c),2) avg_temp_c,
            round(avg(humidity),2) avg_humidity
          FROM temp_readings
          GROUP by 1,2
          ORDER by 1 asc
      # print the query plan to stdout for debugging purposes
      explain: true

  # Enrich aggregation with the location name
  - - name: enriched
      query: >
        SELECT
          date,
          location_name,
          min_temp_c,
          max_temp_c,
          avg_temp_c,
          min_humidity,
          max_humidity,
          avg_humidity
        FROM aggregated
        JOIN locations 
          ON aggregated.location_id = locations.location_id
        ORDER BY date, location_name
      # print 10 rows to stdout for debugging purposes
      show: 10

# Write the pipeline result to a parquet file (can be omitted if you don't want an output)
destination:
  type: File
  name: results
  file_type:
    type: Parquet
    options: {}
  location: ./examples/output_${month}_${year}.parquet

此存储库包含 Aqueducts 框架的最小示例实现,可用于测试上述管道定义

cargo install aqueducts-cli
aqueducts --file examples/aqueduct_pipeline_example.yml --param year=2024 --param month=jan

路线图

  • 文档
  • ODBC 源
  • ODBC 目标
  • 阶段并行处理
  • 流式源(最初是 kafka + 可能是 aws kinesis)
  • 流式目标(最初是 kafka)

依赖关系

~53–72MB
~1.5M SLoC