#channel #requests #chan #data-channel

reqchan

这是一个用于请求和接收数据的通道。每个通道只有一个请求端,但可以有多个响应端。它适用于实现工作分担。通道的两端是异步的,所以它有点非阻塞。然而,如果多个响应端试图对同一个请求做出响应,只有一个会成功;其余的将返回错误。

7 个版本

使用旧的 Rust 2015

0.5.8 2017年10月10日
0.5.7 2017年9月29日

#1071 in 异步

MIT/Apache

43KB
448

reqchan-rs 定义了一个用于请求和接收数据的通道。

文档

Linux Status Build status

简介

每个通道只有一个请求端,但可以有多个响应端。它适用于实现工作分担。

通道的两端是异步的,所以它有点非阻塞。然而,如果多个响应端试图对同一个请求做出响应,只有一个会成功;其余的将返回错误。

设计

概述

reqchan 是围绕通道的两部分构建的:RequesterResponder。两者都实现了方法,Requester::try_request()Responder::try_respond(),成功时锁定它们相应的通道端并返回合约。 RequestContract 需要 用户成功接收数据或取消请求。 ResponseContract 需要用户发送数据。这些要求防止通过通道发送的数据丢失。

锁定

Responder::try_response() 锁定响应端以防止其他潜在响应者对同一请求做出响应。然而,Requester::try_request() 锁定请求端以防止用户尝试发出多个挂起请求。当对应的合约对象被丢弃时,这两个锁都会被释放。

合约

请求者::try_request()必须发出一个RequestContract,以避免执行线程在等待响应时阻塞。然而,这个原因不适用于响应者::try_response()。我最初让响应者::try_response()发送数据。然而,这要求用户即使在数据无法发送的情况下也要有数据可供发送,并且如果数据无法发送,用户还需要处理返回的数据。如果数据是,比如说,一个Vec内容的一半,这可能会涉及大量的昂贵的内存分配。因此,我让响应者::try_response()返回一个ResponseContract,表明响应者可以并且将会对请求做出响应。这样,用户只需在必要时执行发送数据的必要步骤。

示例

简单示例

这个简单、单线程的示例演示了大部分API。唯一遗漏的是RequestContract::try_cancel()

extern crate reqchan;

fn main() {
    // Create channel.
    let (requester, responder) = reqchan::channel::<u32>(); 
    
    // Issue request.
    let mut request_contract = requester.try_request().unwrap();
    
    // Respond with number.
    responder.try_respond().unwrap().send(5);
    
    // Receive and print number.
    println!("Number is {}", request_contract.try_receive().unwrap());
}

更复杂的示例

这个更复杂的示例演示了更多“现实世界”的使用。一个线程请求一个“任务”(即一个运行闭包),而另外两个线程争夺谁可以得到自己的个人任务。同时,请求线程正在轮询任务,如果及时得到一个任务,它就会运行。无论接收者是否收到任务或超时,接收者都会通知其他线程停止运行,并停止自己。

extern crate reqchan as chan;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Stuff to make it easier to pass around closures.
trait FnBox {
    fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}
type Task = Box<FnBox + Send + 'static>;

fn main() {
    // Variable used to test calling a `Task` sent between threads.
    let test_var = Arc::new(AtomicUsize::new(0));
    let test_var2 = test_var.clone();
    let test_var3 = test_var.clone();
    
    // Variable needed to stop `responder` thread if `requester` times out
    let should_exit = Arc::new(AtomicBool::new(false));
    let should_exit_copy_1 = should_exit.clone();
    let should_exit_copy_2 = should_exit.clone();
    
    let (requester, responder) = chan::channel::<Task>();
    let responder2 = responder.clone();
    
    // requesting thread
    let requester_handle = thread::spawn(move || {
        let start_time = Instant::now();
        let timeout = Duration::new(0, 1000000);
        
        let mut contract = requester.try_request().unwrap();
    
        loop {
            // Try to cancel request and stop threads if runtime
            // has exceeded `timeout`.
            if start_time.elapsed() >= timeout {
                // Try to cancel request.
                // This should only fail if `responder` has started responding.
                if contract.try_cancel() {
                    // Notify other threads to stop.
                    should_exit.store(true, Ordering::SeqCst);
                    break;
                }
            }
    
            // Try getting `task` from `responder`.
            match contract.try_receive() {
                // `contract` received `task`.
                Ok(task) => {
                    task.call_box();
                    // Notify other threads to stop.
                    should_exit.store(true, Ordering::SeqCst);
                    break;
                },
                // Continue looping if `responder` has not yet sent `task`.
                Err(chan::TryReceiveError::Empty) => {},
                // The only other error is `chan::TryReceiveError::Done`.
                // This only happens if we call `contract.try_receive()`
                // after either receiving data or cancelling the request.
                _ => unreachable!(),
            }
        }
    });
    
    // responding thread 1
    let responder_1_handle = thread::spawn(move || {
        let mut tasks = vec![Box::new(move || {
            test_var2.fetch_add(1, Ordering::SeqCst);
        }) as Task];
        
        loop {
            // Exit loop if `receiver` has timed out.
            if should_exit_copy_1.load(Ordering::SeqCst) {
                break;
            }
            
            // Send `task` to `receiver` if it has issued a request.
            match responder2.try_respond() {
                // `responder2` can respond to request.
                Ok(contract) => {
                    contract.send(tasks.pop().unwrap());
                    break;
                },
                // Either `requester` has not yet made a request,
                // or `responder2` already handled the request.
                Err(chan::TryRespondError::NoRequest) => {},
                // `responder2` is processing request..
                Err(chan::TryRespondError::Locked) => { break; },
            }
        }
    });
    
    // responding thread 2
    let responder_2_handle = thread::spawn(move || {
        let mut tasks = vec![Box::new(move || {
            test_var3.fetch_add(2, Ordering::SeqCst);
        }) as Task];
        
        loop {
            // Exit loop if `receiver` has timed out.
            if should_exit_copy_2.load(Ordering::SeqCst) {
                break;
            }
            
            // Send `task` to `receiver` if it has issued a request.
            match responder.try_respond() {
                // `responder2` can respond to request.
                Ok(contract) => {
                    contract.send(tasks.pop().unwrap());
                    break;
                },
                // Either `requester` has not yet made a request,
                // or `responder` already handled the request.
                Err(chan::TryRespondError::NoRequest) => {},
                // `responder` is processing request.
                Err(chan::TryRespondError::Locked) => { break; },
            }
        }
    });
    
    requester_handle.join().unwrap();
    responder_1_handle.join().unwrap();
    responder_2_handle.join().unwrap();
    
    // `num` can be 0, 1 or 2.
    let num = test_var.load(Ordering::SeqCst);
    println!("Number is {}", num);
}

平台

reqchan-rs应在Windows和任何POSIX兼容的系统(Linux、Mac OSX等)上运行。

reqchan-rs在以下平台上持续测试:

  • x86_64-unknown-linux-gnu(Linux)
  • i686-unknown-linux-gnu
  • x86_64-unknown-linux-musl(Linux w/ MUSL
  • i686-unknown-linux-musl
  • x86_64-apple-darwin(Mac OSX)
  • i686-apple-darwin
  • x86_64-pc-windows-msvc(Windows)
  • i686-pc-windows-msvc
  • x86_64-pc-windows-gnu
  • i686-pc-windows-gnu

reqchan-rs持续交叉编译为以下平台:

  • arm-unknown-linux-gnueabihf
  • aarch64-unknown-linux-gnu
  • mips-unknown-linux-gnu
  • aarch64-unknown-linux-musl
  • i686-linux-android
  • x86_64-linux-android
  • arm-linux-androideabi
  • aarch64-linux-android
  • i386-apple-ios
  • x86_64-apple-ios
  • i686-unknown-freebsd
  • x86_64-unknown-freebsd
  • x86_64-unknown-netbsd
  • asmjs-unknown-emscripten

无运行时依赖