3个不稳定版本

0.3.1 2023年8月9日
0.3.0 2020年9月15日
0.2.0 2018年12月25日

并发 中排名 201

MIT 许可证

32KB
382

thread_io

docs.rs crates.io Build status

此Crate允许在后台线程中轻松包装读取器和写入器。这对于例如压缩格式的读取器和写入器来说非常有用,可以减少主线程的负载。

thread_io 使用通道(可选自 crossbeam crate)与后台读取器/写入器进行通信和交换数据块。

读取器API文档

写入器API文档

最低Rust版本为 1.38.0

示例

读取

以下代码计算gzip压缩文件中包含 spam 的行数。解压缩使用 flate2 库在后台线程中完成,解压缩数据被发送到主线程中提供给闭包的读取器。如果解压缩和文本搜索大约使用相同的CPU时间,则速度提升应该最大。

结果行数应该与 zcat file.txt.gz | grep 'spam' | wc -l 的输出相同。

use io::prelude::*;
use io;
use fs::File;
use thread_io::read::reader;
use flate2::read::GzDecoder;

// size of buffers sent across threads
const BUF_SIZE: usize = 256 * 1024;
// length of queue with buffers pre-filled in background thread
const QUEUE_LEN: usize = 5;

let f = File::open("file.txt.gz").unwrap();
let gz = GzDecoder::new(f);
let search_term = "spam";

let found = reader(
    BUF_SIZE, 
    QUEUE_LEN,  
    gz, 
    |reader| {
        let mut buf_reader = io::BufReader::new(reader);
        let mut found = 0;
        let mut line = String::new();
        while buf_reader.read_line(&mut line)? > 0 {
            if line.contains(search_term) {
                found += 1;
            }
            line.clear();
        }
        Ok::<_, io::Error>(found)
    }
)
.expect("decoding error");

println!("Found '{}' in {} lines.", search_term, found);

请注意,这只是一个示例。为了提高性能,可以将行读取到 Vec<u8> 缓冲区中(而不是 String),然后使用来自 memchr cratememchr 等方式搜索 spam

编译器有时需要有关从 func 返回的确切错误类型的提示,在这种情况下,通过指定 Ok::<_, io::Error>() 作为返回值来完成此操作。

thread_io::read::reader需要底层读取器实现Send。不幸的是,并不总是如此,例如对于io::StdinLock。存在一个thread_io::read::reader_init函数来处理这种情况。

写入

在后台线程中将内容写入gzip压缩文件的工作方式与读取类似。以下代码将包含spam的所有行写入压缩文件。压缩输出文件file.txt.gz的内容应与运行以下命令相同:grep 'spam' file.txt | gzip -c > file.txt.gz

use fs::File;
use io::prelude::*;
use io;
use thread_io::write::writer;
use flate2::write::{GzEncoder};
use flate2::Compression;

const BUF_SIZE: usize = 256 * 1024;
const QUEUE_LEN: usize = 5;

let infile = File::open("file.txt").unwrap();
let outfile = File::create("file.txt.gz").unwrap();
let mut gz_out = GzEncoder::new(outfile, Compression::default());
let search_term = "spam";

writer(
    BUF_SIZE,
    QUEUE_LEN,
    &mut gz_out,
    |writer| {
        // This function runs in the main thread, all writes are written to
        // 'gz_out' in the background
        let mut buf_infile = io::BufReader::new(infile);
        let mut line = String::new();
        while buf_infile.read_line(&mut line)? > 0 {
            if line.contains(search_term) {
                writer.write(line.as_bytes()).expect("write error");
            }
            line.clear();
        }
        Ok::<_, io::Error>(())
    },
)
.expect("encoding error");
gz_out.finish().expect("finishing failed");

有关确切行为和更灵活的函数的更多详细信息,例如处理非Send写入器类型,可以在写入模块的文档中找到。

func函数返回后,后台写入器始终调用io::Write::flush,确保在文件超出作用域之前捕获可能的刷新错误。

关于错误的说明

当使用此crate的读取器和写入器时,可能会出现两种类型的错误

  • io::Error是从io::Read::read / io::Write::write调用返回的。这种错误不能立即返回,而是推送到队列中,并在后续的读取或写入调用中返回。延迟取决于读取/写入函数的queuelen参数,但也取决于bufsize参数和读取/写入缓冲区的大小。
  • func闭包允许返回任何类型的自定义错误,这些错误可能在使用后台读取器读取后或在写入后台写入器之前在用户程序中发生。由于写入器的工作方式,因此需要额外的特性界限From<io::Error>

在读取和写入过程中,自定义用户错误优先于可能的io::Error。例如,当解析文件时,可能会发生语法错误,程序员从func闭包返回。与此同时,也可能发生io::Error,例如,因为GZIP文件被截断。如果此错误仍然在队列中等待报告,而语法错误发生,最终将返回语法错误,并且io::Error将被丢弃。

在func闭包结束(无论是否有错误)后,会向队列中放置一个信号,告诉后台线程停止处理。然而,在最终停止处理之前,会对queuelen进行读取或写入操作。

关于错误处理的更多详细信息,请参阅readwrite模块的文档。

交叉beam通道

可以通过指定crossbeam_channel功能来使用crossbeam crate的通道实现。我进行的一些测试并没有显示出使用标准库中的通道相比有任何性能提升。

类似的项目

fastq-rs在其thread_reader模块中提供了与thread_io::read::reader非常相似的功能。

依赖项

~350KB