6 个版本

0.3.1 2024年2月4日
0.3.0 2024年1月13日
0.2.2 2024年1月11日
0.2.0 2023年12月31日
0.1.0 2023年12月29日

#440异步

GPL-2.0-or-later

83KB
1.5K SLoC

Async Pipes

为 Rust 应用程序创建轻量级、并发的数据管道。

描述

Async Pipes 通过利用 Rust 的异步运行时能力,提供了一种简单的方式来创建高吞吐量数据管道。这是通过提供管理异步任务和任务间数据传输的框架来实现的,因此开发者只需要关注管道每个阶段的特定实现。

此库通过能够处理循环通道和通过启动并发任务进行可扩展性,与手动构建的管道(即使用通道)区分开来。

有关开始使用 Async Pipes 的信息,请参阅文档

文档

Async Pipes - Docs.rs

简单、线性管道示例

#[tokio::main]
async fn main() {
    let total = Arc::new(AtomicUsize::new(0));
    let task_total = total.clone();

    Pipeline::builder()
        .with_inputs("MapPipe", vec!["a", "bb", "ccc"])
        .with_stage(
            "MapPipe",
            "ReducePipe",
            WorkerOptions::default(),
            |value: &'static str| async move { Some(format!("{}!", value)) },
        )
        .with_consumer(
            "ReducePipe",
            WorkerOptions::default_single_task(),
            move |value: String| {
                let total = task_total.clone();
                async move {
                    total.fetch_add(value.len(), Ordering::SeqCst);
                }
            },
        )
        .build()
        .expect("failed to build pipeline!")
        .wait()
        .await;

    assert_eq!(total.load(Ordering::Acquire), 9);
}

分支、循环管道示例(例如网络爬虫)

#[tokio::main]
async fn main() {
    let initial_urls = vec![
        "https://example.com".to_string(),
        "https://rust-lang.net.cn".to_string(),
    ];

    Pipeline::builder()
        .with_inputs("ToFetch", initial_urls)
        .with_flattener::<Vec<String>>("ToFlattenThenFetch", "ToFetch")
        .with_stage(
            "ToFetch",
            "ToCrawl",
            WorkerOptions::default_multi_task(),
            |_url: String| async move {
                // Fetch content from url...
                Some("<html>Sample Content</html>".to_string())
            },
        )
        .with_branching_stage(
            "ToCrawl",
            vec!["ToFlattenThenFetch", "ToLog"],
            WorkerOptions::default_single_task(),
            |_html: String| async move {
                // Crawl HTML, extracting embedded URLs and content
                let has_embedded_urls = false; // Mimic the crawler not finding any URLs

                let output = if has_embedded_urls {
                    let urls = vec![
                        "https://first.com".to_string(),
                        "https://second.com".to_string(),
                    ];
                    branch![urls, NoOutput]
                } else {
                    branch![NoOutput, "Extracted content".to_string()]
                };

                Some(output)
            },
        )
        .with_consumer(
            "ToLog",
            WorkerOptions::default_single_task(),
            |content: String| async move { println!("{content}") },
        )
        .build()
        .expect("failed to build pipeline!")
        .wait()
        .await;
}

依赖关系

~3–4.5MB
~77K SLoC