4个版本
0.3.0 | 2023年3月18日 |
---|---|
0.2.2 | 2022年12月28日 |
0.2.1 | 2022年12月28日 |
0.2.0 | 2022年12月28日 |
#404 在 异步
44,125 每月下载量
在 12 个crates中使用(通过 nextest-runner)
52KB
732 行
future-queue
future_queue
提供了运行多个futures的方法
- 并发
- 按它们启动的顺序
- 具有全局限制
- 并且可以为每个futures指定一个可选的组,它有自己的限制。
此crate是GitHub上的nextest组织的一部分,旨在满足cargo-nextest的需求。
动机
Rust中的异步编程通常使用一个名为 buffer_unordered
的适配器:此适配器接受一个futures流,并将所有futures限制在最大并发量内执行。
- 按流返回它们的顺序启动futures。
- 一旦启动,futures将同时轮询,并且完成的future输出将以任意顺序返回(因此称为
unordered
)。
buffer_unordered
的常见用例包括
- 并发发送网络请求,但限制并发量以避免压倒远程服务器。
- 使用cargo-nextest等工具运行测试。
buffer_unordered
对许多用例都很好用。然而,它有一个问题,就是它将所有futures视为同等负担:没有办法说有些futures比其他futures消耗更多的资源,或者某些futures子集应该与其他子集相互排斥。
对于nextest来说,特别是有些测试可能比其他测试重得多,应该同时运行较少的这些测试。此外,某些测试可能需要相互排除,或者对它们施加其他并发限制。
[^1]: 此适配器接受一个futures流以实现最大通用性。实际上这通常是一个futures的 迭代器,通过 stream::iter
进行转换。
关于此crate
此crate为流提供了两个适配器。
1. future_queue
适配器
future_queue
适配器可以同时运行多个未来,将并发限制在最大 权重 内。
此适配器不是接受一系列未来,而是接受一系列 (usize, future)
对,其中 usize
表示每个未来的权重。此适配器将安排和缓冲要运行的未来,直到队列下一个未来将超过最大权重。
- 在运行未来时,最大权重永远不会超过。
- 如果单个未来的权重大于最大权重,其权重将设置为最大权重。
一旦所有可能未来的调度完成,此适配器将等待直到一些当前正在执行的未来完成,并且当前运行未来的权重低于最大权重,然后再安排新的未来。
未来的权重可以是零,在这种情况下,它不计入最大权重。
如果所有权重都是 1,则 future_queue
与 buffer_unordered
完全相同。
示例
use futures::{channel::oneshot, stream, StreamExt as _};
use future_queue::{StreamExt as _};
let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();
let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
let mut queue = stream_of_futures.future_queue(10);
send_two.send("hello")?;
assert_eq!(queue.next().await, Some(Ok("hello")));
send_one.send("world")?;
assert_eq!(queue.next().await, Some(Ok("world")));
assert_eq!(queue.next().await, None);
2. future_queue_grouped
适配器
future_queue_grouped
适配器类似于 future_queue
,不同之处在于可以为每个未来指定一个可选的 组。每个组都有一个最大权重,并且只有在最大权重和组权重都没有超过的情况下才会安排未来。
在给定的约束条件下,适配器尽可能地公平:它将按照流返回未来的顺序安排未来,而不根据权重进行任何重新排序。当一个组的未来完成时,该组中的排队未来将优先于提供流中的任何其他未来进行安排。
与 future_queue
相同
- 在运行未来时,最大全局和组权重永远不会超过。
- 在计算全局权重时,如果单个未来的权重大于最大权重,其权重将设置为最大权重。
- 如果未来属于一个组: 在计算组权重时,如果其权重大于最大组权重,其权重将设置为最大组权重。
示例
use futures::{channel::oneshot, stream, StreamExt as _};
use future_queue::{StreamExt as _};
let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();
let stream_of_futures = stream::iter(
vec![
(1, Some("group1"), recv_one),
(2, None, recv_two),
],
);
let mut queue = stream_of_futures.future_queue_grouped(10, [("group1", 5)]);
send_two.send("hello")?;
assert_eq!(queue.next().await, Some(Ok("hello")));
send_one.send("world")?;
assert_eq!(queue.next().await, Some(Ok("world")));
assert_eq!(queue.next().await, None);
最低支持 Rust 版本 (MSRV)
最低支持 Rust 版本是 Rust 1.56。
在短期内,MSRV 很可能不会改变,但在本 crate 是预发布 (0.x.x) 期间,它可能在补丁版本中提高其 MSRV。一旦本 crate 达到 1.x,任何 MSRV 的提高都将伴随着新的次要版本。
注意
本 crate 以前被称为 buffer-unordered-weighted
。它被重命名为 future-queue
,以便更准确地描述 crate 的功能,而不是其实现方式。
贡献
有关如何帮助的说明,请参阅 CONTRIBUTING 文件。
许可证
本项目可在 Apache 2.0 许可证 或 MIT 许可证 的条款下获得。
代码源自 futures-rs,并使用 Apache 2.0 和 MIT 许可证。
依赖关系
~775KB
~14K SLoC