4个版本 (1个稳定版)
1.0.0 | 2021年5月7日 |
---|---|
0.9.2 | 2020年8月10日 |
0.9.1 | 2020年8月4日 |
0.9.0 | 2020年8月3日 |
#551 in 异步
33KB
240 行
plumbing
plumbing是一个库,它通过异步请求/响应系统管理管道请求,例如HTTP Keep-Alive
连接或通过Redis的Redis协议进行交互。
plumbing
的核心是Pipeline
结构,它管理单个请求/响应连接。此连接由一对Sink
和Stream
组成,并应设置,使得每个通过Sink
发送的请求最终将通过Stream
发送回响应。创建此类对的一个示例是
- 在tokio中打开TCP连接,获取一个
TcpStream
。 - 使用
TcpStream::into_split
将这些流拆分为读取器和写入器 - 使用
tokio_util::codec
将这些流包装在你的协议的Encoder
和Decoder
中。这个Encoder
和Decoder
作为Pipeline
的Sink
和Stream
。
提交给Pipeline
的请求将返回一个Resolver
,这是一个Future
,它将解析为该请求的响应。可以同时存在任意数量的Resolvers,响应将按照它们通过底层Stream
到达的顺序依次发送给每个Resolvers。
管道对背压敏感,并且不会进行自己的缓冲,所以如果底层流停止接受请求,提交新的请求将会阻塞。同样,每个Resolver
都必须被轮询以检索其响应;后续的Resolver将会在先前的Resolver收到响应(或被丢弃)之前阻塞。根据您的系统,这意味着您可能需要确保发送或刷新端与接收端同时进行轮询。
plumbing
当前为#![no_std]
;它只需要alloc
才能正常运行。
示例
此示例使用tokio任务创建一个模拟的单键数据库,然后使用plumbing来管理对其的一些简单写入和读取。
mod fake_db {
use futures::{channel::mpsc, stream::StreamExt, SinkExt};
use tokio::task;
#[derive(Debug)]
pub struct FakeDb {
counter: i32,
}
#[derive(Debug)]
pub enum Request {
Incr(i32),
Decr(i32),
Set(i32),
Get,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Response {
Ok,
Value(i32),
}
pub fn create_db() -> (mpsc::Sender<Request>, mpsc::Receiver<Response>) {
let (send_req, mut recv_req) = mpsc::channel(0);
let (mut send_resp, recv_resp) = mpsc::channel(0);
let _task = task::spawn(async move {
let mut database = FakeDb { counter: 0 };
while let Some(request) = recv_req.next().await {
match request {
Request::Incr(count) => {
database.counter += count;
send_resp.send(Response::Ok).await.unwrap();
}
Request::Decr(count) => {
database.counter -= count;
send_resp.send(Response::Ok).await.unwrap();
}
Request::Set(value) => {
database.counter = value;
send_resp.send(Response::Ok).await.unwrap();
}
Request::Get => {
let response = Response::Value(database.counter);
send_resp.send(response).await.unwrap();
}
}
}
});
(send_req, recv_resp)
}
}
use fake_db::{Request, Response, create_db};
use futures::{
future,
sink::SinkExt,
FutureExt,
};
use plumbing::Pipeline;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (send, recv) = create_db();
// Because the send channel can only handle 1 item at a time, we want
// to buffer requests
let send = send.buffer(20);
let mut pipeline = Pipeline::new(send, recv);
// Basic interaction
let fut = pipeline.submit(Request::Set(10)).await?;
// If we're buffering requests or responses, we may need to make sure
// they both
let (_, response) = future::join(pipeline.flush(), fut).await;
assert_eq!(response.unwrap(), Response::Ok);
let fut = pipeline.submit(Request::Get).await?;
let (_, response) = future::join(pipeline.flush(), fut).await;
assert_eq!(response.unwrap(), Response::Value(10));
// pipeline several requests together
let write1 = pipeline.submit(Request::Incr(20)).await?;
let write2 = pipeline.submit(Request::Decr(5)).await?;
let read = pipeline.submit(Request::Get).await?;
// We need to make sure all of these are polled
let (_, _, _, response) = future::join4(pipeline.flush(), write1, write2, read).await;
assert_eq!(response.unwrap(), Response::Value(25));
// Alternatively, if we drop the futures returned by submit, the responses
// associated with them will be silently discarded. We can use this to
// keep only the responses we're interested in.
let _ = pipeline.submit(Request::Set(0)).await?;
let _ = pipeline.submit(Request::Incr(12)).await?;
let _ = pipeline.submit(Request::Decr(2)).await?;
let read1 = pipeline.submit(Request::Get).await?;
let _ = pipeline.submit(Request::Decr(2)).await?;
let _ = pipeline.submit(Request::Decr(2)).await?;
let read2 = pipeline.submit(Request::Get).await?;
let (_, resp1, resp2) = future::join3(pipeline.flush(), read1, read2).await;
assert_eq!(resp1.unwrap(), Response::Value(10));
assert_eq!(resp2.unwrap(), Response::Value(6));
Ok(())
}
许可证:MPL-2.0
依赖关系
~1–1.7MB
~34K SLoC