4个版本 (1个稳定版)

1.0.0 2021年5月7日
0.9.2 2020年8月10日
0.9.1 2020年8月4日
0.9.0 2020年8月3日

#551 in 异步

MPL-2.0许可证

33KB
240

plumbing

plumbing是一个库,它通过异步请求/响应系统管理管道请求,例如HTTP Keep-Alive连接或通过RedisRedis协议进行交互。

plumbing的核心是Pipeline结构,它管理单个请求/响应连接。此连接由一对SinkStream组成,并应设置,使得每个通过Sink发送的请求最终将通过Stream发送回响应。创建此类对的一个示例是

  • 在tokio中打开TCP连接,获取一个TcpStream
  • 使用TcpStream::into_split将这些流拆分为读取器和写入器
  • 使用tokio_util::codec将这些流包装在你的协议的EncoderDecoder中。这个EncoderDecoder作为PipelineSinkStream

提交给Pipeline的请求将返回一个Resolver,这是一个Future,它将解析为该请求的响应。可以同时存在任意数量的Resolvers,响应将按照它们通过底层Stream到达的顺序依次发送给每个Resolvers。

管道对背压敏感,并且不会进行自己的缓冲,所以如果底层流停止接受请求,提交新的请求将会阻塞。同样,每个Resolver都必须被轮询以检索其响应;后续的Resolver将会在先前的Resolver收到响应(或被丢弃)之前阻塞。根据您的系统,这意味着您可能需要确保发送或刷新端与接收端同时进行轮询。

plumbing当前为#![no_std];它只需要alloc才能正常运行。

示例

此示例使用tokio任务创建一个模拟的单键数据库,然后使用plumbing来管理对其的一些简单写入和读取。

mod fake_db {
    use futures::{channel::mpsc, stream::StreamExt, SinkExt};
    use tokio::task;

    #[derive(Debug)]
    pub struct FakeDb {
        counter: i32,
    }

    #[derive(Debug)]
    pub enum Request {
        Incr(i32),
        Decr(i32),
        Set(i32),
        Get,
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Response {
        Ok,
        Value(i32),
    }

    pub fn create_db() -> (mpsc::Sender<Request>, mpsc::Receiver<Response>) {
        let (send_req, mut recv_req) = mpsc::channel(0);
        let (mut send_resp, recv_resp) = mpsc::channel(0);

        let _task = task::spawn(async move {
            let mut database = FakeDb { counter: 0 };

            while let Some(request) = recv_req.next().await {
                match request {
                    Request::Incr(count) => {
                        database.counter += count;
                        send_resp.send(Response::Ok).await.unwrap();
                    }
                    Request::Decr(count) => {
                        database.counter -= count;
                        send_resp.send(Response::Ok).await.unwrap();
                    }
                    Request::Set(value) => {
                        database.counter = value;
                        send_resp.send(Response::Ok).await.unwrap();
                    }
                    Request::Get => {
                        let response = Response::Value(database.counter);
                        send_resp.send(response).await.unwrap();
                    }
                }
            }
        });

        (send_req, recv_resp)
    }
}

use fake_db::{Request, Response, create_db};
use futures::{
    future,
    sink::SinkExt,
    FutureExt,
};
use plumbing::Pipeline;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let (send, recv) = create_db();
    // Because the send channel can only handle 1 item at a time, we want
    // to buffer requests
    let send = send.buffer(20);

    let mut pipeline = Pipeline::new(send, recv);

    // Basic interaction
    let fut = pipeline.submit(Request::Set(10)).await?;

    // If we're buffering requests or responses, we may need to make sure
    // they both
    let (_, response) = future::join(pipeline.flush(), fut).await;
    assert_eq!(response.unwrap(), Response::Ok);

    let fut = pipeline.submit(Request::Get).await?;
    let (_, response) = future::join(pipeline.flush(), fut).await;
    assert_eq!(response.unwrap(), Response::Value(10));

    // pipeline several requests together
    let write1 = pipeline.submit(Request::Incr(20)).await?;
    let write2 = pipeline.submit(Request::Decr(5)).await?;
    let read = pipeline.submit(Request::Get).await?;

    // We need to make sure all of these are polled
    let (_, _, _, response) = future::join4(pipeline.flush(), write1, write2, read).await;
    assert_eq!(response.unwrap(), Response::Value(25));

    // Alternatively, if we drop the futures returned by submit, the responses
    // associated with them will be silently discarded. We can use this to
    // keep only the responses we're interested in.
    let _ = pipeline.submit(Request::Set(0)).await?;
    let _ = pipeline.submit(Request::Incr(12)).await?;
    let _ = pipeline.submit(Request::Decr(2)).await?;
    let read1 = pipeline.submit(Request::Get).await?;

    let _ = pipeline.submit(Request::Decr(2)).await?;
    let _ = pipeline.submit(Request::Decr(2)).await?;
    let read2 = pipeline.submit(Request::Get).await?;

    let (_, resp1, resp2) = future::join3(pipeline.flush(), read1, read2).await;
    assert_eq!(resp1.unwrap(), Response::Value(10));
    assert_eq!(resp2.unwrap(), Response::Value(6));

    Ok(())
}

许可证:MPL-2.0

依赖关系

~1–1.7MB
~34K SLoC