#file-io #async-io #io #parallel #async-parallel #io-write #async

par_io

无需依赖项的并行、异步文件 I/O 库,可控制内存使用

3 个不稳定版本

0.4.2 2023 年 6 月 17 日
0.3.8 2022 年 6 月 2 日
0.3.7 2022 年 6 月 1 日

排名 405异步 类别

BSD-3-Clause

52KB
675

par_io 并行 I/O 库

一个简单的库,无需依赖项,用于并行读取和写入文件,实现生产者-消费者模型。

在读取器和写入线程内部使用对 preadpwrite 的同步调用来传输数据。

不使用异步运行时,因为实际的文件 I/O 是同步的,任务分配和执行由库通过直接调用 Rust 的 threadmpsc API 控制。

Fn 类型对象由客户端代码传递给库,并由生产者或消费者线程调用以生成或消耗数据。

内存缓冲区仅创建一次并在生产者和消费者之间重用,因此缓冲区创建之后不会发生内存分配,除非在客户端代码提供的回调对象中执行。

总内存消耗等于

客户端代码中分配的内存 +

(缓冲区大小) x (每个生产者的缓冲区数量) x (生产者数量)

当每个生产者只提供一个缓冲区时,消费者必须等待生产者发送缓冲区,生产者必须等待消费者发送缓冲区回来,因此每个线程的生产者-消费者执行是同步的。

当每个生产者使用多个缓冲区时,消费者可以在消费者处理它时生成数据,并从单独的缓冲区中读取,因此可以实现完整的异步执行。

当前实现允许使用以下方式设置每个生产者的数据块数量和每个生产者的缓冲区数量

(缓冲区数量) <= (数据块数量)

将来将能够显式指定最大内存使用量。

读取

(生产者 = 读取器)

1. the file is subdivided into chunks
2. each chunk is read by a separate producer thread
3. the producer thread extracts a buffer from a queue and fills it with the data from the file
4. the filled buffer is sent to a consumer thread (round-robin scheduling)
5. the consumer thread passes a reference to the buffer to a consumer callback received from client code
6. the return value from the callback is stored into an array
7. the buffer is moved back to the thread that sent it
8. all the return values from all the consumer threads are merged into a single array and returned to client code

写入

(消费者 = 写入器)

1. producer threads extract buffer from queue
2. mutable reference to buffer is passed to producer callback received from client code
3. result of callback invocation is checked: 
   1. no error: buffer is sent to consumer threads (round robin scheduling);
   2. error: error is sent to consumer threads which then terminate immediately
4. consumer threads receive the buffer and the file offset and store the data into file
5. buffer is moved back to the producer thread that sent it
6. each consumer thread returns the number of bytes written to file  
7. the results from all consumer threads are merged into a single array returned to client code

用法

read_filewrite_to_file 函数用于读取和写入操作。

read_file 函数返回一个

大小 = (每个生产者处理的块数)x (生产者数量)

每个元素包含消费数据的回调对象的返回值。是否返回错误由回调对象自行决定。

write_to_file 函数返回一个包含已写入字节数或 Err(String) 实例的 Result 实例。如果生产者的回调函数因错误而失败,则此错误将被转发给消费者,消费者将立即退出并返回接收到的错误。

并行读取示例

use par_io::read::read_file;
pub fn main() {
    let filename = std::env::args().nth(1).expect("Missing file name");
    let len = std::fs::metadata(&filename)
        .expect("Error reading file size")
        .len();
    let num_producers: u64 = std::env::args()
        .nth(2)
        .expect("Missing num producers")
        .parse()
        .unwrap();
    let num_consumers: u64 = std::env::args()
        .nth(3)
        .expect("Missing num consumers")
        .parse()
        .unwrap();
    let chunks_per_producer: u64 = std::env::args()
        .nth(4)
        .expect("Missing num chunks per producer")
        .parse()
        .unwrap();
    let num_buffers_per_producer: u64 = if let Some(p) = std::env::args().nth(5) {
        p.parse().expect("Wrong num tasks format")
    } else {
        2
    };
    // callback function
    let consume = |buffer: &[u8],   // buffer containing data from file
                   data: &String,   // custom data
                   chunk_id: u64,   // chunk id 
                   num_chunks: u64, // number of chunks per producer
                   _offset: u64     // file offset
     -> Result<usize, String> {
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!(
            "Consumer: {}/{} {} {}",
            chunk_id,
            num_chunks,
            data,
            buffer.len()
        );
        Ok(buffer.len())
    };
    let tag = "TAG".to_string();
    match read_file(
        &filename,
        num_producers,
        num_consumers,
        chunks_per_producer,
        std::sync::Arc::new(consume), // <- callback
        tag,                          // <- client data passed to callback
        num_buffers_per_producer,
    ) {
        Ok(v) => {
            let bytes_consumed = v
                .iter()
                .fold(0, |acc, x| if let (_, Ok(b)) = x { acc + b } else { acc });
            assert_eq!(bytes_consumed, len as usize);
        }
        Err(err) => {
            eprintln!("{}", err.to_string());
        }
    }

并行写入示例

use par_io::write::write_to_file;
pub fn main() {
    let buffer_size: usize = std::env::args()
        .nth(1)
        .expect("Missing buffer size")
        .parse()
        .expect("Wrong buffer size format");
    let filename = std::env::args().nth(2).expect("Missing file name");
    let num_producers: u64 = std::env::args()
        .nth(3)
        .expect("Missing num producers")
        .parse()
        .unwrap();
    let num_consumers: u64 = std::env::args()
        .nth(4)
        .expect("Missing num consumers")
        .parse()
        .unwrap();
    let chunks_per_producer: u64 = std::env::args()
        .nth(5)
        .expect("Missing num chunks per producer")
        .parse()
        .unwrap();
    let num_buffers_per_producer: u64 = if let Some(p) = std::env::args().nth(6) {
        p.parse().expect("Wrong num tasks format")
    } else {
        2
    };
    let producer = |buffer: &mut Vec<u8>, _tag: &String, _offset: u64| -> Result<(), String> {
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("{:?}> Writing to offset {}", buffer.as_ptr(), offset);
        let len = buffer.len();
        // Warning: data need to be modified in place if not `buffer` will be re-allocated and not reused
        buffer.copy_from_slice(vec![1_u8; len].as_slice());
        Ok(())
    };
    let data = "TAG".to_string();
    match write_to_file(
        &filename,
        num_producers,
        num_consumers,
        chunks_per_producer,
        std::sync::Arc::new(producer),
        data,
        num_buffers_per_producer,
        buffer_size,
    ) {
        Ok(bytes_consumed) => {
            let len = std::fs::metadata(&filename)
                .expect("Cannot access file")
                .len();
            assert_eq!(bytes_consumed, len as usize);
            std::fs::remove_file(&filename).expect("Cannot delete file");
        },
        Err(err) => {
            use par_io::write::{WriteError, ProducerError, ConsumerError};
            match err {
                WriteError::Producer(ProducerError{msg, offset}) => {
                    eprintln!("Producer error: {} at {}", msg, offset);
                },
                WriteError::Consumer(ConsumerError{msg}) => {
                    eprintln!("Consumer error: {}", msg);
                },
                WriteError::Other(err) => {
                    eprintln!("Error: {}", err);
                },
            }
        }
    }
}

无运行时依赖