4 个版本 (2 个重大变更)
0.2.1 | 2020年4月18日 |
---|---|
0.2.0 | 2020年4月18日 |
0.1.0 | 2020年4月14日 |
0.0.1 | 2020年4月12日 |
#647 in 异步
32KB
335 行
Stream Router
此 crate 提供了一个 StreamRouter
结构体,该结构体能够在 Stream
和 Sink
之间动态路由值。
当与 Stream
和 Sink
一起工作时,通常需要编写大量样板代码,包括链式 Stream 组合器 和特定业务逻辑,以确保在 Stream
和 Sink
之间安全地路由。此 crate 试图提供一种通用的通用组合器和动态未来感知路由器的实现,同时具有最小的依赖关系,并且是无执行器相关的。
StreamRouter
是此 crate 的主要结构体,能够在 Stream
和 Sink
之间动态路由值。一个 StreamRouter
在其核心是一个 Stream
,可以拥有任意数量的其他 Stream
和任意数量的 Sink
,并通过用户定义的路由规则将 Stream
产生的值动态路由到任何一个提供的 Sink
。
每个提供给 StreamRouter
的 Sink
都会被赋予一个用户自定义的、可哈希的值。这个标签被路由器用于识别和区分 Sink
,并且是用户在定义路由逻辑时引用特定 Sink
的方式。
每个 Stream
都提供了一个匹配的闭包,该闭包消费由伴随的 Stream
产生的值,并返回一个 Future
。该 Future
将解析为识别要转发产生的值的特定 Sink
的标签。如果找不到与返回的路由标签匹配的 Sink
,则值将直接从 StreamRouter
中产生。
StreamRouter
保证来自 Stream
"A" 并发送到 Sink
"B" 的值的顺序将被保持,即 "A" 不会尝试将任何值下沉到 "B",直到所有之前从 "A" 发送到 "B" 的值都已处理。没有跨 Stream
或跨 Sink
的时间或顺序保证。
示例
以下示例来自 simple.rs
,位于 examples 文件夹。这个简单的例子说明了 StreamRouter
将所有 偶数 值转发到 even_chan_tx
,而所有 奇数 则由 StreamRouter
本身产生。如果需要,用户可以提供第二个 Sink
来显式消费 奇数 值,在这种情况下,StreamRouter
不会产生任何值。
use futures::{channel::mpsc, future, stream, stream::StreamExt};
use stream_router;
use tokio;
#[tokio::main]
async fn main() {
let mut router = stream_router::StreamRouter::new();
let nums = stream::iter(0..1_000);
let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);
router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
router.add_sink(even_chan_tx, true);
loop {
tokio::select! {
v = router.next() => {
println!("odd number: {:?}", v.unwrap());
}
v = even_chan_rx.next() => {
println!("even number: {:?}", v.unwrap());
}
}
}
}
路由逻辑
StreamRouter的路由逻辑是由用户以闭包的形式提供的,这些闭包可以将特定Stream
产生的值映射为标识特定Sink
的标签。这些闭包遵循以下形式:Fn(A) -> Future<Output=T>
,其中A
是由Stream
产生的值,而T
是用户分配给其Sink
之一的标签。需要注意的是,闭包会获取流产生的值的所有权,并且负责将值作为包含Stream
标签的元组的一部分返回。这样做是为了避免需要为每个值调用clone()
,同时也允许用户根据他们的特定用例“映射”这些值。虽然简单的路由(如上面所示)实际上不需要利用返回Future
所提供的灵活性,但返回Future
的选项允许进行更复杂的状态路由。使用状态路由来去重传入的Stream
的示例可以在dedup.rs
示例中找到。
许可证
在Apache License, Version 2.0下授权依赖关系
~1MB
~16K SLoC