#stream #iterator #future #async #non-blocking

stream-flatten-iters

将迭代器流扁平化为一个连续流

3个不稳定版本

0.2.0 2020年5月13日
0.1.1 2020年5月1日
0.1.0 2020年5月1日

#1804 in 异步

Download history 870/week @ 2024-03-14 1067/week @ 2024-03-21 1010/week @ 2024-03-28 1018/week @ 2024-04-04 1248/week @ 2024-04-11 701/week @ 2024-04-18 867/week @ 2024-04-25 831/week @ 2024-05-02 587/week @ 2024-05-09 1096/week @ 2024-05-16 889/week @ 2024-05-23 1209/week @ 2024-05-30 1359/week @ 2024-06-06 1336/week @ 2024-06-13 1192/week @ 2024-06-20 760/week @ 2024-06-27

4,753 每月下载量
用于 vila

MIT 许可证

19KB
281

flatten_iters 将迭代器流扁平化为一个连续流。

当您有一个正在分页遍历资源(如具有分页的REST端点、带有next URL的ElasticSearch查询等)的生产者时,这很有用。

此代码几乎直接取自 StreamExt::flatten,并在精神上类似于 Iterator::flatten

use stream_flatten_iters::StreamExt as _;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);

    tokio::spawn(async move {
        tx.send(vec![0, 1, 2, 3]).await.unwrap();
        tx.send(vec![4, 5, 6]).await.unwrap();
        tx.send(vec![7, 8, 9]).await.unwrap();
    });

    let mut stream = rx.flatten_iters();

    while let Some(res) = stream.next().await {
        println!("got = {}", res);
    }
}

// Output:
// got = 0
// got = 1
// got = 2
// got = 3
// got = 4
// got = 5
// got = 6
// got = 7
// got = 8
// got = 9

当与 StreamExt::buffered 结合使用时,这特别有用,以在整个长promise期间保持promise缓冲区。

use stream_flatten_iters::StreamExt as _;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);

    tokio::spawn(async move {
        for i in 0_usize..100 {
            let start = i * 10;
            let end = start + 10;
            tx.send(start..end).await.unwrap();
        }
    });

    let mut stream = rx.flatten_iters().map(|i| long_process(i)).buffered(10);

    let mut total = 0_usize;
    while let Some(res) = stream.next().await {
        let _ = res?;
        total += 1;
        println!("Completed {} tasks", total);
    }

    Ok(())
}

async fn long_process(i: usize) -> Result<(), Box<dyn std::error::Error>> {
    // Do something that takes a long time
    Ok(())
}

依赖关系

~1.5MB
~36K SLoC