7 个版本
使用旧的 Rust 2015
0.5.8 | 2017年10月10日 |
---|---|
0.5.7 | 2017年9月29日 |
#1071 in 异步
43KB
448 行
reqchan-rs
定义了一个用于请求和接收数据的通道。
简介
每个通道只有一个请求端,但可以有多个响应端。它适用于实现工作分担。
通道的两端是异步的,所以它有点非阻塞。然而,如果多个响应端试图对同一个请求做出响应,只有一个会成功;其余的将返回错误。
设计
概述
reqchan
是围绕通道的两部分构建的:Requester
和 Responder
。两者都实现了方法,Requester::try_request()
和 Responder::try_respond()
,成功时锁定它们相应的通道端并返回合约。 RequestContract 需要 用户成功接收数据或取消请求。
ResponseContract 需要用户发送数据。这些要求防止通过通道发送的数据丢失。
锁定
Responder::try_response()
锁定响应端以防止其他潜在响应者对同一请求做出响应。然而,Requester::try_request()
锁定请求端以防止用户尝试发出多个挂起请求。当对应的合约对象被丢弃时,这两个锁都会被释放。
合约
请求者::try_request()
必须发出一个RequestContract
,以避免执行线程在等待响应时阻塞。然而,这个原因不适用于响应者::try_response()
。我最初让响应者::try_response()
发送数据。然而,这要求用户即使在数据无法发送的情况下也要有数据可供发送,并且如果数据无法发送,用户还需要处理返回的数据。如果数据是,比如说,一个Vec
内容的一半,这可能会涉及大量的昂贵的内存分配。因此,我让响应者::try_response()
返回一个ResponseContract
,表明响应者可以并且将会对请求做出响应。这样,用户只需在必要时执行发送数据的必要步骤。
示例
简单示例
这个简单、单线程的示例演示了大部分API。唯一遗漏的是RequestContract::try_cancel()
。
extern crate reqchan;
fn main() {
// Create channel.
let (requester, responder) = reqchan::channel::<u32>();
// Issue request.
let mut request_contract = requester.try_request().unwrap();
// Respond with number.
responder.try_respond().unwrap().send(5);
// Receive and print number.
println!("Number is {}", request_contract.try_receive().unwrap());
}
更复杂的示例
这个更复杂的示例演示了更多“现实世界”的使用。一个线程请求一个“任务”(即一个运行闭包),而另外两个线程争夺谁可以得到自己的个人任务。同时,请求线程正在轮询任务,如果及时得到一个任务,它就会运行。无论接收者是否收到任务或超时,接收者都会通知其他线程停止运行,并停止自己。
extern crate reqchan as chan;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};
// Stuff to make it easier to pass around closures.
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Task = Box<FnBox + Send + 'static>;
fn main() {
// Variable used to test calling a `Task` sent between threads.
let test_var = Arc::new(AtomicUsize::new(0));
let test_var2 = test_var.clone();
let test_var3 = test_var.clone();
// Variable needed to stop `responder` thread if `requester` times out
let should_exit = Arc::new(AtomicBool::new(false));
let should_exit_copy_1 = should_exit.clone();
let should_exit_copy_2 = should_exit.clone();
let (requester, responder) = chan::channel::<Task>();
let responder2 = responder.clone();
// requesting thread
let requester_handle = thread::spawn(move || {
let start_time = Instant::now();
let timeout = Duration::new(0, 1000000);
let mut contract = requester.try_request().unwrap();
loop {
// Try to cancel request and stop threads if runtime
// has exceeded `timeout`.
if start_time.elapsed() >= timeout {
// Try to cancel request.
// This should only fail if `responder` has started responding.
if contract.try_cancel() {
// Notify other threads to stop.
should_exit.store(true, Ordering::SeqCst);
break;
}
}
// Try getting `task` from `responder`.
match contract.try_receive() {
// `contract` received `task`.
Ok(task) => {
task.call_box();
// Notify other threads to stop.
should_exit.store(true, Ordering::SeqCst);
break;
},
// Continue looping if `responder` has not yet sent `task`.
Err(chan::TryReceiveError::Empty) => {},
// The only other error is `chan::TryReceiveError::Done`.
// This only happens if we call `contract.try_receive()`
// after either receiving data or cancelling the request.
_ => unreachable!(),
}
}
});
// responding thread 1
let responder_1_handle = thread::spawn(move || {
let mut tasks = vec![Box::new(move || {
test_var2.fetch_add(1, Ordering::SeqCst);
}) as Task];
loop {
// Exit loop if `receiver` has timed out.
if should_exit_copy_1.load(Ordering::SeqCst) {
break;
}
// Send `task` to `receiver` if it has issued a request.
match responder2.try_respond() {
// `responder2` can respond to request.
Ok(contract) => {
contract.send(tasks.pop().unwrap());
break;
},
// Either `requester` has not yet made a request,
// or `responder2` already handled the request.
Err(chan::TryRespondError::NoRequest) => {},
// `responder2` is processing request..
Err(chan::TryRespondError::Locked) => { break; },
}
}
});
// responding thread 2
let responder_2_handle = thread::spawn(move || {
let mut tasks = vec![Box::new(move || {
test_var3.fetch_add(2, Ordering::SeqCst);
}) as Task];
loop {
// Exit loop if `receiver` has timed out.
if should_exit_copy_2.load(Ordering::SeqCst) {
break;
}
// Send `task` to `receiver` if it has issued a request.
match responder.try_respond() {
// `responder2` can respond to request.
Ok(contract) => {
contract.send(tasks.pop().unwrap());
break;
},
// Either `requester` has not yet made a request,
// or `responder` already handled the request.
Err(chan::TryRespondError::NoRequest) => {},
// `responder` is processing request.
Err(chan::TryRespondError::Locked) => { break; },
}
}
});
requester_handle.join().unwrap();
responder_1_handle.join().unwrap();
responder_2_handle.join().unwrap();
// `num` can be 0, 1 or 2.
let num = test_var.load(Ordering::SeqCst);
println!("Number is {}", num);
}
平台
reqchan-rs
应在Windows和任何POSIX兼容的系统(Linux、Mac OSX等)上运行。
reqchan-rs
在以下平台上持续测试:
x86_64-unknown-linux-gnu
(Linux)i686-unknown-linux-gnu
x86_64-unknown-linux-musl
(Linux w/ MUSL)i686-unknown-linux-musl
x86_64-apple-darwin
(Mac OSX)i686-apple-darwin
x86_64-pc-windows-msvc
(Windows)i686-pc-windows-msvc
x86_64-pc-windows-gnu
i686-pc-windows-gnu
reqchan-rs
持续交叉编译为以下平台:
arm-unknown-linux-gnueabihf
aarch64-unknown-linux-gnu
mips-unknown-linux-gnu
aarch64-unknown-linux-musl
i686-linux-android
x86_64-linux-android
arm-linux-androideabi
aarch64-linux-android
i386-apple-ios
x86_64-apple-ios
i686-unknown-freebsd
x86_64-unknown-freebsd
x86_64-unknown-netbsd
asmjs-unknown-emscripten