#future #stream #async #buffer-unordered

future-queue

用于管理包含不同权重的futures的队列的适配器

4个版本

0.3.0 2023年3月18日
0.2.2 2022年12月28日
0.2.1 2022年12月28日
0.2.0 2022年12月28日

#404异步

Download history 11634/week @ 2024-03-14 10379/week @ 2024-03-21 7632/week @ 2024-03-28 10891/week @ 2024-04-04 9629/week @ 2024-04-11 9885/week @ 2024-04-18 9866/week @ 2024-04-25 8516/week @ 2024-05-02 9150/week @ 2024-05-09 9839/week @ 2024-05-16 7643/week @ 2024-05-23 10636/week @ 2024-05-30 10694/week @ 2024-06-06 9007/week @ 2024-06-13 13021/week @ 2024-06-20 8300/week @ 2024-06-27

44,125 每月下载量
12 个crates中使用(通过 nextest-runner

MIT/Apache

52KB
732

future-queue

future-queue on crates.io Documentation (latest release) Documentation (main) Changelog License License

future_queue 提供了运行多个futures的方法

  • 并发
  • 按它们启动的顺序
  • 具有全局限制
  • 并且可以为每个futures指定一个可选的组,它有自己的限制。

此crate是GitHub上的nextest组织的一部分,旨在满足cargo-nextest的需求。

动机

Rust中的异步编程通常使用一个名为 buffer_unordered 的适配器:此适配器接受一个futures流,并将所有futures限制在最大并发量内执行。

  • 按流返回它们的顺序启动futures。
  • 一旦启动,futures将同时轮询,并且完成的future输出将以任意顺序返回(因此称为 unordered)。

buffer_unordered 的常见用例包括

  • 并发发送网络请求,但限制并发量以避免压倒远程服务器。
  • 使用cargo-nextest等工具运行测试。

buffer_unordered 对许多用例都很好用。然而,它有一个问题,就是它将所有futures视为同等负担:没有办法说有些futures比其他futures消耗更多的资源,或者某些futures子集应该与其他子集相互排斥。

对于nextest来说,特别是有些测试可能比其他测试重得多,应该同时运行较少的这些测试。此外,某些测试可能需要相互排除,或者对它们施加其他并发限制。

[^1]: 此适配器接受一个futures流以实现最大通用性。实际上这通常是一个futures的 迭代器,通过 stream::iter 进行转换。

关于此crate

此crate为流提供了两个适配器。

1. future_queue 适配器

future_queue 适配器可以同时运行多个未来,将并发限制在最大 权重 内。

此适配器不是接受一系列未来,而是接受一系列 (usize, future) 对,其中 usize 表示每个未来的权重。此适配器将安排和缓冲要运行的未来,直到队列下一个未来将超过最大权重。

  • 在运行未来时,最大权重永远不会超过。
  • 如果单个未来的权重大于最大权重,其权重将设置为最大权重。

一旦所有可能未来的调度完成,此适配器将等待直到一些当前正在执行的未来完成,并且当前运行未来的权重低于最大权重,然后再安排新的未来。

未来的权重可以是零,在这种情况下,它不计入最大权重。

如果所有权重都是 1,则 future_queuebuffer_unordered 完全相同。

示例

use futures::{channel::oneshot, stream, StreamExt as _};
use future_queue::{StreamExt as _};

let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();

let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
let mut queue = stream_of_futures.future_queue(10);

send_two.send("hello")?;
assert_eq!(queue.next().await, Some(Ok("hello")));

send_one.send("world")?;
assert_eq!(queue.next().await, Some(Ok("world")));

assert_eq!(queue.next().await, None);

2. future_queue_grouped 适配器

future_queue_grouped 适配器类似于 future_queue,不同之处在于可以为每个未来指定一个可选的 。每个组都有一个最大权重,并且只有在最大权重和组权重都没有超过的情况下才会安排未来。

在给定的约束条件下,适配器尽可能地公平:它将按照流返回未来的顺序安排未来,而不根据权重进行任何重新排序。当一个组的未来完成时,该组中的排队未来将优先于提供流中的任何其他未来进行安排。

future_queue 相同

  • 在运行未来时,最大全局和组权重永远不会超过。
  • 在计算全局权重时,如果单个未来的权重大于最大权重,其权重将设置为最大权重。
  • 如果未来属于一个组: 在计算组权重时,如果其权重大于最大组权重,其权重将设置为最大组权重。

示例

use futures::{channel::oneshot, stream, StreamExt as _};
use future_queue::{StreamExt as _};

let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();

let stream_of_futures = stream::iter(
    vec![
        (1, Some("group1"), recv_one),
        (2, None, recv_two),
    ],
);
let mut queue = stream_of_futures.future_queue_grouped(10, [("group1", 5)]);

send_two.send("hello")?;
assert_eq!(queue.next().await, Some(Ok("hello")));

send_one.send("world")?;
assert_eq!(queue.next().await, Some(Ok("world")));

assert_eq!(queue.next().await, None);

最低支持 Rust 版本 (MSRV)

最低支持 Rust 版本是 Rust 1.56

在短期内,MSRV 很可能不会改变,但在本 crate 是预发布 (0.x.x) 期间,它可能在补丁版本中提高其 MSRV。一旦本 crate 达到 1.x,任何 MSRV 的提高都将伴随着新的次要版本。

注意

本 crate 以前被称为 buffer-unordered-weighted。它被重命名为 future-queue,以便更准确地描述 crate 的功能,而不是其实现方式。

贡献

有关如何帮助的说明,请参阅 CONTRIBUTING 文件。

许可证

本项目可在 Apache 2.0 许可证MIT 许可证 的条款下获得。

代码源自 futures-rs,并使用 Apache 2.0 和 MIT 许可证。

依赖关系

~775KB
~14K SLoC