12 个版本 (5 个破坏性更新)
0.5.2 | 2021 年 8 月 24 日 |
---|---|
0.5.1 | 2021 年 4 月 2 日 |
0.5.0 | 2021 年 3 月 31 日 |
0.3.0 | 2020 年 12 月 31 日 |
#528 in 并发
每月下载量 7,989
在 3 个 Crates 中使用 (通过 noxious)
30KB
482 行
bmrng 🪃
一个异步 MPSC 请求-响应通道,适用于 Tokio,您可以向发送者发送响应。受 crossbeam_requests 的启发。
示例
#[tokio::main]
async fn main() {
let buffer_size = 100;
let (tx, mut rx) = bmrng::channel::<i32, i32>(buffer_size);
tokio::spawn(async move {
while let Ok((input, mut responder)) = rx.recv().await {
if let Err(err) = responder.respond(input * input) {
println!("sender dropped the response channel");
}
}
});
for i in 1..=10 {
if let Ok(response) = tx.send_receive(i).await {
println!("Requested {}, got {}", i, response);
assert_eq!(response, i * i);
}
}
}
请求超时
您还可以创建带有请求超时的通道
use tokio::time::{Duration, sleep};
#[tokio::main]
async fn main() {
let (tx, mut rx) = bmrng::channel_with_timeout::<i32, i32>(100, Duration::from_millis(100));
tokio::spawn(async move {
match rx.recv().await {
Ok((input, mut responder)) => {
sleep(Duration::from_millis(200)).await;
let res = responder.respond(input * input);
assert_eq!(res.is_ok(), true);
}
Err(err) => {
println!("all request senders dropped");
}
}
});
let response = tx.send_receive(8).await;
assert_eq!(response, Err(bmrng::error::RequestError::<i32>::RecvTimeoutError));
}
无界通道
还有一个无界替代方案,即 bmrng::unbounded_channel()
,其中包含同步 .send()
调用。
依赖项
~2–3MB
~47K SLoC