1 个不稳定版本
0.1.0 | 2023年7月16日 |
---|
#1357 in 异步
10KB
68 行
同步流
按顺序轮询流项目,同步多个流的消费。使用流项目的 PartialOrd 实现允许混合流项目类型的排序。
警告
该功能仅在所有收集的流都具有较新项目或已结束时才发出流项目。这意味着如果有流延迟,则其他流项目可能需要更长的时间才能交付。为了缓解这个问题,可以发送无操作事件(只需一个 id 以保持处理一致性)
使用方法
- 在 cargo 文件中导入
[dependencies]
sync_stream = "0.1.0"
- 通过在事件项目上实现 Ord 来聚合和同步多个流
use async_stream::stream;
use futures::StreamExt;
use rand::random;
use std::{
cmp::Ordering,
time::Duration,
};
use tokio::time::sleep;
use sync_stream::sync_stream;
#[derive(Clone, Debug)]
struct Item<T> {
id: u32,
value: T,
}
impl<T> Eq for Item<T> {}
//implement ordering for our item
impl<T, B> PartialEq<Item<B>> for Item<T> {
fn eq(&self, other: &Item<B>) -> bool {
self.id == other.id
}
}
impl<T, B> PartialOrd<Item<B>> for Item<T> {
fn partial_cmp(&self, other: &Item<B>) -> Option<Ordering> {
self.id.partial_cmp(&other.id)
}
}
impl<T> Ord for Item<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.id.cmp(&other.id)
}
}
async fn delay() {
sleep(Duration::from_millis(random::<u8>().into())).await;
}
#[tokio::main]
async fn main() {
let a = stream! {
delay().await;
yield Item { id: 1, value: 100 };
delay().await;
yield Item { id: 6, value: 200 };
delay().await;
yield Item { id: 8, value: 100 };
delay().await;
yield Item { id: 9, value: 300 };
delay().await;
yield Item { id: 10, value: 100 };
delay().await;
yield Item { id: 18, value: 900 };
delay().await;
};
let b = stream! {
delay().await;
yield Item { id: 2, value: "a" };
delay().await;
yield Item { id: 4, value: "z" };
delay().await;
yield Item { id: 14, value: "r" };
delay().await;
yield Item { id: 23, value: "c" };
delay().await;
};
let c = stream! {
delay().await;
yield Item { id: 3, value: 'p' };
delay().await;
yield Item { id: 5, value: 'c' };
delay().await;
yield Item { id: 17, value: 'd' };
delay().await;
yield Item { id: 19, value: 'w' };
delay().await;
};
//our three stream items will be emitted ordered by the id in our stream items
sync_stream!(a, b, c)
.for_each(|(a, b, c)| async move {
println!("{a:?},{b:?},{c:?}");
})
.await;
}
依赖关系
~1–1.7MB
~34K SLoC