#分布式 #数据科学 #日志 #星座 #数据

amadeus

在 Rust 中实现和谐分布式数据处理与分析。parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common crawl

25 个版本

0.4.3 2021 年 5 月 20 日
0.4.2 2020 年 8 月 18 日
0.3.7 2020 年 7 月 30 日
0.1.6 2020 年 1 月 2 日
0.1.0 2018 年 8 月 17 日

#252并发

Download history

每月 55 次下载

Apache-2.0

1MB
26K SLoC

Amadeus

在 Rust 中实现和谐分布式数据处理与分析

Crates.io Apache-2.0 licensed Build Status

📖 文档 | 🌐 首页 | 💬 聊天

Amadeus 提供

  • 分布式流:类似于 Rayon 的并行迭代器,但分布在集群中。
  • 数据连接器:用于处理 CSV、JSON、Parquet、Postgres、S3 等。
  • ETL 和数据科学工具:专注于流处理和分析。

Amadeus 是 Rust 分布式计算和大数据生态系统的低级可重用构建块。

原则

  • 无畏:无数据竞争,无不安全代码,无损失的数据规范化。
  • 使分布式计算变得简单:运行分布式应与本地运行一样简单和高效。
  • 数据是逐渐类型化的:在已知模式时实现最大性能,在不了解模式时提供灵活性。
  • 简单性:尽可能保持接口和实现简单可靠。
  • 可靠性:尽量减少未处理的错误(包括 OOM),并只暴露无法内部处理的错误。

为什么选择 Amadeus?

干净且可扩展的应用程序

设计上,Amadeus 鼓励您编写干净且可重用的代码,无论数据规模如何,都可以在本地或跨集群运行。一次编写,任何数据规模都可以运行。

社区

我们旨在创建一个对任何感兴趣的人都友好且乐于助人的社区!加入我们的 Zulip 聊天

  • 使 Amadeus 为您的用例工作;
  • 讨论项目的方向;
  • 找到合适的 issues 以便开始。

开箱即用的兼容性

Amadeus 与各种文件格式、数据库和接口具有深度的、可插拔的集成

数据格式 来源 目标
CSV
JSON
XML 👐
Parquet 🔨
Avro 🔨
PostgreSQL 🔨
HDF5 👐
Redshift 👐
CloudFront Logs
Common Crawl
S3 🔨
HDFS 👐 👐

✔ = 正在运行
🔨 = 进行中
👐 = 已请求:查看问题了解如何帮助!

性能

Amadeus 常规基准测试,初步结果非常令人鼓舞

  • 与官方 Apache Arrow parquet crate 相比,读取 Parquet 数据的速度提高了 1.5 倍到 17 倍,这些基准测试在此处

无处不在

Amadeus 是一个库,可以作为并行线程池单独使用,也可以与 Constellation 一起作为分布式集群使用。

Constellation 是一个用于进程分布和通信的框架,并具有裸集群(Linux 或 macOS)、托管 Kubernetes 集群等后端。

示例

此操作将从 S3 存储桶中读取 Parquet 分区,并打印出现频率最高的 100 个 URL。

use amadeus::prelude::*;
use amadeus::data::{IpAddr, Url};
use std::error::Error;

#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
    uri: Option<String>,
    requestip: Option<IpAddr>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;

    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;

    let top_pages = rows
        .par_stream()
        .map(|row: Result<LogLine, _>| {
            let row = row.unwrap();
            (row.uri, row.requestip)
        })
        .most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
        .await;

    println!("{:#?}", top_pages);
    Ok(())
}

这是有类型的,因此更快,并且还进一步进行了分析步骤,按记录的独立 IP 打印前 100 个 URL。

查看相同示例,但使用动态类型的数据。
use amadeus::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;

    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;

    let top_pages = rows
        .par_stream()
        .map(|row: Result<Value, _>| {
            let row = row.ok()?.into_group().ok()?;
            row.get("uri")?.clone().into_url().ok()
        })
        .filter(|row| row.is_some())
        .map(Option::unwrap)
        .most_frequent(&pool, 100, 0.99, 0.002)
        .await;

    println!("{:#?}", top_pages);
    Ok(())
}

将此数据加载到 Postgres 中怎么办?这将创建并填充名为 "accesslogs" 的表。

use amadeus::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = ThreadPool::new(None, None)?;

    let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
        AwsRegion::UsEast1,
        "us-east-1.data-analytics",
        "cflogworkshop/optimized/cf-accesslogs/",
        AwsCredentials::Anonymous,
    )))
    .await?;

    // Note: this isn't yet implemented!
    rows.par_stream()
        .pipe(Postgres::new("127.0.0.1", PostgresTable::new("accesslogs")));

    Ok(())
}

运行分布式

操作可以在并行线程池或分布式进程池中运行。

Amadeus 使用 Constellation 框架进行进程分布和通信。Constellation 具有裸集群(Linux 或 macOS)和托管 Kubernetes 集群的后端。

use amadeus::dist::prelude::*;
use amadeus::data::{IpAddr, Url};
use constellation::*;
use std::error::Error;

#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
    uri: Option<String>,
    requestip: Option<IpAddr>,
}

fn main() -> Result<(), Box<dyn Error>> {
    init(Resources::default());

    // #[tokio::main] isn't supported yet so unfortunately setting up the Runtime must be done explicitly
    tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            let pool = ProcessPool::new(None, None, None, Resources::default())?;

            let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
                AwsRegion::UsEast1,
                "us-east-1.data-analytics",
                "cflogworkshop/optimized/cf-accesslogs/",
                AwsCredentials::Anonymous,
            )))
            .await?;

            let top_pages = rows
                .dist_stream()
                .map(FnMut!(|row: Result<LogLine, _>| {
                    let row = row.unwrap();
                    (row.uri, row.requestip)
                }))
                .most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
                .await;

            println!("{:#?}", top_pages);
            Ok(())
        })
}

入门

待办事项

示例

查看各种 示例

贡献

Amadeus 是一个开源项目!如果您想贡献,请查看“良好入门问题”列表。这些都是(或应该是)适合入门的问题,通常包括一个详细的操作说明。如果有什么不清楚的,请提问并在 我们的 Zulip 聊天 中 ping 我们!

许可证

根据 Apache License,版本 2.0 许可,(LICENSE.txthttps://apache.ac.cn/licenses/LICENSE-2.0).

除非您明确声明,否则根据 Apache-2.0 许可定义,您有意提交以包含在作品中的任何贡献,都应按照上述方式许可,而不附加任何其他条款或条件。

依赖关系

~15–35MB
~568K SLoC