2 个版本
0.1.6 | 2023 年 12 月 27 日 |
---|---|
0.1.5 | 2023 年 12 月 6 日 |
#70 in 并发
每月下载量 24,031
75KB
791 行
oneshot
Oneshot spsc(单生产者,单消费者)通道。这意味着每个通道实例只能传输一条消息。这有几个很好的结果。其中之一是实现可以非常高效,利用只会有一条消息的知识。但更重要的是,它允许 API 以一种方式表达,这种方式使得在通道上发送单条消息时不需要关心的一些边缘情况不存在。例如:发送者不能被复制或克隆,发送方法接受所有权并消耗发送者。因此,在类型级别上,你可以保证只能发送一条消息。
发送者的发送方法是非阻塞的,并且可能是无锁和无等待的。有关可能不是完全无等待的情况,请参阅 Sender::send 文档。接收者支持无锁和无等待的 try_recv
,以及无限期和有限时间的线程阻塞接收操作。接收者还实现了 Future
并支持异步等待消息。
示例
此示例设置了一个后台工作进程,该进程处理通过标准 mpsc 通道进入的请求,并在每个请求提供的 oneshot 通道上回复。由于 oneshot 接收者可以接收阻塞和非阻塞的,因此可以从同步和异步上下文中与该工作进程交互。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
type Request = String;
// Starts a background thread performing some computation on requests sent to it.
// Delivers the response back over a oneshot channel.
fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
thread::spawn(move || {
for (request_data, response_sender) in request_receiver.iter() {
let compute_operation = || request_data.len();
let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
}
});
request_sender
}
let processor = spawn_processing_thread();
// If compiled with `std` the library can receive messages with timeout on regular threads
#[cfg(feature = "std")] {
let (response_sender, response_receiver) = oneshot::channel();
let request = Request::from("data from sync thread");
processor.send((request, response_sender)).expect("Processor down");
match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
Ok(result) => println!("Processor returned {}", result),
Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
}
}
// If compiled with the `async` feature, the `Receiver` can be awaited in an async context
#[cfg(feature = "async")] {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let (response_sender, response_receiver) = oneshot::channel();
let request = Request::from("data from sync thread");
processor.send((request, response_sender)).expect("Processor down");
match response_receiver.await { // <- Receive on the oneshot channel asynchronously
Ok(result) => println!("Processor returned {}", result),
Err(_e) => panic!("Processor exited"),
}
});
}
同步与异步
编写此库的主要动机是,据我所知,没有(已知于我)的通道实现允许你在正常线程和异步任务之间无缝地发送消息,或者相反。如果消息传递是您通信的方式,当然应该在程序同步和异步部分之间顺利工作!
这个库通过提供一个快速且经济的发送操作来实现这一点,该操作可以在同步线程和异步任务中使用。接收器提供了线程阻塞接收方法,用于同步使用,并实现了Future
以支持异步使用。
该通道的接收端实现了Rust的Future
特质,可以在异步任务中等待。此实现完全与执行器/运行时无关。应该可以使用任何执行器使用此库。
许可:MIT OR Apache-2.0