25 个版本
0.4.3 | 2021 年 5 月 20 日 |
---|---|
0.4.2 | 2020 年 8 月 18 日 |
0.3.7 | 2020 年 7 月 30 日 |
0.1.6 | 2020 年 1 月 2 日 |
0.1.0 | 2018 年 8 月 17 日 |
#252 在 并发
每月 55 次下载
1MB
26K SLoC
在 Rust 中实现和谐分布式数据处理与分析
Amadeus 提供
- 分布式流:类似于 Rayon 的并行迭代器,但分布在集群中。
- 数据连接器:用于处理 CSV、JSON、Parquet、Postgres、S3 等。
- ETL 和数据科学工具:专注于流处理和分析。
Amadeus 是 Rust 分布式计算和大数据生态系统的低级可重用构建块。
原则
- 无畏:无数据竞争,无不安全代码,无损失的数据规范化。
- 使分布式计算变得简单:运行分布式应与本地运行一样简单和高效。
- 数据是逐渐类型化的:在已知模式时实现最大性能,在不了解模式时提供灵活性。
- 简单性:尽可能保持接口和实现简单可靠。
- 可靠性:尽量减少未处理的错误(包括 OOM),并只暴露无法内部处理的错误。
为什么选择 Amadeus?
干净且可扩展的应用程序
设计上,Amadeus 鼓励您编写干净且可重用的代码,无论数据规模如何,都可以在本地或跨集群运行。一次编写,任何数据规模都可以运行。
社区
我们旨在创建一个对任何感兴趣的人都友好且乐于助人的社区!加入我们的 Zulip 聊天 来
- 使 Amadeus 为您的用例工作;
- 讨论项目的方向;
- 找到合适的 issues 以便开始。
开箱即用的兼容性
Amadeus 与各种文件格式、数据库和接口具有深度的、可插拔的集成
数据格式 | 来源 |
目标 |
---|---|---|
CSV | ✔ | ✔ |
JSON | ✔ | ✔ |
XML | 👐 | |
Parquet | ✔ | 🔨 |
Avro | 🔨 | |
PostgreSQL | ✔ | 🔨 |
HDF5 | 👐 | |
Redshift | 👐 | |
CloudFront Logs | ✔ | – |
Common Crawl | ✔ | – |
S3 | ✔ | 🔨 |
HDFS | 👐 | 👐 |
✔ = 正在运行
🔨 = 进行中
👐 = 已请求:查看问题了解如何帮助!
性能
Amadeus 常规基准测试,初步结果非常令人鼓舞
无处不在
Amadeus 是一个库,可以作为并行线程池单独使用,也可以与 Constellation 一起作为分布式集群使用。
Constellation 是一个用于进程分布和通信的框架,并具有裸集群(Linux 或 macOS)、托管 Kubernetes 集群等后端。
示例
此操作将从 S3 存储桶中读取 Parquet 分区,并打印出现频率最高的 100 个 URL。
use amadeus::prelude::*;
use amadeus::data::{IpAddr, Url};
use std::error::Error;
#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
uri: Option<String>,
requestip: Option<IpAddr>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None, None)?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
.await?;
let top_pages = rows
.par_stream()
.map(|row: Result<LogLine, _>| {
let row = row.unwrap();
(row.uri, row.requestip)
})
.most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
.await;
println!("{:#?}", top_pages);
Ok(())
}
这是有类型的,因此更快,并且还进一步进行了分析步骤,按记录的独立 IP 打印前 100 个 URL。
查看相同示例,但使用动态类型的数据。
use amadeus::prelude::*;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None, None)?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
.await?;
let top_pages = rows
.par_stream()
.map(|row: Result<Value, _>| {
let row = row.ok()?.into_group().ok()?;
row.get("uri")?.clone().into_url().ok()
})
.filter(|row| row.is_some())
.map(Option::unwrap)
.most_frequent(&pool, 100, 0.99, 0.002)
.await;
println!("{:#?}", top_pages);
Ok(())
}
将此数据加载到 Postgres 中怎么办?这将创建并填充名为 "accesslogs" 的表。
use amadeus::prelude::*;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = ThreadPool::new(None, None)?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
.await?;
// Note: this isn't yet implemented!
rows.par_stream()
.pipe(Postgres::new("127.0.0.1", PostgresTable::new("accesslogs")));
Ok(())
}
运行分布式
操作可以在并行线程池或分布式进程池中运行。
Amadeus 使用 Constellation 框架进行进程分布和通信。Constellation 具有裸集群(Linux 或 macOS)和托管 Kubernetes 集群的后端。
use amadeus::dist::prelude::*;
use amadeus::data::{IpAddr, Url};
use constellation::*;
use std::error::Error;
#[derive(Data, Clone, PartialEq, Debug)]
struct LogLine {
uri: Option<String>,
requestip: Option<IpAddr>,
}
fn main() -> Result<(), Box<dyn Error>> {
init(Resources::default());
// #[tokio::main] isn't supported yet so unfortunately setting up the Runtime must be done explicitly
tokio::runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap()
.block_on(async {
let pool = ProcessPool::new(None, None, None, Resources::default())?;
let rows = Parquet::new(ParquetDirectory::new(S3Directory::new_with(
AwsRegion::UsEast1,
"us-east-1.data-analytics",
"cflogworkshop/optimized/cf-accesslogs/",
AwsCredentials::Anonymous,
)))
.await?;
let top_pages = rows
.dist_stream()
.map(FnMut!(|row: Result<LogLine, _>| {
let row = row.unwrap();
(row.uri, row.requestip)
}))
.most_distinct(&pool, 100, 0.99, 0.002, 0.0808)
.await;
println!("{:#?}", top_pages);
Ok(())
})
}
入门
待办事项
示例
查看各种 示例。
贡献
Amadeus 是一个开源项目!如果您想贡献,请查看“良好入门问题”列表。这些都是(或应该是)适合入门的问题,通常包括一个详细的操作说明。如果有什么不清楚的,请提问并在 我们的 Zulip 聊天 中 ping 我们!
许可证
根据 Apache License,版本 2.0 许可,(LICENSE.txt 或 https://apache.ac.cn/licenses/LICENSE-2.0).
除非您明确声明,否则根据 Apache-2.0 许可定义,您有意提交以包含在作品中的任何贡献,都应按照上述方式许可,而不附加任何其他条款或条件。
依赖关系
~15–35MB
~568K SLoC