#stream #clone #fork

fork_stream

克隆任何流 S,其中 <S as Stream>::Item: Clone

1个不稳定版本

0.1.0 2023年9月11日

#772异步

Download history 3/week @ 2024-03-14 5/week @ 2024-03-21 18/week @ 2024-03-28 39/week @ 2024-04-04 22/week @ 2024-04-11 15/week @ 2024-05-23 9/week @ 2024-05-30 18/week @ 2024-06-06 42/week @ 2024-06-13 57/week @ 2024-06-20 31/week @ 2024-06-27

149 每月下载次数
rstreams 中使用

MIT 许可证

29KB
601

fork_stream

克隆任何 Stream S,其中 <S as Stream>::Item: Clone

用法

use fork_stream::StreamExt as _;

async fn example() {
    let source = futures::stream::iter(0..3);

    let fork1 = source.fork();

    let fork2 = fork1.clone();

    assert_eq!(fork1.collect(), vec![0, 1, 2]);

    assert_eq!(fork2.collect(), vec![0, 1, 2]);
}

行为

  1. 从源流中轮询的项目存储在缓冲区中;
  2. 当轮询源流的分支时,它们要么从缓冲区中产生项目的副本,要么轮询源流以获取新项目;
  3. 如果没有分支可以读取项目(无论是分支被丢弃,还是因为所有分支都已经产生了该项目),则丢弃项目;
  4. 尽可能将缓冲区中的项目移动出来而不是克隆。
  5. 当所有分支都丢弃时,源流被丢弃。

弱引用

任何分支都可以降级为 Weak,稍后可以将其升级回,类似于 std::rc::Rcstd::sync::Arc API。

该行为如下:

  1. Weak 不实现 Stream 并且在没有升级之前不能轮询;
  2. 当一个 Weak 升级为一个 Forked 时,结果 Forked 与源流一样先进;即它不会产生任何在升级之前由其他分支产生过的事项。
  3. 如果在升级之前所有分叉都已被丢弃,Weak::upgrade 返回 None

Weak API 在需要重用初始化成本高昂的流,同时也希望在不使用时丢弃它们时非常有用。

shared_stream 的不同之处

此库实现了一个类似于 shared_stream 的 API,但有一些显著的差异

  1. 此库产生的流是 SendSync。因此,我们必须使用支持它的同步原语,这可能会降低性能,但使其更适合异步环境。
  2. shared_stream 会缓冲项目,直到源流的一个克隆存在。此库尽可能快地“垃圾回收”项目。这会带来一些额外的业务逻辑,可能会降低性能,但使其更适合流应长期存在的情况,例如服务器。
  3. 此库提供了一个 Weak API,见上文。

依赖项

~1–1.6MB
~33K SLoC