5 个版本
0.1.8 | 2024年6月13日 |
---|---|
0.1.6 | 2023年9月14日 |
0.1.5 | 2022年9月1日 |
0.1.3 |
|
0.1.0 |
|
在 并发 中排名 11
每月下载量:515,655
用于 216 个 Crates(直接使用 43 个)
76KB
804 行
oneshot
Oneshot spsc (单生产者,单消费者) 通道。这意味着每个通道实例只能传输一条消息。这有几个优点。一方面,实现可以非常高效,利用只有一条消息的知识。但更重要的是,它允许 API 以一种方式表达,当仅在通道上发送一条消息时,某些你不想关注的边缘情况不存在。例如:发送者不能被复制或克隆,发送方法接收所有权并消耗发送者。因此,在类型级别上,可以保证只能发送一条消息。
发送者的发送方法是非阻塞的,并且可能是无锁和无等待的。请参阅 Sender::send 的文档,了解可能不是完全无等待的情况。接收器支持无锁和无等待的 try_recv
以及不定时和定时线程阻塞接收操作。接收器还实现了 Future
并支持异步等待消息。
示例
此示例设置了一个后台工作进程,该进程处理通过标准 mpsc 通道进入的请求,并在每个请求上提供 oneshot 通道进行回复。由于 oneshot 接收器既支持阻塞又支持异步,因此可以从同步和异步上下文中与工作进程交互。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
type Request = String;
// Starts a background thread performing some computation on requests sent to it.
// Delivers the response back over a oneshot channel.
fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
thread::spawn(move || {
for (request_data, response_sender) in request_receiver.iter() {
let compute_operation = || request_data.len();
let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
}
});
request_sender
}
let processor = spawn_processing_thread();
// If compiled with `std` the library can receive messages with timeout on regular threads
#[cfg(feature = "std")] {
let (response_sender, response_receiver) = oneshot::channel();
let request = Request::from("data from sync thread");
processor.send((request, response_sender)).expect("Processor down");
match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
Ok(result) => println!("Processor returned {}", result),
Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
}
}
// If compiled with the `async` feature, the `Receiver` can be awaited in an async context
#[cfg(feature = "async")] {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let (response_sender, response_receiver) = oneshot::channel();
let request = Request::from("data from sync thread");
processor.send((request, response_sender)).expect("Processor down");
match response_receiver.await { // <- Receive on the oneshot channel asynchronously
Ok(result) => println!("Processor returned {}", result),
Err(_e) => panic!("Processor exited"),
}
});
}
同步与异步
编写此库的主要动机是,据我所知,没有(我知道的)通道实现允许您在正常线程和异步任务之间无缝地发送消息,或者相反。当然,如果消息传递是您通信的方式,那么在程序的同步和异步部分之间应该能够顺利工作!
该库通过具有快速且廉价的发送操作来实现这一点,该操作可用于同步线程和异步任务。接收器具有用于同步使用的线程阻塞接收方法,并实现了 Future
以支持异步使用。
该通道的接收端点实现了Rust的Future
特质,可以在异步任务中等待。此实现完全不受执行器/运行时的影响。应该可以使用任何执行器使用此库。
许可证:MIT OR Apache-2.0
依赖关系
~0–24MB
~334K SLoC