3个不稳定版本
0.2.0 | 2020年5月13日 |
---|---|
0.1.1 | 2020年5月1日 |
0.1.0 | 2020年5月1日 |
#1804 in 异步
4,753 每月下载量
用于 vila
19KB
281 行
flatten_iters
将迭代器流扁平化为一个连续流。
当您有一个正在分页遍历资源(如具有分页的REST端点、带有next URL的ElasticSearch查询等)的生产者时,这很有用。
此代码几乎直接取自 StreamExt::flatten
,并在精神上类似于 Iterator::flatten
。
use stream_flatten_iters::StreamExt as _;
use futures::stream::StreamExt;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);
tokio::spawn(async move {
tx.send(vec![0, 1, 2, 3]).await.unwrap();
tx.send(vec![4, 5, 6]).await.unwrap();
tx.send(vec![7, 8, 9]).await.unwrap();
});
let mut stream = rx.flatten_iters();
while let Some(res) = stream.next().await {
println!("got = {}", res);
}
}
// Output:
// got = 0
// got = 1
// got = 2
// got = 3
// got = 4
// got = 5
// got = 6
// got = 7
// got = 8
// got = 9
当与 StreamExt::buffered
结合使用时,这特别有用,以在整个长promise期间保持promise缓冲区。
use stream_flatten_iters::StreamExt as _;
use futures::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(3);
tokio::spawn(async move {
for i in 0_usize..100 {
let start = i * 10;
let end = start + 10;
tx.send(start..end).await.unwrap();
}
});
let mut stream = rx.flatten_iters().map(|i| long_process(i)).buffered(10);
let mut total = 0_usize;
while let Some(res) = stream.next().await {
let _ = res?;
total += 1;
println!("Completed {} tasks", total);
}
Ok(())
}
async fn long_process(i: usize) -> Result<(), Box<dyn std::error::Error>> {
// Do something that takes a long time
Ok(())
}
依赖关系
~1.5MB
~36K SLoC