#future #stream #async #networking #buffer-unordered

buffer-unordered-weighted

Stream::buffer_unordered,其中每个future可以有不同的权重

3个版本

0.1.2 2022年11月1日
0.1.1 2022年10月29日
0.1.0 2022年10月28日

#239 in #stream

Download history 29/week @ 2024-03-13 25/week @ 2024-03-20 42/week @ 2024-03-27 52/week @ 2024-04-03 21/week @ 2024-04-10 17/week @ 2024-04-17 16/week @ 2024-04-24 10/week @ 2024-05-01 12/week @ 2024-05-08 26/week @ 2024-05-15 12/week @ 2024-05-22 10/week @ 2024-05-29 20/week @ 2024-06-05 48/week @ 2024-06-12 506/week @ 2024-06-19 560/week @ 2024-06-26

1,137 每月下载量

MIT/Apache

23KB
230

buffer-unordered-weighted

buffer-unordered-weighted on crates.io Documentation (latest release) Documentation (main) Changelog License License

buffer_unordered_weightedbuffer_unordered 的一个变体,其中每个future可以分配不同的权重。

该包是GitHub上 nextest 组织 的一部分,旨在满足 cargo-nextest 的需求。

动机

Rust中的异步编程通常使用一个名为 buffer_unordered 的适配器:这个适配器接收一系列future流,并执行所有future,限制最大并发数。

  • future的启动顺序与流返回它们的顺序相同。
  • 启动后,future将同时轮询,完成future的输出以任意顺序返回(因此名为 unordered)。

buffer_unordered 的常见用途包括

  • 并发发送网络请求,但限制并发量以避免压倒远程服务器。
  • 使用像 cargo-nextest 这样的工具运行测试。

buffer_unordered 对于许多用例来说都很好。然而,它有一个问题,就是将所有future视为同等负担:无法表示某些future比其他future消耗更多资源。对于nextest来说尤其如此,一些测试可能比其他测试重得多,应该同时运行较少的这些测试。

[^1]: 此适配器接受一系列future以提高最大通用性。在实践中,这通常是一个future的 迭代器,通过 stream::iter 转换。

关于此包

此包提供了一种在流上名为 buffer_unordered_weighted 的适配器,可以同时运行多个future,限制并发数达到最大 权重

此适配器接收的是一系列 (usize, future) 对,其中 usize 表示每个future的权重。此适配器将调度和缓冲要运行的future,直到超过最大权重。一旦发生这种情况,此适配器将等待一些当前正在执行的未来完成,并且正在执行的未来的当前权重低于最大权重,然后调度新的future。

请注意,在某些情况下,当前权重可能超过最大权重。例如

  • 假设最大权重是 24,当前权重是 20
  • 如果下一个未来的权重小于6,则将进行调度,当前权重将变为26
  • 只有在当前权重降至23或以下时,才会安排新的未来。

可能存在这种适配器的变体,它始终保持在限制之下,并将下一个未来保持待定状态;然而,该变体的实现更为复杂,并且也不是nextest所期望的行为。这个变体可能在未来提供。

未来的权重可以是零,在这种情况下,它不会计入最大权重。

如果所有权重都是1,则buffer_unordered_weightedbuffer_unordered完全相同。

示例

use futures::{channel::oneshot, stream, StreamExt as _};
use buffer_unordered_weighted::{StreamExt as _};

let (send_one, recv_one) = oneshot::channel();
let (send_two, recv_two) = oneshot::channel();

let stream_of_futures = stream::iter(vec![(1, recv_one), (2, recv_two)]);
let mut buffered = stream_of_futures.buffer_unordered_weighted(10);

send_two.send("hello")?;
assert_eq!(buffered.next().await, Some(Ok("hello")));

send_one.send("world")?;
assert_eq!(buffered.next().await, Some(Ok("world")));

assert_eq!(buffered.next().await, None);

最小支持的Rust版本(MSRV)

最小支持的Rust版本是Rust 1.56

在中等期限内,MSRV可能不会改变,但在此crate为预发布(0.x.x)时,其MSRV可能在补丁版本中提升。一旦此crate达到1.x,任何MSRV的提升都将伴随着新的次要版本。

贡献

有关如何帮助的说明,请参阅CONTRIBUTING文件。

许可证

本项目可根据Apache 2.0许可证MIT许可证的条款使用。

代码源自futures-rs,并在此Apache 2.0和MIT许可证下使用。

依赖关系

~760KB
~14K SLoC