#future #stream #low-memory #capacity #fixed #thread-safe #concurrency

futures-buffered

注重性能和低内存使用的future并发原语

9个版本

0.2.8 2024年8月7日
0.2.7 2024年8月7日
0.2.6 2024年5月16日
0.2.4 2023年1月9日
0.1.0 2022年10月29日

#55 in 异步

Download history 1737/week @ 2024-05-02 1418/week @ 2024-05-09 2874/week @ 2024-05-16 1560/week @ 2024-05-23 1947/week @ 2024-05-30 4214/week @ 2024-06-06 8481/week @ 2024-06-13 10210/week @ 2024-06-20 10126/week @ 2024-06-27 10352/week @ 2024-07-04 11137/week @ 2024-07-11 11662/week @ 2024-07-18 13792/week @ 2024-07-25 15384/week @ 2024-08-01 16869/week @ 2024-08-08 12795/week @ 2024-08-15

60,942 每月下载量
用于 138 开源软件包 (直接使用9个)

MIT 协议

145KB
2.5K SLoC

futures-buffered

本项目提供几种基于 FuturesUnorderedBounded 原语的未来结构。

futures::FuturesUnordered 类似,这是一个线程安全、Pin 亲和性、生命周期友好的并发处理流。

这个原语与 FuturesUnordered 不同之处在于,FuturesUnorderedBounded 具有固定的处理容量。这意味着它的灵活性较低,但内存效率更高。

然而,我们也提供了一个 FuturesUnordered,它将自动分配更大的 FuturesUnorderedBounded 来缓解这些不灵活性。这基于三角形数组概念来分摊分配成本(类似于Vec),同时不违反 Pin 约束。

基准测试

速度

在一个单线程tokio运行时中,运行65536个100us定时器,同时有256个并发作业

FuturesUnorderedBounded    [339.9 ms  364.7 ms  380.6 ms]
futures::FuturesUnordered  [377.4 ms  391.4 ms  406.3 ms]
                           [min         mean         max]

内存使用

运行512000个 Ready<i32> future,同时有256个并发作业。

  • count: alloc/dealloc 被调用的次数
  • alloc: 分配的字节总数
  • dealloc: 释放的字节总数
futures::FuturesUnordered
    count:    1,024,004
    alloc:    40.96 MB
    dealloc:  40.96 MB

FuturesUnorderedBounded
    count:    4
    alloc:    8.28 KB
    dealloc:  8.28 KB

结论

如你所见,FuturesUnorderedBounded 在提供微小性能提升的同时,大幅减少了你的内存开销。非常适合如果你需要一个固定批量大小的情况

示例

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = conn::handshake(stream).await?;
runtime.spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<Body>) -> ResponseFuture {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(Body::from(""))
        .unwrap();
    rs.send_request(req)
}

// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);

// start up 128 requests
for _ in 0..128 {
    queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}
use futures_buffered::join_all;

async fn foo(i: u32) -> u32 { i }

let futures = vec![foo(1), foo(2), foo(3)];

assert_eq!(join_all(futures).await, [1, 2, 3]);

依赖

~0.1–24MB
~335K SLoC