12 个版本 (5 个破坏性更新)

0.5.2 2021 年 8 月 24 日
0.5.1 2021 年 4 月 2 日
0.5.0 2021 年 3 月 31 日
0.3.0 2020 年 12 月 31 日

#528 in 并发

Download history · Rust 包仓库 1741/week @ 2024-03-14 · Rust 包仓库 2026/week @ 2024-03-21 · Rust 包仓库 1657/week @ 2024-03-28 · Rust 包仓库 2228/week @ 2024-04-04 · Rust 包仓库 2464/week @ 2024-04-11 · Rust 包仓库 1843/week @ 2024-04-18 · Rust 包仓库 1619/week @ 2024-04-25 · Rust 包仓库 1580/week @ 2024-05-02 · Rust 包仓库 1585/week @ 2024-05-09 · Rust 包仓库 1949/week @ 2024-05-16 · Rust 包仓库 1990/week @ 2024-05-23 · Rust 包仓库 1538/week @ 2024-05-30 · Rust 包仓库 1092/week @ 2024-06-06 · Rust 包仓库 2020/week @ 2024-06-13 · Rust 包仓库 2144/week @ 2024-06-20 · Rust 包仓库 2447/week @ 2024-06-27 · Rust 包仓库

每月下载量 7,989
3 个 Crates 中使用 (通过 noxious)

MIT/Apache 许可

30KB
482

bmrng 🪃

Crates.io Documentation Unit Tests Coverage Status Dependency status

一个异步 MPSC 请求-响应通道,适用于 Tokio,您可以向发送者发送响应。受 crossbeam_requests 的启发。

示例

#[tokio::main]
async fn main() {
    let buffer_size = 100;
    let (tx, mut rx) = bmrng::channel::<i32, i32>(buffer_size);
    tokio::spawn(async move {
        while let Ok((input, mut responder)) = rx.recv().await {
            if let Err(err) = responder.respond(input * input) {
                println!("sender dropped the response channel");
            }
        }
    });
    for i in 1..=10 {
        if let Ok(response) = tx.send_receive(i).await {
            println!("Requested {}, got {}", i, response);
            assert_eq!(response, i * i);
        }
    }
}

请求超时

您还可以创建带有请求超时的通道

use tokio::time::{Duration, sleep};
#[tokio::main]
async fn main() {
    let (tx, mut rx) = bmrng::channel_with_timeout::<i32, i32>(100, Duration::from_millis(100));
    tokio::spawn(async move {
        match rx.recv().await {
            Ok((input, mut responder)) => {
                sleep(Duration::from_millis(200)).await;
                let res = responder.respond(input * input);
                assert_eq!(res.is_ok(), true);
            }
            Err(err) => {
                println!("all request senders dropped");
            }
        }
    });
    let response = tx.send_receive(8).await;
    assert_eq!(response, Err(bmrng::error::RequestError::<i32>::RecvTimeoutError));
}

无界通道

还有一个无界替代方案,即 bmrng::unbounded_channel(),其中包含同步 .send() 调用。

依赖项

~2–3MB
~47K SLoC