6 个版本
0.3.1 | 2024年2月4日 |
---|---|
0.3.0 | 2024年1月13日 |
0.2.2 | 2024年1月11日 |
0.2.0 | 2023年12月31日 |
0.1.0 | 2023年12月29日 |
#440 在 异步 中
83KB
1.5K SLoC
Async Pipes
为 Rust 应用程序创建轻量级、并发的数据管道。
描述
Async Pipes 通过利用 Rust 的异步运行时能力,提供了一种简单的方式来创建高吞吐量数据管道。这是通过提供管理异步任务和任务间数据传输的框架来实现的,因此开发者只需要关注管道每个阶段的特定实现。
此库通过能够处理循环通道和通过启动并发任务进行可扩展性,与手动构建的管道(即使用通道)区分开来。
有关开始使用 Async Pipes 的信息,请参阅文档。
文档
简单、线性管道示例
#[tokio::main]
async fn main() {
let total = Arc::new(AtomicUsize::new(0));
let task_total = total.clone();
Pipeline::builder()
.with_inputs("MapPipe", vec!["a", "bb", "ccc"])
.with_stage(
"MapPipe",
"ReducePipe",
WorkerOptions::default(),
|value: &'static str| async move { Some(format!("{}!", value)) },
)
.with_consumer(
"ReducePipe",
WorkerOptions::default_single_task(),
move |value: String| {
let total = task_total.clone();
async move {
total.fetch_add(value.len(), Ordering::SeqCst);
}
},
)
.build()
.expect("failed to build pipeline!")
.wait()
.await;
assert_eq!(total.load(Ordering::Acquire), 9);
}
分支、循环管道示例(例如网络爬虫)
#[tokio::main]
async fn main() {
let initial_urls = vec![
"https://example.com".to_string(),
"https://rust-lang.net.cn".to_string(),
];
Pipeline::builder()
.with_inputs("ToFetch", initial_urls)
.with_flattener::<Vec<String>>("ToFlattenThenFetch", "ToFetch")
.with_stage(
"ToFetch",
"ToCrawl",
WorkerOptions::default_multi_task(),
|_url: String| async move {
// Fetch content from url...
Some("<html>Sample Content</html>".to_string())
},
)
.with_branching_stage(
"ToCrawl",
vec!["ToFlattenThenFetch", "ToLog"],
WorkerOptions::default_single_task(),
|_html: String| async move {
// Crawl HTML, extracting embedded URLs and content
let has_embedded_urls = false; // Mimic the crawler not finding any URLs
let output = if has_embedded_urls {
let urls = vec![
"https://first.com".to_string(),
"https://second.com".to_string(),
];
branch![urls, NoOutput]
} else {
branch![NoOutput, "Extracted content".to_string()]
};
Some(output)
},
)
.with_consumer(
"ToLog",
WorkerOptions::default_single_task(),
|content: String| async move { println!("{content}") },
)
.build()
.expect("failed to build pipeline!")
.wait()
.await;
}
依赖关系
~3–4.5MB
~77K SLoC