9个版本
0.1.8 | 2023年7月21日 |
---|---|
0.1.6 | 2023年3月3日 |
0.1.5 | 2022年12月17日 |
0.1.4 | 2022年8月3日 |
0.1.0 | 2019年4月22日 |
#84 在 并发
每月251次下载
用于 rust-quiz
27KB
204 行
非交错输出队列
一种机制,在允许任意数量的任务(包括当前打印输出的任务)进步的同时,防止任务输出的交错。
[dependencies]
oqueue = "0.1"
用例
这个crate解决了一个看似狭窄的用例,但我在几种不同情况下都遇到了。
假设我们有一些明显并行的工作负载,每项工作都可能想要写入stdout/stderr。如果我们天真地并行化,不同任务的输出将交错,最终变得难以阅读。如果我们让每个任务锁定输出流,完成工作,然后解锁,我们就可以避免交错,但任务将无法并行运行。如果我们让每个任务将其输出写入本地缓冲区,并在最后原子性地打印所有输出,所有输出都将被不必要地延迟,应用程序可能会感觉跳跃和反应迟钝,因为没有输出被实时看到。
目标
-
我们有一个有序的任务序列 0..N。
-
我们希望按顺序接收任务0的所有输出,然后是任务1的所有输出,等等。任务输出不能与其他任务交错,并且必须遵循任务顺序。
-
我们希望任务并行执行。
-
我们希望所有输出尽可能快地打印出来,这意味着对于恰好一个任务来说是实时,对于其他任务则是在实时任务替换之前延迟。
示例
此示例使用oqueue在Rayon线程池上对工作线程的输出进行排序。
use oqueue::{Color::Red, Sequencer, Task};
use rayon::ThreadPoolBuilder;
use std::error::Error;
use std::fs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
type Result<T> = std::result::Result<T, Box<dyn Error>>;
fn main() -> Result<()> {
// Come up with some work that needs to be performed. Let's pretend to
// perform work on each file in the current directory.
let mut files = Vec::new();
for entry in fs::read_dir(".")? {
files.push(entry?.path());
}
files.sort();
// Build a thread pool with one thread per cpu.
let cpus = num_cpus::get();
let pool = ThreadPoolBuilder::new().num_threads(cpus).build()?;
// Spin up the right number of worker threads. They will write to stderr.
let oqueue = Sequencer::stderr();
pool.scope(|scope| {
for _ in 0..cpus {
scope.spawn(|_| worker(&oqueue, &files));
}
});
Ok(())
}
fn worker(oqueue: &Sequencer, inputs: &[PathBuf]) {
// Perform tasks indicated by the sequencer.
loop {
let task = oqueue.begin();
match inputs.get(task.index) {
Some(path) => work(task, path),
None => return,
}
}
}
fn work(task: Task, path: &Path) {
// Produce output by writing to the task.
write!(task, "evaluating ");
task.bold();
writeln!(task, "{}", path.display());
// Do some expensive work...
let string = path.to_string_lossy();
thread::sleep(Duration::from_millis(150 * string.len() as u64));
// ... which may fail or succeed.
if string.contains('c') {
task.bold_color(Red);
write!(task, " ERROR");
task.reset_color();
writeln!(task, ": path contains the letter 'c'");
}
}
此程序的输出保证以期望的排序顺序显示任务,且非交错。任务将并行进步,无需等待执行输出。所有输出都将尽可能早地显示,包括始终有一个任务在实时执行。
evaluating ./.git evaluating ./.gitignore evaluating ./Cargo.lock ERROR: path contains the letter 'c' evaluating ./Cargo.toml evaluating ./LICENSE-APACHE evaluating ./LICENSE-MIT evaluating ./README.md evaluating ./examples evaluating ./src ERROR: path contains the letter 'c' evaluating ./target
进一步阅读
-
oqueue::Sequencer
文档涵盖了在不同任务间分配工作项的一些不同技术。 -
oqueue::Task
文档显示了设置输出颜色和将输出写入任务的API。
许可证
本软件基于Apache License, Version 2.0或MIT许可证授权,您可根据需要选择。除非您明确声明,否则您有意提交以包含在本软件包中的任何贡献,根据Apache-2.0许可证定义,应双重授权如上所述,无需附加条款或条件。
依赖项
~0.4–8.5MB
~62K SLoC