3个版本
0.1.2 | 2022年11月1日 |
---|---|
0.1.1 | 2022年10月29日 |
0.1.0 | 2022年10月28日 |
#239 in #stream
1,137 每月下载量
23KB
230 行
buffer-unordered-weighted
buffer_unordered_weighted
是 buffer_unordered
的一个变体,其中每个future可以分配不同的权重。
该包是GitHub上 nextest 组织 的一部分,旨在满足 cargo-nextest 的需求。
动机
Rust中的异步编程通常使用一个名为 buffer_unordered
的适配器:这个适配器接收一系列future流,并执行所有future,限制最大并发数。
- future的启动顺序与流返回它们的顺序相同。
- 启动后,future将同时轮询,完成future的输出以任意顺序返回(因此名为
unordered
)。
buffer_unordered
的常见用途包括
- 并发发送网络请求,但限制并发量以避免压倒远程服务器。
- 使用像 cargo-nextest 这样的工具运行测试。
buffer_unordered
对于许多用例来说都很好。然而,它有一个问题,就是将所有future视为同等负担:无法表示某些future比其他future消耗更多资源。对于nextest来说尤其如此,一些测试可能比其他测试重得多,应该同时运行较少的这些测试。
[^1]: 此适配器接受一系列future以提高最大通用性。在实践中,这通常是一个future的 迭代器,通过 stream::iter
转换。
关于此包
此包提供了一种在流上名为 buffer_unordered_weighted
的适配器,可以同时运行多个future,限制并发数达到最大 权重。
此适配器接收的是一系列 (usize, future)
对,其中 usize
表示每个future的权重。此适配器将调度和缓冲要运行的future,直到超过最大权重。一旦发生这种情况,此适配器将等待一些当前正在执行的未来完成,并且正在执行的未来的当前权重低于最大权重,然后调度新的future。
请注意,在某些情况下,当前权重可能超过最大权重。例如
- 假设最大权重是 24,当前权重是 20。
- 如果下一个未来的权重小于6,则将进行调度,当前权重将变为26。
- 只有在当前权重降至23或以下时,才会安排新的未来。
可能存在这种适配器的变体,它始终保持在限制之下,并将下一个未来保持待定状态;然而,该变体的实现更为复杂,并且也不是nextest所期望的行为。这个变体可能在未来提供。
未来的权重可以是零,在这种情况下,它不会计入最大权重。
如果所有权重都是1,则buffer_unordered_weighted
与buffer_unordered
完全相同。
示例
use futures::{channel::oneshot, stream, StreamExt as _};
use buffer_unordered_weighted::{StreamExt as _};
let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();
let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
let mut buffered = stream_of_futures.buffer_unordered_weighted(10);
send_two.send("hello")?;
assert_eq!(buffered.next().await, Some(Ok("hello")));
send_one.send("world")?;
assert_eq!(buffered.next().await, Some(Ok("world")));
assert_eq!(buffered.next().await, None);
最小支持的Rust版本(MSRV)
最小支持的Rust版本是Rust 1.56。
在中等期限内,MSRV可能不会改变,但在此crate为预发布(0.x.x)时,其MSRV可能在补丁版本中提升。一旦此crate达到1.x,任何MSRV的提升都将伴随着新的次要版本。
贡献
有关如何帮助的说明,请参阅CONTRIBUTING文件。
许可证
本项目可根据Apache 2.0许可证或MIT许可证的条款使用。
代码源自futures-rs,并在此Apache 2.0和MIT许可证下使用。
依赖关系
~760KB
~14K SLoC