1 个不稳定版本
0.1.0 | 2020 年 12 月 10 日 |
---|
#1134 在 异步
348 每月下载量
10KB
74 代码行
and-then-concurrent
通过 TryStreamAndThenExt
特性在 Stream
上使用。
为什么这是必要的?考虑下面的例子。我们有一个来自 try_unfold
的 Stream
,但这个流将某个更大的流分割成子流,每个子流由一个通道表示。如果我们简单地调用 and_then
,那么该函数的实现,作为一个优化,只在其状态中保留一个 "挂起" 的 future。这意味着它不能轮询底层流,因为这可能会产生它没有空间的另一个 future。所以,它 必须 在再次轮询流之前运行到完成。
不幸的是,在这种情况下,底层流必须被轮询以解决我们的 future!所以使用 and_then
将导致死锁。相反,这个 crate 做了一个权衡:它将在 FuturesUnordered
中保留一个挂起的 futures 列表,这样就可以安全地轮询底层流。这意味着如果结果 futures 没有解决,我们可能会有一长串的 futures。
let c = futures_util::stream::try_unfold(
(
0,
HashMap::<usize, mpsc::UnboundedSender<(usize, usize)>>::default(),
),
move |(mut i, mut map)| async move {
loop {
sleep(Duration::from_millis(10)).await;
let (substream, message) = (i % 3, i);
i += 1;
if i > 25 {
return Ok(None);
}
let mut new = None;
if map
.entry(substream)
.or_insert_with(|| {
let (sub_s, sub_r) = mpsc::unbounded_channel();
new = Some(sub_r);
sub_s
})
.send((substream, message))
.is_err()
{
map.remove(&substream);
}
if let Some(new_sub_r) = new {
return Ok::<_, String>(Some((new_sub_r, (i, map))));
}
}
},
)
// .and_then(...) would deadlock!
.and_then_concurrent(|mut res| async move {
loop {
let (stream, val): (usize, usize) = match res.recv().await {
None => return Ok(()),
Some(s) => s,
};
println!("got {:?} on stream {:?}", val, stream);
}
})
.try_collect::<Vec<_>>();
c.await.unwrap();
依赖项
~0.9–1.5MB
~31K SLoC