9个版本

0.1.8 2023年7月21日
0.1.6 2023年3月3日
0.1.5 2022年12月17日
0.1.4 2022年8月3日
0.1.0 2019年4月22日

#84并发

Download history 69/week @ 2024-03-14 63/week @ 2024-03-21 69/week @ 2024-03-28 76/week @ 2024-04-04 69/week @ 2024-04-11 64/week @ 2024-04-18 67/week @ 2024-04-25 62/week @ 2024-05-02 62/week @ 2024-05-09 68/week @ 2024-05-16 69/week @ 2024-05-23 67/week @ 2024-05-30 67/week @ 2024-06-06 67/week @ 2024-06-13 71/week @ 2024-06-20 37/week @ 2024-06-27

每月251次下载
用于 rust-quiz

MIT/Apache

27KB
204

非交错输出队列

github crates.io docs.rs build status

一种机制,在允许任意数量的任务(包括当前打印输出的任务)进步的同时,防止任务输出的交错。

[dependencies]
oqueue = "0.1"

用例

这个crate解决了一个看似狭窄的用例,但我在几种不同情况下都遇到了。

假设我们有一些明显并行的工作负载,每项工作都可能想要写入stdout/stderr。如果我们天真地并行化,不同任务的输出将交错,最终变得难以阅读。如果我们让每个任务锁定输出流,完成工作,然后解锁,我们就可以避免交错,但任务将无法并行运行。如果我们让每个任务将其输出写入本地缓冲区,并在最后原子性地打印所有输出,所有输出都将被不必要地延迟,应用程序可能会感觉跳跃和反应迟钝,因为没有输出被实时看到。


目标

  • 我们有一个有序的任务序列 0..N。

  • 我们希望按顺序接收任务0的所有输出,然后是任务1的所有输出,等等。任务输出不能与其他任务交错,并且必须遵循任务顺序。

  • 我们希望任务并行执行。

  • 我们希望所有输出尽可能快地打印出来,这意味着对于恰好一个任务来说是实时,对于其他任务则是在实时任务替换之前延迟。


示例

此示例使用oqueue在Rayon线程池上对工作线程的输出进行排序。

use oqueue::{Color::Red, Sequencer, Task};
use rayon::ThreadPoolBuilder;
use std::error::Error;
use std::fs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;

type Result<T> = std::result::Result<T, Box<dyn Error>>;

fn main() -> Result<()> {
    // Come up with some work that needs to be performed. Let's pretend to
    // perform work on each file in the current directory.
    let mut files = Vec::new();
    for entry in fs::read_dir(".")? {
        files.push(entry?.path());
    }
    files.sort();

    // Build a thread pool with one thread per cpu.
    let cpus = num_cpus::get();
    let pool = ThreadPoolBuilder::new().num_threads(cpus).build()?;

    // Spin up the right number of worker threads. They will write to stderr.
    let oqueue = Sequencer::stderr();
    pool.scope(|scope| {
        for _ in 0..cpus {
            scope.spawn(|_| worker(&oqueue, &files));
        }
    });

    Ok(())
}

fn worker(oqueue: &Sequencer, inputs: &[PathBuf]) {
    // Perform tasks indicated by the sequencer.
    loop {
        let task = oqueue.begin();
        match inputs.get(task.index) {
            Some(path) => work(task, path),
            None => return,
        }
    }
}

fn work(task: Task, path: &Path) {
    // Produce output by writing to the task.
    write!(task, "evaluating ");
    task.bold();
    writeln!(task, "{}", path.display());

    // Do some expensive work...
    let string = path.to_string_lossy();
    thread::sleep(Duration::from_millis(150 * string.len() as u64));

    // ... which may fail or succeed.
    if string.contains('c') {
        task.bold_color(Red);
        write!(task, "  ERROR");
        task.reset_color();
        writeln!(task, ": path contains the letter 'c'");
    }
}

此程序的输出保证以期望的排序顺序显示任务,且非交错。任务将并行进步,无需等待执行输出。所有输出都将尽可能早地显示,包括始终有一个任务在实时执行。

evaluating ./.git
evaluating ./.gitignore
evaluating ./Cargo.lock
  ERROR: path contains the letter 'c'
evaluating ./Cargo.toml
evaluating ./LICENSE-APACHE
evaluating ./LICENSE-MIT
evaluating ./README.md
evaluating ./examples
evaluating ./src
  ERROR: path contains the letter 'c'
evaluating ./target

进一步阅读

  • oqueue::Sequencer 文档涵盖了在不同任务间分配工作项的一些不同技术。

  • oqueue::Task 文档显示了设置输出颜色和将输出写入任务的API。


许可证

本软件基于Apache License, Version 2.0MIT许可证授权,您可根据需要选择。
除非您明确声明,否则您有意提交以包含在本软件包中的任何贡献,根据Apache-2.0许可证定义,应双重授权如上所述,无需附加条款或条件。

依赖项

~0.4–8.5MB
~62K SLoC