#tokio #sink #stream #api #io #interract

tokio-simplified

一个用于与 tokio 溢出和流交互的简化 API

20 个版本

0.2.2 2019 年 12 月 10 日
0.2.1 2019 年 6 月 5 日
0.2.0 2019 年 5 月 22 日
0.1.15 2019 年 4 月 25 日

#48#sink

Apache-2.0

17KB
269

一个用于与 Tokio 的 SplitSink 和 SplitStream 交互的简化 API

动机

虽然 Tokio 非常强大,但其中一些功能对我来说不太直观。因此,我构建了这个 crate,以简化以我通常的方式进行与 Tokio 的交互

  • 在不真正想对发生的事情做很多事情的情况下写入 IO。
  • 将一个或多个回调订阅到一个 IO。

用法

此 API 应仅从 Tokio 运行时内部使用:它将尝试创建 Tokio 任务,因此如果不这样做则会引发 panic。

标准用法:多个回调

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).build();
    let writer = io.get_writer();
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    io.subscribe(move |frame| {
        println!("{}", frame);
    })
}

过滤

您可以使用过滤器以确保仅当帧匹配某些标准时才调用您的回调。

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).with_filter(|frame, writer| {
        if frame.to_ascii_lowercase().contains("hello there") {
            writer.write("General Kenobi!");
            return None;
        }
        Some(frame)
    }).build();
    let writer = io.get_writer();
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    io.subscribe(move |frame| {
        println!("{}", frame);
    })
}

单个回调技巧

每次您使用 subscribe(callback) 时,您都会承担一个额外的 futures::sync::mpsc::channel 成本,并且为每个回调调用一个 frame.clone()。这不是很高的成本,但如果您只有一个回调,您可以通过将回调作为始终返回 None 的过滤器来传递以降低这些成本。

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream).with_filter(|frame, writer| {
        writer.write(frame);
        None
    });
}

错误处理

默认情况下,IoManager 会忽略分配的 Stream 上的错误。如果您想处理错误,您可以通过在构建器中使用 with_error_handler() 将函数传递给构建器来指定错误处理函数。

fn tokio_main() {
    let (sink, stream) = LineCodec.framed(tcp_stream).split();
    let io = IoManagerBuilder::new(sink, stream)
        .with_filter(|frame, writer| {
            if frame.to_ascii_lowercase().contains("hello there") {
                writer.write("General Kenobi!");
                return None;
            }
            Some(frame)
        })
        .with_error_handler(move |error| {
            println!("{}", error);
        })
        .build();
    let writer = io.get_writer();
    io.subscribe(move |frame| {
        writer.write(frame);
    });
    io.subscribe(move |frame| {
        println!("{}", frame);
    })
}

依赖项

~7MB
~52K SLoC