#async-channel #multi-consumer #multi-producer #reservations

mpmc-async

具有预留的多生产者、多消费者异步通道

8 个版本

0.1.7 2024 年 4 月 9 日
0.1.6 2024 年 4 月 1 日
0.1.3 2024 年 3 月 18 日

#277 in 并发

Download history 9/week @ 2024-04-13 4/week @ 2024-05-18 1/week @ 2024-05-25 2/week @ 2024-06-29

每月 529 次下载

MIT 许可证

76KB
2K SLoC

mpmc-async

Rust 的具有预留的多生产者、多消费者异步通道。

更多信息请见

许可证

MIT.


lib.rs:

具有预留的多生产者、多消费者异步通道。

示例用法

tokio_test::block_on(async {
    let (tx1, rx1) = mpmc_async::channel(2);

    let task = tokio::spawn(async move {
      let rx2 = rx1.clone();
      assert_eq!(rx1.recv().await.unwrap(), 2);
      assert_eq!(rx2.recv().await.unwrap(), 1);
    });

    let tx2 = tx1.clone();
    let permit = tx1.reserve().await.unwrap();
    tx2.send(1).await.unwrap();
    permit.send(2);

    task.await.unwrap();
});

具有多个发送者和接收者任务更复杂的一个示例

use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

tokio_test::block_on(async {
    let (tx, rx) = mpmc_async::channel(1);

    let num_workers = 10;
    let count = 10;
    let mut tasks = Vec::with_capacity(num_workers);

    for i in 0..num_workers {
        let mut tx = tx.clone();
        let task = tokio::spawn(async move {
            for j in 0..count {
                let val = i * count + j;
                tx.reserve().await.expect("no error").send(val);
            }
        });
        tasks.push(task);
    }

    let total = count * num_workers;
    let values = Arc::new(Mutex::new(BTreeSet::new()));

    for _ in 0..num_workers {
        let values = values.clone();
        let rx = rx.clone();
        let task = tokio::spawn(async move {
            for _ in 0..count {
                let val = rx.recv().await.expect("Failed to recv");
                values.lock().unwrap().insert(val);
            }
        });
        tasks.push(task);
    }

    for task in tasks {
        task.await.expect("failed to join task");
    }

    let exp = (0..total).collect::<Vec<_>>();
    let got = std::mem::take(values.lock().unwrap().deref_mut())
        .into_iter()
        .collect::<Vec<_>>();
    assert_eq!(exp, got);
});

无运行时依赖