2 个版本
0.5.1 | 2022年2月13日 |
---|---|
0.5.0 | 2022年2月13日 |
#216 in 并发
1,086 每月下载量
在 11 个 Crates 中使用 (5 个直接使用)
4MB
1K SLoC
Rust 的并行迭代器处理库
有关支持的操作,请参阅 IteratorExt
(最新 IteratorExt
在 docs.rs 上)。
显著特性
- 标准迭代器的直接替换(*)
- 保持顺序
- 懒加载,类似于单线程迭代器
- panic 传播
- 支持使用作用域线程迭代借用值
- 背压
- 分析方法(用于分析流水线处理的瓶颈非常有用)
何时使用和替代方案
此库是将多线程处理添加到现有基于迭代器的代码的通用解决方案。当您有一系列迭代器步骤,并希望并行处理其中之一或多个以提高速度时,此库尽可能使其在各个方面都接近直接替换。
实现基于生成工作线程线程池并使用通道发送它们工作,然后接收并排序结果以将其转换为正常迭代器。
通过通道发送迭代器项很快,但不是免费的。请确保并行化足够重的操作以证明通过通道发送数据的开销。例如,涉及 IO 或一些 CPU 重的计算的操作。
您可以使用 cargo bench
或在本地查看 /docs/bench-report/report/index.html
以获取 criterion.rs 基准报告,但作为一般规则,并行化的函数调用应该超过 200ns,以使并行化超过开销。
当您有很多已存储在集合中的项目,并想要“滚动并执行一些简单的计算”时,您可能想使用 rayon
。这是一个针对并行化处理大型数据集的整个数据块进行优化的库,这最大限度地减少了每个项目的开销。缺点是,将 rayon
的迭代器转换回有序顺序迭代器是非平凡的。
使用方法
基于现有代码添加新的应该相对简单,因此欢迎 PR。
简而言之,如果您有
# fn step_a(x: usize) -> usize {
# x * 7
# }
#
# fn filter_b(x: &usize) -> bool {
# x % 2 == 0
# }
#
# fn step_c(x: usize) -> usize {
# x + 1
# }
assert_eq!(
(0..10)
.map(step_a)
.filter(filter_b)
.map(step_c).collect::<Vec<_>>(),
vec![1, 15, 29, 43, 57]
);
您可以将它改为
use pariter::IteratorExt as _;
# fn step_a(x: usize) -> usize {
# x * 7
# }
#
# fn filter_b(x: &usize) -> bool {
# x % 2 == 0
# }
#
# fn step_c(x: usize) -> usize {
# x + 1
# }
assert_eq!(
(0..10)
.map(step_a)
.filter(filter_b)
.parallel_map(step_c).collect::<Vec<_>>(),
vec![1, 15, 29, 43, 57]
);
这将运行得更快(条件适用),因为step_c
将在多线程上并行运行。
迭代借用值
遇到borrowed value does not live long enough
错误?看起来您正在迭代包含借用引用的值。将它们发送到不同的线程进行处理可能会导致内存不安全的问题。但别担心,我们有解决方案。
首先,如果您正在迭代的值可以被廉价地克隆,只需尝试添加.cloned()
并将它们转换为拥有值。
如果不能,您可以使用来自crossbeam
存储库的scoped-threads API
use pariter::{IteratorExt as _, scope};
# fn step_a(x: &usize) -> usize {
# *x * 7
# }
#
# fn filter_b(x: &usize) -> bool {
# x % 2 == 0
# }
#
# fn step_c(x: usize) -> usize {
# x + 1
# }
let v : Vec<_> = (0..10).collect();
scope(|scope| {
assert_eq!(
v
.iter() // iterating over `&usize` now, `parallel_map` will not work
.parallel_map_scoped(scope, step_a)
.filter(filter_b)
.map(step_c).collect::<Vec<_>>(),
vec![1, 15, 29, 43, 57]
);
});
// or:
assert_eq!(
scope(|scope| {
v
.iter()
.parallel_map_scoped(scope, step_a)
.filter(filter_b)
.map(step_c).collect::<Vec<_>>()}).expect("handle errors properly in production code"),
vec![1, 15, 29, 43, 57]
);
额外的scope
参数来自crossbeam::thread::scope
,用于强制执行内存安全。只需将您的迭代器链包装在一个不会超出借用值的scope
包装器中,一切都会顺利运行。
自定义设置
如果您需要更改设置,如缓冲区大小和线程数
# use pariter::IteratorExt as _;
assert_eq!(
(0..10)
.map(|x| x + 1)
.parallel_filter_custom(|o| o.threads(16), |x| *x == 5)
.map(|x| x /2).collect::<Vec<_>>(),
vec![2]
);
状态 & 计划
我始终需要这个确切的功能,所以我清理了我的临时代码,将其放入了一个适当的库。我通常很忙,所以如果您想添加某些内容,请提交一个PR。
我愿意将共享/所有权和维护权转移到值得信赖的手中。