#stream #broadcast #data-stream #cached #clone #shared

stream-broadcast

运行时独立的广播,仅在无待处理数据时才轮询其底层流

4个版本 (2个破坏性更新)

0.3.0 2024年2月28日
0.2.3 2024年2月28日
0.2.2 2023年7月24日
0.1.1 2023年7月20日

缓存分类中排名第368

Download history 4/week @ 2024-03-25 7/week @ 2024-04-01

每月下载量84

MIT许可MIT

13KB
274 代码行

运行时独立的广播,仅在无待处理数据时才轮询其底层流。

use futures::StreamExt;
use stream_broadcast::StreamBroadcastExt;

#[tokio::main]
async fn main() {
    let broadcast = futures::stream::iter('a'..='d').fuse().broadcast(3);
    let broadcast2 = broadcast.clone();
    assert_eq!(4, broadcast.count().await);
    // Letter 'a' wasn't available anymore due to `broadcast(3)`, which limits the buffer to 3 items
    // Left side of tuple represents number of missed items
    assert_eq!(vec![(1, 'b'), (0, 'c'), (0, 'd')], broadcast2.collect::<Vec<_>>().await);
}

使用#![forbid(unsafe_code)]

与其他库的区别

shared_stream:

  • 从开始缓存整个流,这对大数据集不实用。此crate从克隆源当前位置开始流式传输
  • shared_stream从不跳过任何条目。此库只提供有关缺失数据的信息
  • 内存泄漏风险高

tokio::sync::broadcast:

  • 广播没有直接实现Stream,但tokio_stream提供了一个包装器。
  • 条目被主动推送到发送者(当流暂停时没有懒加载)。这需要一个子程序,需要某种方式来管理。
  • 此库返回一个元组(自上一帧以来缺失的帧,TData),而不是在ErrorVariant(tokio_stream)中返回缺失的帧,以减轻在进行类似stream.count()的操作时的错误。

依赖项

~1–1.6MB
~33K SLoC