21个版本 (破坏性)
0.16.1 | 2020年8月6日 |
---|---|
0.15.0 | 2020年7月14日 |
0.14.0 | 2020年3月28日 |
#1983 in 异步
每月下载量 48
28KB
439 行
stream_multiplexer
此crate为流类别提供自然背压
流被收集到可以通过recv()
轮询的'通道'中。通道之间是独立的,并且有自己的背压。
lib.rs
:
此crate为流类别提供自然背压。
流被收集到可以通过recv()
轮询的'通道'中。通道之间是独立的,并且有自己的背压。
示例
使用TCP服务器,您可能有两种不同类型的连接:已认证和未认证。通过将每个类别的连接组合到它自己的通道中,您可以使已认证的连接优于未认证的连接。这将为已认证的用户提供更好的体验。
代码示例
use futures_util::stream::StreamExt;
use tokio_util::compat::*;
smol::block_on(async move {
const CHANNEL_ONE: usize = 1;
const CHANNEL_TWO: usize = 2;
// Initialize a multiplexer
let mut multiplexer = stream_multiplexer::Multiplexer::new();
// Set up the recognized channels
multiplexer.add_channel(CHANNEL_ONE);
multiplexer.add_channel(CHANNEL_TWO);
// Bind to a random port on localhost
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = listener.local_addr().unwrap();
// Set up a task to add incoming connections into multiplexer
let mut incoming_multiplexer = multiplexer.clone();
smol::Task::spawn(async move {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let stream = async_io::Async::new(stream).unwrap();
let codec = tokio_util::codec::LinesCodec::new();
let framed = tokio_util::codec::Framed::new(stream.compat(), codec);
let (sink, stream) = framed.split();
let _stream_id = incoming_multiplexer.add_stream_pair(sink, stream, CHANNEL_ONE);
}
Err(_) => unimplemented!()
}
}
}).detach();
// test clients to put into channels
let mut client_1 = std::net::TcpStream::connect(local_addr).unwrap();
let mut client_2 = std::net::TcpStream::connect(local_addr).unwrap();
let mut multiplexer_ch_1 = multiplexer.clone();
// Simple server that echos the data back to the stream and moves the stream to channel 2.
smol::Task::spawn(async move {
while let Ok((stream_id, message)) = multiplexer_ch_1.recv(CHANNEL_ONE).await {
match message {
Some(Ok(data)) => {
// echo the data back and move it to channel 2
multiplexer_ch_1.send(vec![stream_id], data);
multiplexer_ch_1.change_stream_channel(stream_id, CHANNEL_TWO);
}
Some(Err(err)) => {
// the stream had an error
}
None => {
// stream_id has been dropped
}
}
}
}).detach();
});
依赖项
~2.1–3MB
~59K SLoC