#thread-pool #tokio #future #io

废弃 tokio-io-pool

高效的执行短、I/O密集型futures的tokio线程池

11次发布

0.2.0-alpha.42020年4月7日
0.2.0-alpha.32019年10月1日
0.2.0-alpha.22019年9月23日
0.1.6 2019年6月6日
0.1.3 2018年7月20日

#156 in #thread-pool

Download history 96/week @ 2024-03-30 14/week @ 2024-04-06 8/week @ 2024-04-13 15/week @ 2024-04-20 8/week @ 2024-04-27 3/week @ 2024-05-04 5/week @ 2024-05-11 3/week @ 2024-05-18 19/week @ 2024-05-25 30/week @ 2024-06-01 24/week @ 2024-06-08 20/week @ 2024-06-15 18/week @ 2024-06-22 15/week @ 2024-06-29 31/week @ 2024-07-06 24/week @ 2024-07-13

每月91次下载
用于tokio-reactor

MIT/Apache

27KB
426

tokio-io-pool

Crates.io

该crate提供了一种线程池,用于高效执行短、I/O密集型futures。

在tokio的调度器改进之后,这不再必要,改进的调度器在许多套接字上处理并发I/O比旧调度器要好得多。Tokio 0.2也不允许相同的钩子提供自定义调度器,因此即使有必要,更新tokio-io-pool以像tokio 0.1一样“透明”地工作也很困难。因此,该项目已被取消。如果您发现tokio的性能问题,请将其作为tokio错误提交:)


lib.rs:

该crate提供了一个线程池,用于高效执行短、I/O密集型futures。

tokio提供的标准Runtime使用线程池来允许并发执行计算密集型futures。然而,其工作窃取机制使得futures可能会在不同的线程上执行,而它们的reactor却在运行,这导致不必要的同步,从而降低了可达到的吞吐量。虽然这种权衡对于许多异步应用程序来说效果良好,因为它可以更均匀地分配负载,但它并不适合高性能I/O密集型应用程序,因为同步线程的成本很高。例如,如果您的应用程序执行频繁但小的I/O操作,就会发生这种情况。

该crate提供了一个基于futures的线程池的替代实现。它启动一个线程池,每个线程运行一个tokio::runtime::current_thread::Runtime(因此每个都有自己的I/O reactor),并通过将futures分配给线程来轮询启动池。一旦futures被分配到线程,它以及它可能通过tokio::spawn产生的任何子futures,都将受到同一线程的控制。

通常情况下,您只有在进行大量非常短的I/O操作并且发现您在常规的tokio运行时受到工作窃取或反应器通知的限制时才应使用tokio-io-pool。如果您不确定使用哪个,请从tokio运行时开始。

请注意,此池不支持blocking函数,因为它不被底层current_thread::Runtime支持。[详情](https://github.com/tokio-rs/tokio/issues/432)。希望这个问题最终会得到解决。

有关尝试将此池合并到tokio本身的讨论;这一努力在[tokio-rs/tokio#486](https://github.com/tokio-rs/tokio/issues/486)中跟踪。

示例

use tokio::prelude::*;
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;

fn main() {
    let server = async move {
        // Bind the server's socket.
        let mut listener = TcpListener::bind("127.0.0.1:12345").await.expect("unable to bind TCP listener");
        // Pull out a stream of sockets for incoming connections
        loop {
            let (sock, _) = listener.accept().await.expect("acccept failed");
            tokio::spawn(async move {
                let mut sock = sock;
                let (mut reader, mut writer) = sock.split();
                let bytes_copied = reader.copy(&mut writer);
                let n = bytes_copied.await.expect("I/O error");
                println!("wrote {} bytes", n);
            });
        }
    };

    // Start the Tokio runtime
    tokio_io_pool::run(server);
}

依赖项

~4.5MB
~66K SLoC