8 个版本
0.1.7 | 2024 年 4 月 9 日 |
---|---|
0.1.6 | 2024 年 4 月 1 日 |
0.1.3 | 2024 年 3 月 18 日 |
#277 in 并发
每月 529 次下载
76KB
2K SLoC
mpmc-async
Rust 的具有预留的多生产者、多消费者异步通道。
更多信息请见
许可证
MIT.
lib.rs
:
具有预留的多生产者、多消费者异步通道。
示例用法
tokio_test::block_on(async {
let (tx1, rx1) = mpmc_async::channel(2);
let task = tokio::spawn(async move {
let rx2 = rx1.clone();
assert_eq!(rx1.recv().await.unwrap(), 2);
assert_eq!(rx2.recv().await.unwrap(), 1);
});
let tx2 = tx1.clone();
let permit = tx1.reserve().await.unwrap();
tx2.send(1).await.unwrap();
permit.send(2);
task.await.unwrap();
});
具有多个发送者和接收者任务更复杂的一个示例
use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
tokio_test::block_on(async {
let (tx, rx) = mpmc_async::channel(1);
let num_workers = 10;
let count = 10;
let mut tasks = Vec::with_capacity(num_workers);
for i in 0..num_workers {
let mut tx = tx.clone();
let task = tokio::spawn(async move {
for j in 0..count {
let val = i * count + j;
tx.reserve().await.expect("no error").send(val);
}
});
tasks.push(task);
}
let total = count * num_workers;
let values = Arc::new(Mutex::new(BTreeSet::new()));
for _ in 0..num_workers {
let values = values.clone();
let rx = rx.clone();
let task = tokio::spawn(async move {
for _ in 0..count {
let val = rx.recv().await.expect("Failed to recv");
values.lock().unwrap().insert(val);
}
});
tasks.push(task);
}
for task in tasks {
task.await.expect("failed to join task");
}
let exp = (0..total).collect::<Vec<_>>();
let got = std::mem::take(values.lock().unwrap().deref_mut())
.into_iter()
.collect::<Vec<_>>();
assert_eq!(exp, got);
});