#stream #future #tokio #async

stream_router

一个具备未来感知功能的路由器,用于在流和汇之间动态路由

4 个版本 (2 个重大变更)

0.2.1 2020年4月18日
0.2.0 2020年4月18日
0.1.0 2020年4月14日
0.0.1 2020年4月12日

#647 in 异步

Apache-2.0

32KB
335

Stream Router

Latest Version License Downloads

此 crate 提供了一个 StreamRouter 结构体,该结构体能够在 StreamSink 之间动态路由值。

API 文档

crates.io


当与 StreamSink 一起工作时,通常需要编写大量样板代码,包括链式 Stream 组合器 和特定业务逻辑,以确保在 StreamSink 之间安全地路由。此 crate 试图提供一种通用的通用组合器和动态未来感知路由器的实现,同时具有最小的依赖关系,并且是无执行器相关的。

StreamRouter 是此 crate 的主要结构体,能够在 StreamSink 之间动态路由值。一个 StreamRouter 在其核心是一个 Stream,可以拥有任意数量的其他 Stream 和任意数量的 Sink,并通过用户定义的路由规则将 Stream 产生的值动态路由到任何一个提供的 Sink

每个提供给 StreamRouterSink 都会被赋予一个用户自定义的、可哈希的值。这个标签被路由器用于识别和区分 Sink,并且是用户在定义路由逻辑时引用特定 Sink 的方式。

每个 Stream 都提供了一个匹配的闭包,该闭包消费由伴随的 Stream 产生的值,并返回一个 Future。该 Future 将解析为识别要转发产生的值的特定 Sink 的标签。如果找不到与返回的路由标签匹配的 Sink,则值将直接从 StreamRouter 中产生。

StreamRouter 保证来自 Stream "A" 并发送到 Sink "B" 的值的顺序将被保持,即 "A" 不会尝试将任何值下沉到 "B",直到所有之前从 "A" 发送到 "B" 的值都已处理。没有跨 Stream 或跨 Sink 的时间或顺序保证。

示例

以下示例来自 simple.rs,位于 examples 文件夹。这个简单的例子说明了 StreamRouter 将所有 偶数 值转发到 even_chan_tx,而所有 奇数 则由 StreamRouter 本身产生。如果需要,用户可以提供第二个 Sink 来显式消费 奇数 值,在这种情况下,StreamRouter 不会产生任何值。

use futures::{channel::mpsc, future, stream, stream::StreamExt};
use stream_router;
use tokio;

#[tokio::main]
async fn main() {
    let mut router = stream_router::StreamRouter::new();
    let nums = stream::iter(0..1_000);
    let (even_chan_tx, mut even_chan_rx) = mpsc::channel(10);

    router.add_source(nums, |x| future::lazy(move |_| (x, x % 2 == 0)));
    router.add_sink(even_chan_tx, true);

    loop {
        tokio::select! {
            v = router.next() => {
                println!("odd number:  {:?}", v.unwrap());
            }
            v = even_chan_rx.next() => {
                println!("even number: {:?}", v.unwrap());
            }
        }
    }
}

路由逻辑

StreamRouter的路由逻辑是由用户以闭包的形式提供的,这些闭包可以将特定Stream产生的值映射为标识特定Sink的标签。这些闭包遵循以下形式:Fn(A) -> Future<Output=T>,其中A是由Stream产生的值,而T是用户分配给其Sink之一的标签。需要注意的是,闭包会获取流产生的值的所有权,并且负责将值作为包含Stream标签的元组的一部分返回。这样做是为了避免需要为每个值调用clone(),同时也允许用户根据他们的特定用例“映射”这些值。虽然简单的路由(如上面所示)实际上不需要利用返回Future所提供的灵活性,但返回Future的选项允许进行更复杂的状态路由。使用状态路由来去重传入的Stream的示例可以在dedup.rs示例中找到。

许可证

Apache License, Version 2.0下授权

依赖关系

~1MB
~16K SLoC