1 个不稳定版本

0.1.0 2020 年 12 月 10 日

#1134异步

Download history 5/week @ 2024-03-11 31/week @ 2024-03-18 68/week @ 2024-03-25 78/week @ 2024-04-01 99/week @ 2024-04-08 72/week @ 2024-04-15 61/week @ 2024-04-22 102/week @ 2024-04-29 28/week @ 2024-05-06 59/week @ 2024-05-13 64/week @ 2024-05-20 82/week @ 2024-05-27 134/week @ 2024-06-03 121/week @ 2024-06-10 37/week @ 2024-06-17 56/week @ 2024-06-24

348 每月下载量

MIT 许可证

10KB
74 代码行

and-then-concurrent

Rust

通过 TryStreamAndThenExt 特性在 Stream 上使用。

为什么这是必要的?考虑下面的例子。我们有一个来自 try_unfoldStream,但这个流将某个更大的流分割成子流,每个子流由一个通道表示。如果我们简单地调用 and_then,那么该函数的实现,作为一个优化,只在其状态中保留一个 "挂起" 的 future。这意味着它不能轮询底层流,因为这可能会产生它没有空间的另一个 future。所以,它 必须 在再次轮询流之前运行到完成。

不幸的是,在这种情况下,底层流必须被轮询以解决我们的 future!所以使用 and_then 将导致死锁。相反,这个 crate 做了一个权衡:它将在 FuturesUnordered 中保留一个挂起的 futures 列表,这样就可以安全地轮询底层流。这意味着如果结果 futures 没有解决,我们可能会有一长串的 futures。

let c = futures_util::stream::try_unfold(
    (
        0,
        HashMap::<usize, mpsc::UnboundedSender<(usize, usize)>>::default(),
    ),
    move |(mut i, mut map)| async move {
        loop {
            sleep(Duration::from_millis(10)).await;
            let (substream, message) = (i % 3, i);
            i += 1;
            if i > 25 {
                return Ok(None);
            }

            let mut new = None;
            if map
                .entry(substream)
                .or_insert_with(|| {
                    let (sub_s, sub_r) = mpsc::unbounded_channel();
                    new = Some(sub_r);
                    sub_s
                })
                .send((substream, message))
                .is_err()
            {
                map.remove(&substream);
            }

            if let Some(new_sub_r) = new {
                return Ok::<_, String>(Some((new_sub_r, (i, map))));
            }
        }
    },
)
// .and_then(...) would deadlock!
.and_then_concurrent(|mut res| async move {
    loop {
        let (stream, val): (usize, usize) = match res.recv().await {
            None => return Ok(()),
            Some(s) => s,
        };
        println!("got {:?} on stream {:?}", val, stream);
    }
})
.try_collect::<Vec<_>>();
c.await.unwrap();

依赖项

~0.9–1.5MB
~31K SLoC