#write #broadcast #read #parallel #bus #destination #multiple

bus_writer

单读多写 & 单读多验证;并行将读取广播到多个可写目的地

3个版本

使用旧Rust 2015

0.1.2 2018年8月1日
0.1.1 2018年7月31日
0.1.0 2018年7月29日

#8#destination

每月下载 23次

MIT许可证

27KB
379

Bus Writer

这个Rust包提供了一个通用的单读多写库,支持用于监控进度的回调。它还提供了一个通用的单读多验证器,以便您可以使用类似的技术验证结果。您提供任何实现了 io::Read 的类型作为源,以及一组实现了 io::Write 的目的地。可以提供回调来控制写入和验证的取消,以及监控每个目的地的进度。

原因

System76使用此包的用例是为我们的Popsicle[0]实用程序。Popsicle是一个CLI + GTK3实用程序,从ISO读取,并将ISO并行写入所有指定的USB设备。与传统工具并行操作时,串行闪存USB驱动器会非常耗时,因为读取I/O量会很大。目前也没有任何优秀的CLI或GTK实用程序能够以简单的方式处理此过程。

实现

实现的关键是我们使用 bus 包中的 Bus 通道类型。正如 bus 文档中所写,它是一个 "无锁、有界、单生产者、多消费者、广播通道"[1]BusWriter 的目标是读取源目的地,并将缓冲数据的 Arc'd 缓冲区传输到每个目的地。

每个目的地都会启动一个线程,该线程监听这些数据块以进行写入,并将事件传输到另一个线程,该线程监控来自这些线程的事件,并使用提供的回调,以便调用者可以处理这些事件(例如,用于进度条以跟踪每个设备的进度)。

主事件循环在达到通道中未读消息的最大限制时阻塞,这是为了防止您的应用程序超过所有可用的内存,这很容易发生,如果写入目的地的速度比从源读取的速度慢,那么就会发生这种情况。因此,这使得能够在不预先将整个文件缓冲到内存中的情况下将大型文件写入多个目的地。

当达到上限时在广播上阻塞意味着写入速度过快的设备在其端点上的所有消息耗尽后将会阻塞。因此,您的写入速度将限制在最慢设备的速度上。

阅读完成后,函数将等待所有后台线程完成后再返回结果。这是为了确保所有事件都已接收和处理。

不安全

尽管我们的源代码中没有使用不安全,但像 buscrossbeam 这样的依赖项可能使用它。

可配置性

默认情况下,桶大小为 16 MiB。您可以使用 BusWriter::with_bucket() 方法进行配置,这允许您为主事件循环中存储读取内容提供一个自己的可变引用到缓冲区。这允许您重用现有的缓冲区,因此是那些想要这样做的人的一个优化选择。

此外,在任何给定时间内在内层的 Bus 中存储最多 4 个桶。您可以使用 BusWriter::buckets() 方法配置您想要使用的桶的数量。

参考

示例

extern crate bus_writer;

use bus_writer::*;
use std::io::{BufReader, Cursor, Read};
use std::fs::{self, File};
use std::process::exit;

fn main() {
    let data: Vec<u8> = [0u8; 1024 * 1024 * 5].into_iter()
        .zip([1u8; 1024 * 1024 * 5].into_iter())
        .cycle()
        .take(50 * 1024 * 1024)
        .fold(Vec::with_capacity(100 * 1024 * 1024), |mut acc, (&x, &y)| {
            acc.push(x);
            acc.push(y);
            acc
        });

    let mut source = Cursor::new(&data);

    let files = ["a", "b", "c", "d", "e", "f", "g", "h"];
    let mut temp_files = [
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[0]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[1]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[2]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[3]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[4]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[5]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[6]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[7]).unwrap(),
    ];

    let mut errored = false;
    let result = BusWriter::new(
        &mut source,
        &mut temp_files,
        // Reports progress of each device so that callers may create their own progress bars
        // for each destination being written to, as seen in System76's Popsicle GTK UI.
        |event| match event {
            BusWriterMessage::Written { id, bytes_written } => {
                println!("{}: {} total bytes written", files[id], bytes_written);
            }
            BusWriterMessage::Completed { id } => {
                println!("{}: Completed", files[id]);
            }
            BusWriterMessage::Errored { id, why } => {
                println!("{} errored: {}", files[id], why);
                errored = true;
            }
        },
        // Executed at certain points while writing to check if the process needs to be cancelled
        || false
    ).write();

    if let Err(why) = result {
        eprintln!("writing failed: {}", why);
        exit(1);
    } else if errored {
        eprintln!("an error occurred");
        exit(1);
    }

    eprintln!("finished writing; validating files");

    let result = BusVerifier::new(
        source,
        &mut temp_files,
        |event| match event {
            BusVerifierMessage::Read { id, bytes_read } => {
                println!("{}: {} bytes verified", files[id], bytes_read);
            }
            BusVerifierMessage::Valid { id } => {
                println!("{}: Validated", files[id]);
            }
            BusVerifierMessage::Invalid { id } => {
                println!("{}: Invalid", id);
                errored = true;
            }
            BusVerifierMessage::Errored { id, why } => {
                println!("{} errored while verifying: {}", files[id], why);
                errored = true;
            }
        },
        || false
    ).verify();

    if let Err(why) = result {
        eprintln!("writing failed: {}", why);
        exit(1);
    } else if errored {
        eprintln!("Error occurred");
        exit(1);
    }

    eprintln!("All files validated!");
}

依赖项

~0.5–6MB
~13K SLoC