7 个版本

0.2.0 2023 年 2 月 11 日
0.1.4 2023 年 1 月 29 日
0.1.2 2022 年 12 月 2 日
0.1.1 2022 年 8 月 2 日
0.0.1 2021 年 10 月 28 日

#386异步

Download history 115167/week @ 2024-03-14 133387/week @ 2024-03-21 123363/week @ 2024-03-28 119551/week @ 2024-04-04 119385/week @ 2024-04-11 130285/week @ 2024-04-18 125953/week @ 2024-04-25 125708/week @ 2024-05-02 119146/week @ 2024-05-09 131722/week @ 2024-05-16 130694/week @ 2024-05-23 140831/week @ 2024-05-30 128054/week @ 2024-06-06 137860/week @ 2024-06-13 140943/week @ 2024-06-20 124518/week @ 2024-06-27

557,657 每月下载量
用于 346 个 crate(直接使用 2 个)

MIT/Apache

77KB
1.5K SLoC

产生具有关联排序的元素的流

假设你有一系列事件,这些事件都具有时间戳、序列号或其他排序属性。如果你从多个 Stream 中获取这些事件,只要每个原始流是有序的,你应该能够通过连接每个单独的流来产生一个“复合”流。

但是,如果你实际上实现了这个,你会发现问题在于你需要至少从每个流中缓冲一个元素,以避免在源独立(包括在不同的任务中运行)时发生排序反转。如果其中一个源很少产生事件,那么这个慢速源可以阻止其他所有流,以便处理由于早期元素而不是没有元素而导致的延迟。

OrderedStream trait 提供了一种解决此问题的方法:如果你可以询问一个流是否将会有任何应该在给定事件之前交付的事件,那么你通常可以在数据准备就绪时避免阻塞复合流。

use futures_core::Stream;
use ordered_stream::FromStream;
use ordered_stream::JoinMultiple;
use ordered_stream::OrderedStream;
use ordered_stream::OrderedStreamExt;
use std::pin::Pin;
use std::time::SystemTime;

pub struct Message {
    time: SystemTime,
    level: u8,
    data: String,
    source: String,
}

pub struct RemoteLogSource {
    stream: Pin<Box<dyn Stream<Item = Message>>>,
    min_level: u8,
}

pub async fn display_logs(logs: &mut [RemoteLogSource]) {
    let mut streams: Vec<_> = logs
        .iter_mut()
        .map(|s| {
            let min = s.min_level;
            FromStream::with_ordering(&mut s.stream, |m| m.time)
                .filter(move |m| m.level >= min)
                .peekable()
        })
        .collect();
    let mut joined = JoinMultiple(streams);
    while let Some(msg) = joined.next().await {
        println!("{:?}: {}", msg.time, msg.data);
    }
}

依赖项

~70KB