2 个版本

0.5.1 2022年2月13日
0.5.0 2022年2月13日

#216 in 并发

Download history 529/week @ 2024-02-26 1501/week @ 2024-03-04 1354/week @ 2024-03-11 291/week @ 2024-03-18 370/week @ 2024-03-25 337/week @ 2024-04-01 492/week @ 2024-04-08 346/week @ 2024-04-15 509/week @ 2024-04-22 522/week @ 2024-04-29 304/week @ 2024-05-06 327/week @ 2024-05-13 208/week @ 2024-05-20 279/week @ 2024-05-27 265/week @ 2024-06-03 283/week @ 2024-06-10

1,086 每月下载量
11 个 Crates 中使用 (5 个直接使用)

MPL-2.0 OR MIT OR Apache-2.0

4MB
1K SLoC

Rust 的并行迭代器处理库

有关支持的操作,请参阅 IteratorExt (最新 IteratorExt 在 docs.rs 上)。

显著特性

  • 标准迭代器的直接替换(*)
    • 保持顺序
    • 懒加载,类似于单线程迭代器
    • panic 传播
  • 支持使用作用域线程迭代借用值
  • 背压
  • 分析方法(用于分析流水线处理的瓶颈非常有用)

何时使用和替代方案

此库是将多线程处理添加到现有基于迭代器的代码的通用解决方案。当您有一系列迭代器步骤,并希望并行处理其中之一或多个以提高速度时,此库尽可能使其在各个方面都接近直接替换。

实现基于生成工作线程线程池并使用通道发送它们工作,然后接收并排序结果以将其转换为正常迭代器。

通过通道发送迭代器项很快,但不是免费的。请确保并行化足够重的操作以证明通过通道发送数据的开销。例如,涉及 IO 或一些 CPU 重的计算的操作。

您可以使用 cargo bench 或在本地查看 /docs/bench-report/report/index.html 以获取 criterion.rs 基准报告,但作为一般规则,并行化的函数调用应该超过 200ns,以使并行化超过开销。

当您有很多已存储在集合中的项目,并想要“滚动并执行一些简单的计算”时,您可能想使用 rayon。这是一个针对并行化处理大型数据集的整个数据块进行优化的库,这最大限度地减少了每个项目的开销。缺点是,将 rayon 的迭代器转换回有序顺序迭代器是非平凡的

使用方法

基于现有代码添加新的应该相对简单,因此欢迎 PR。

简而言之,如果您有

# fn step_a(x: usize) -> usize {
#   x * 7
# }
# 
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
# 
# fn step_c(x: usize) -> usize {
#   x + 1
# }
assert_eq!(
  (0..10)
    .map(step_a)
    .filter(filter_b)
    .map(step_c).collect::<Vec<_>>(),
    vec![1, 15, 29, 43, 57]
);

您可以将它改为

use pariter::IteratorExt as _;
# fn step_a(x: usize) -> usize {
#   x * 7
# }
# 
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
# 
# fn step_c(x: usize) -> usize {
#   x + 1
# }
assert_eq!(
  (0..10)
    .map(step_a)
    .filter(filter_b)
    .parallel_map(step_c).collect::<Vec<_>>(),
    vec![1, 15, 29, 43, 57]
);

这将运行得更快(条件适用),因为step_c将在多线程上并行运行。

迭代借用值

遇到borrowed value does not live long enough错误?看起来您正在迭代包含借用引用的值。将它们发送到不同的线程进行处理可能会导致内存不安全的问题。但别担心,我们有解决方案。

首先,如果您正在迭代的值可以被廉价地克隆,只需尝试添加.cloned()并将它们转换为拥有值。

如果不能,您可以使用来自crossbeam存储库的scoped-threads API

use pariter::{IteratorExt as _, scope};
# fn step_a(x: &usize) -> usize {
#   *x * 7
# }
#
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
#
# fn step_c(x: usize) -> usize {
#   x + 1
# }
let v : Vec<_> = (0..10).collect();

scope(|scope| {
  assert_eq!(
    v
      .iter() // iterating over `&usize` now, `parallel_map` will not work
      .parallel_map_scoped(scope, step_a)
      .filter(filter_b)
      .map(step_c).collect::<Vec<_>>(),
      vec![1, 15, 29, 43, 57]
  );
});

// or:

assert_eq!(
  scope(|scope| {
  v
    .iter()
    .parallel_map_scoped(scope, step_a)
    .filter(filter_b)
    .map(step_c).collect::<Vec<_>>()}).expect("handle errors properly in production code"),
    vec![1, 15, 29, 43, 57]
);

额外的scope参数来自crossbeam::thread::scope,用于强制执行内存安全。只需将您的迭代器链包装在一个不会超出借用值的scope包装器中,一切都会顺利运行。

自定义设置

如果您需要更改设置,如缓冲区大小和线程数

# use pariter::IteratorExt as _;
assert_eq!(
  (0..10)
    .map(|x| x + 1)
    .parallel_filter_custom(|o| o.threads(16), |x| *x == 5)
    .map(|x| x /2).collect::<Vec<_>>(),
    vec![2]
);

状态 & 计划

我始终需要这个确切的功能,所以我清理了我的临时代码,将其放入了一个适当的库。我通常很忙,所以如果您想添加某些内容,请提交一个PR。

我愿意将共享/所有权和维护权转移到值得信赖的手中。

依赖项