#stream #aggregate #macro

sync_stream

按顺序轮询流项目,同步多个流的消费,聚合多个流

1 个不稳定版本

0.1.0 2023年7月16日

#1357 in 异步

MIT/Apache

10KB
68

同步流

按顺序轮询流项目,同步多个流的消费。使用流项目的 PartialOrd 实现允许混合流项目类型的排序。

警告

该功能仅在所有收集的流都具有较新项目或已结束时才发出流项目。这意味着如果有流延迟,则其他流项目可能需要更长的时间才能交付。为了缓解这个问题,可以发送无操作事件(只需一个 id 以保持处理一致性)

使用方法

  1. 在 cargo 文件中导入
[dependencies]
sync_stream = "0.1.0"
  1. 通过在事件项目上实现 Ord 来聚合和同步多个流
use async_stream::stream;
use futures::StreamExt;
use rand::random;
use std::{
    cmp::Ordering,
    time::Duration,
};
use tokio::time::sleep;
use sync_stream::sync_stream;

#[derive(Clone, Debug)]
struct Item<T> {
    id: u32,
    value: T,
}

impl<T> Eq for Item<T> {}

//implement ordering for our item
impl<T, B> PartialEq<Item<B>> for Item<T> {
    fn eq(&self, other: &Item<B>) -> bool {
        self.id == other.id
    }
}

impl<T, B> PartialOrd<Item<B>> for Item<T> {
    fn partial_cmp(&self, other: &Item<B>) -> Option<Ordering> {
        self.id.partial_cmp(&other.id)
    }
}

impl<T> Ord for Item<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.id.cmp(&other.id)
    }
}

async fn delay() {
    sleep(Duration::from_millis(random::<u8>().into())).await;
}

#[tokio::main]
async fn main() {
    let a = stream! {
        delay().await;
        yield Item { id: 1, value: 100 };
        delay().await;
        yield Item { id: 6, value: 200 };
        delay().await;
        yield Item { id: 8, value: 100 };
        delay().await;
        yield Item { id: 9, value: 300 };
        delay().await;
        yield Item { id: 10, value: 100 };
        delay().await;
        yield Item { id: 18, value: 900 };
        delay().await;
    };
    let b = stream! {
        delay().await;
        yield Item { id: 2, value: "a" };
        delay().await;
        yield Item { id: 4, value: "z" };
        delay().await;
        yield Item { id: 14, value: "r" };
        delay().await;
        yield Item { id: 23, value: "c" };
        delay().await;
    };
    let c = stream! {
        delay().await;
        yield Item { id: 3, value: 'p' };
        delay().await;
        yield Item { id: 5, value: 'c' };
        delay().await;
        yield Item { id: 17, value: 'd' };
        delay().await;
        yield Item { id: 19, value: 'w' };
        delay().await;
    };

    //our three stream items will be emitted ordered by the id in our stream items
    sync_stream!(a, b, c)
        .for_each(|(a, b, c)| async move {
            println!("{a:?},{b:?},{c:?}");
        })
        .await;
}

依赖关系

~1–1.7MB
~34K SLoC