#oneshot-channel #channel #spsc #async #single-consumer #sync #producer-consumer

无需std oneshot

Oneshot spsc (单生产者,单消费者) 通道,具有(可能)无锁非阻塞发送,以及支持线程阻塞接收操作以及基于 Future 的异步轮询的接收器

5 个版本

0.1.8 2024年6月13日
0.1.6 2023年9月14日
0.1.5 2022年9月1日
0.1.3 2021年11月23日
0.1.0 2019年5月30日

并发 中排名 11

Download history 72353/week @ 2024-05-03 77857/week @ 2024-05-10 85899/week @ 2024-05-17 72353/week @ 2024-05-24 141323/week @ 2024-05-31 139834/week @ 2024-06-07 125585/week @ 2024-06-14 174791/week @ 2024-06-21 160221/week @ 2024-06-28 137260/week @ 2024-07-05 144492/week @ 2024-07-12 140702/week @ 2024-07-19 145556/week @ 2024-07-26 133714/week @ 2024-08-02 113826/week @ 2024-08-09 94773/week @ 2024-08-16

每月下载量:515,655
用于 216 Crates(直接使用 43 个

MIT/Apache

76KB
804

oneshot

Oneshot spsc (单生产者,单消费者) 通道。这意味着每个通道实例只能传输一条消息。这有几个优点。一方面,实现可以非常高效,利用只有一条消息的知识。但更重要的是,它允许 API 以一种方式表达,当仅在通道上发送一条消息时,某些你不想关注的边缘情况不存在。例如:发送者不能被复制或克隆,发送方法接收所有权并消耗发送者。因此,在类型级别上,可以保证只能发送一条消息。

发送者的发送方法是非阻塞的,并且可能是无锁和无等待的。请参阅 Sender::send 的文档,了解可能不是完全无等待的情况。接收器支持无锁和无等待的 try_recv 以及不定时和定时线程阻塞接收操作。接收器还实现了 Future 并支持异步等待消息。

示例

此示例设置了一个后台工作进程,该进程处理通过标准 mpsc 通道进入的请求,并在每个请求上提供 oneshot 通道进行回复。由于 oneshot 接收器既支持阻塞又支持异步,因此可以从同步和异步上下文中与工作进程交互。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

type Request = String;

// Starts a background thread performing some computation on requests sent to it.
// Delivers the response back over a oneshot channel.
fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
    let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
    thread::spawn(move || {
        for (request_data, response_sender) in request_receiver.iter() {
            let compute_operation = || request_data.len();
            let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
        }
    });
    request_sender
}

let processor = spawn_processing_thread();

// If compiled with `std` the library can receive messages with timeout on regular threads
#[cfg(feature = "std")] {
    let (response_sender, response_receiver) = oneshot::channel();
    let request = Request::from("data from sync thread");

    processor.send((request, response_sender)).expect("Processor down");
    match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
        Ok(result) => println!("Processor returned {}", result),
        Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
        Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
    }
}

// If compiled with the `async` feature, the `Receiver` can be awaited in an async context
#[cfg(feature = "async")] {
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            let (response_sender, response_receiver) = oneshot::channel();
            let request = Request::from("data from sync thread");

            processor.send((request, response_sender)).expect("Processor down");
            match response_receiver.await { // <- Receive on the oneshot channel asynchronously
                Ok(result) => println!("Processor returned {}", result),
                Err(_e) => panic!("Processor exited"),
            }
        });
}

同步与异步

编写此库的主要动机是,据我所知,没有(我知道的)通道实现允许您在正常线程和异步任务之间无缝地发送消息,或者相反。当然,如果消息传递是您通信的方式,那么在程序的同步和异步部分之间应该能够顺利工作!

该库通过具有快速且廉价的发送操作来实现这一点,该操作可用于同步线程和异步任务。接收器具有用于同步使用的线程阻塞接收方法,并实现了 Future 以支持异步使用。

该通道的接收端点实现了Rust的Future特质,可以在异步任务中等待。此实现完全不受执行器/运行时的影响。应该可以使用任何执行器使用此库。

许可证:MIT OR Apache-2.0

依赖关系

~0–24MB
~334K SLoC