#stream #async #future #multiplex

stream_multiplexer

将多个流合并成几个

21个版本 (破坏性)

0.16.1 2020年8月6日
0.15.0 2020年7月14日
0.14.0 2020年3月28日

#1983 in 异步

每月下载量 48

MIT/Apache

28KB
439

stream_multiplexer

Build Status Latest Version Rust Documentation

此crate为流类别提供自然背压

流被收集到可以通过recv()轮询的'通道'中。通道之间是独立的,并且有自己的背压。


lib.rs:

此crate为流类别提供自然背压。

流被收集到可以通过recv()轮询的'通道'中。通道之间是独立的,并且有自己的背压。

示例

使用TCP服务器,您可能有两种不同类型的连接:已认证和未认证。通过将每个类别的连接组合到它自己的通道中,您可以使已认证的连接优于未认证的连接。这将为已认证的用户提供更好的体验。

代码示例

use futures_util::stream::StreamExt;
use tokio_util::compat::*;

smol::block_on(async move {
    const CHANNEL_ONE: usize = 1;
    const CHANNEL_TWO: usize = 2;

    // Initialize a multiplexer
    let mut multiplexer = stream_multiplexer::Multiplexer::new();

    // Set up the recognized channels
    multiplexer.add_channel(CHANNEL_ONE);
    multiplexer.add_channel(CHANNEL_TWO);

    // Bind to a random port on localhost
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let local_addr = listener.local_addr().unwrap();

    // Set up a task to add incoming connections into multiplexer
    let mut incoming_multiplexer = multiplexer.clone();
    smol::Task::spawn(async move {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let stream = async_io::Async::new(stream).unwrap();
                    let codec = tokio_util::codec::LinesCodec::new();
                    let framed = tokio_util::codec::Framed::new(stream.compat(), codec);
                    let (sink, stream) = framed.split();
                    let _stream_id = incoming_multiplexer.add_stream_pair(sink, stream, CHANNEL_ONE);
                }
                Err(_) => unimplemented!()
            }
        }
    }).detach();

    // test clients to put into channels
    let mut client_1 = std::net::TcpStream::connect(local_addr).unwrap();
    let mut client_2 = std::net::TcpStream::connect(local_addr).unwrap();

    let mut multiplexer_ch_1 = multiplexer.clone();

    // Simple server that echos the data back to the stream and moves the stream to channel 2.
    smol::Task::spawn(async move {
        while let Ok((stream_id, message)) = multiplexer_ch_1.recv(CHANNEL_ONE).await {
            match message {
                Some(Ok(data)) => {
                    // echo the data back and move it to channel 2
                    multiplexer_ch_1.send(vec![stream_id], data);
                    multiplexer_ch_1.change_stream_channel(stream_id, CHANNEL_TWO);
                }
                Some(Err(err)) => {
                    // the stream had an error
                }
                None => {
                    // stream_id has been dropped
                }
            }
        }
    }).detach();
});

依赖项

~2.1–3MB
~59K SLoC