#tokio #async #mpsc #request #unbounded-channel

bmrng

异步 MPSC 请求-响应通道,适用于 Tokio

12 个版本 (5 个破坏性更新)

0.5.2 2021 年 8 月 24 日
0.5.1 2021 年 4 月 2 日
0.5.0 2021 年 3 月 31 日
0.3.0 2020 年 12 月 31 日

#528 in 并发

Download history 1741/week @ 2024-03-14 2026/week @ 2024-03-21 1657/week @ 2024-03-28 2228/week @ 2024-04-04 2464/week @ 2024-04-11 1843/week @ 2024-04-18 1619/week @ 2024-04-25 1580/week @ 2024-05-02 1585/week @ 2024-05-09 1949/week @ 2024-05-16 1990/week @ 2024-05-23 1538/week @ 2024-05-30 1092/week @ 2024-06-06 2020/week @ 2024-06-13 2144/week @ 2024-06-20 2447/week @ 2024-06-27

每月下载量 7,989
3 个 Crates 中使用 (通过 noxious)

MIT/Apache 许可

30KB
482

bmrng 🪃

Crates.io Documentation Unit Tests Coverage Status Dependency status

一个异步 MPSC 请求-响应通道,适用于 Tokio,您可以向发送者发送响应。受 crossbeam_requests 的启发。

示例

#[tokio::main]
async fn main() {
    let buffer_size = 100;
    let (tx, mut rx) = bmrng::channel::<i32, i32>(buffer_size);
    tokio::spawn(async move {
        while let Ok((input, mut responder)) = rx.recv().await {
            if let Err(err) = responder.respond(input * input) {
                println!("sender dropped the response channel");
            }
        }
    });
    for i in 1..=10 {
        if let Ok(response) = tx.send_receive(i).await {
            println!("Requested {}, got {}", i, response);
            assert_eq!(response, i * i);
        }
    }
}

请求超时

您还可以创建带有请求超时的通道

use tokio::time::{Duration, sleep};
#[tokio::main]
async fn main() {
    let (tx, mut rx) = bmrng::channel_with_timeout::<i32, i32>(100, Duration::from_millis(100));
    tokio::spawn(async move {
        match rx.recv().await {
            Ok((input, mut responder)) => {
                sleep(Duration::from_millis(200)).await;
                let res = responder.respond(input * input);
                assert_eq!(res.is_ok(), true);
            }
            Err(err) => {
                println!("all request senders dropped");
            }
        }
    });
    let response = tx.send_receive(8).await;
    assert_eq!(response, Err(bmrng::error::RequestError::<i32>::RecvTimeoutError));
}

无界通道

还有一个无界替代方案,即 bmrng::unbounded_channel(),其中包含同步 .send() 调用。

依赖项

~2–3MB
~47K SLoC