#async-stream #future #output-stream #async #non-blocking

stream-map-any

允许合并不同输出类型的异步流

4个版本

0.2.2 2020年5月2日
0.2.1 2020年5月1日
0.2.0 2020年5月1日
0.1.0 2020年5月1日

#15#output-stream

MIT 许可证

12KB
141

stream-map-any

允许合并不同输出类型的异步流。

它与Tokio的StreamMap非常相似,但不需要流有相同的输出类型。当不知道应该合并哪种类型的流时,这很有用,它可以作为运行时动态选择。

不是零成本抽象

由于我们不知道流将生成哪种类型的输出,生成的输出将是一个StreamMapAnyVariant,这是围绕Box<dyn Any>的新类型。因此,我们依赖于动态调度来将其转换回所需的输出。基准测试显示,它比StreamMap或Tokio的select宏慢两倍。

示例

要开始,请将以下内容添加到Cargo.toml

stream-map-any = "0.2"

合并两个流

use futures::channel::mpsc::channel;
use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use stream_map_any::StreamMapAny;

fn main() {
    let int_stream = stream::iter(vec![1; 10]);
    let (mut tx, rx) = channel::<String>(100);

    let mut merge = StreamMapAny::new();
    merge.insert(0, int_stream);
    merge.insert(1, rx);

    std::thread::spawn(move || {
        tx.try_send("hello world".into()).unwrap();
    });

    block_on(async move {
        loop {
            match merge.next().await {
                Some((0, val)) => {
                    let _val: i32 = val.value().unwrap();
                }
                Some((1, val)) => {
                    let _val: String = val.value().unwrap();
                }
                Some(_) => panic!("unexpected key"),
                None => break,
            }
        }
    });
}

更多详细信息请参阅API文档

许可证

MIT许可证下授权。

依赖关系

~1MB
~15K SLoC