1 个不稳定版本
0.1.5 | 2023年2月5日 |
---|---|
0.1.4 |
|
#917 in 并发
8KB
106 行
chunker
rayon 的 par_iter
/par_chunks
+ 内部迭代 + reduce
的最小化替代品,用于默认具有进度条的切片并行处理。尽管名为 chunker,但这个包实际上非常小巧:只有 80 SLOC。
用法
cargo add chunker
使用这些参数调用 chunker::run
或 chunker::run_mut
items
— 要并行处理的值的切片config
— 配置:thread_count
、chunk_size
、progress_bar
、bar_step
init
— 初始化工作者的中间结果的函数work
— 接受来自items
的单个值并修改工作者的中间结果(以及/或修改值本身,在run_mut
的情况下)的Fn
gather
— 接受工作者的中间结果接收器mpsc::Receiver
的FnMut
并返回最终结果(通常通过reduce
)
示例
平方和
chunker::run(
&input,
chunker::Config::default(),
|| 0,
|thread_sum, i| *thread_sum += i * i,
|rx| rx.iter().sum::<i64>()
)
简单的并行实现 单词计数
use std::{collections::HashMap, io::{stdin, stdout, Read, Write, BufWriter}, cmp::Reverse};
fn main() {
let mut text = String::new();
stdin().read_to_string(&mut text).unwrap();
let lower = text.to_ascii_lowercase();
let lines: Vec<_> = lower.lines().collect();
let word_counts = chunker::run(
&lines,
chunker::Config { chunk_size: 10_000, ..Default::default() },
|| HashMap::<&str, u32>::new(),
|counts, line| {
for word in line.split_whitespace() {
*counts.entry(word).or_default() += 1;
}
},
|rx| rx.into_iter().reduce(|mut word_counts, counts| {
for (word, count) in counts {
*word_counts.entry(word).or_default() += count;
}
word_counts
}).unwrap(),
);
let mut sorted_word_counts = Vec::from_iter(word_counts);
sorted_word_counts.sort_unstable_by_key(|&(_, count)| Reverse(count));
let mut stdout = BufWriter::new(stdout().lock());
for (word, count) in sorted_word_counts {
writeln!(stdout, "{word} {count}").unwrap();
}
}
$ hyperfine 'target/release/examples/count_words <kjvbible_x10.txt'
Benchmark 1: target/release/examples/count_words <kjvbible_x10.txt
Time (mean ± σ): 78.7 ms ± 1.8 ms [User: 283.2 ms, System: 20.8 ms]
Range (min … max): 74.9 ms … 84.2 ms 36 runs
依赖关系
~2–14MB
~142K SLoC