8 个版本 (4 个重大变更)

0.5.1 2022年2月13日
0.5.0 2022年2月7日
0.4.0 2022年1月27日
0.3.1 2022年1月9日
0.1.0 2021年11月23日

#381 in Rust 模式

Download history 12/week @ 2024-03-13 42/week @ 2024-03-20 44/week @ 2024-03-27 37/week @ 2024-04-03 57/week @ 2024-04-10 194/week @ 2024-04-17 190/week @ 2024-04-24 45/week @ 2024-05-01 22/week @ 2024-05-08 23/week @ 2024-05-15 19/week @ 2024-05-22 34/week @ 2024-05-29 23/week @ 2024-06-05 33/week @ 2024-06-12 24/week @ 2024-06-19 335/week @ 2024-06-26

每月下载量 419
用于 2 crates

MPL-2.0 OR MIT OR Apache-2.0

8.5MB
2K SLoC

Rust 的并行迭代器处理库

有关支持的操作,请参阅 IteratorExt (docs.rs 上的最新 IteratorExt)。

显著功能

  • 标准的迭代器的替代品(*)
    • 保留顺序
    • 懒加载,类似于单线程迭代器
    • panic 传播
  • 支持使用范围线程迭代借用值
  • 背压
  • 分析方法(用于分析管道处理瓶颈)

何时使用和替代方案

此库是向现有基于迭代器的代码添加多线程处理的良好通用解决方案。当您有一系列迭代器步骤,并希望并行处理其中一个或多个步骤以加快速度时,此库可以在所有方面尽可能接近直接替换。

实现基于创建工作线程池和通过通道发送工作,然后接收和排序结果以将其转换回普通迭代器。

通过通道发送迭代器项目速度快,但不是免费的。请确保并行化足够重的操作以证明通过通道发送数据的开销是合理的。例如,涉及 IO 或某些 CPU 重计算的运算。

您可以使用 cargo bench 或本地查看 /docs/bench-report/report/index.html 以获取 criterion.rs 基准报告,但作为一般规则,并行化的函数调用应该超过 200ns,以便并行化可以超过开销。

当您有很多已存储在集合中的项目,想要“滚动并执行一些简单的计算”时,您可能希望使用 rayon。这是一个针对并行处理更大数据集的块优化的库,它最小化了任何每个项目的开销。缺点是rayon 的迭代器转换回有序顺序迭代器是非平凡的

用法

基于现有代码添加新的功能应该相对简单,因此欢迎 PR。

简而言之,如果您有

# fn step_a(x: usize) -> usize {
#   x * 7
# }
# 
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
# 
# fn step_c(x: usize) -> usize {
#   x + 1
# }
assert_eq!(
  (0..10)
    .map(step_a)
    .filter(filter_b)
    .map(step_c).collect::<Vec<_>>(),
    vec![1, 15, 29, 43, 57]
);

您可以将其更改为

use dpc_pariter::IteratorExt as _;
# fn step_a(x: usize) -> usize {
#   x * 7
# }
# 
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
# 
# fn step_c(x: usize) -> usize {
#   x + 1
# }
assert_eq!(
  (0..10)
    .map(step_a)
    .filter(filter_b)
    .parallel_map(step_c).collect::<Vec<_>>(),
    vec![1, 15, 29, 43, 57]
);

这样它会运行得更快(条件适用),因为 step_c 将在多个线程上并行运行。

迭代借用值

遇到 borrowed value does not live long enough 错误吗?看起来您正在迭代包含借用引用的值。将它们发送到不同的线程进行处理可能会导致内存不安全问题。但别担心,我们为您解决了这个问题。

首先,如果您正在迭代的值可以被廉价地克隆,只需尝试添加 .cloned() 并将它们转换为拥有值。

如果不行,您可以使用来自 crossbeam 包的 scoped-threads API

use dpc_pariter::{IteratorExt as _, scope};
# fn step_a(x: &usize) -> usize {
#   *x * 7
# }
#
# fn filter_b(x: &usize) -> bool {
#   x % 2 == 0
# }
#
# fn step_c(x: usize) -> usize {
#   x + 1
# }
let v : Vec<_> = (0..10).collect();

scope(|scope| {
  assert_eq!(
    v
      .iter() // iterating over `&usize` now, `parallel_map` will not work
      .parallel_map_scoped(scope, step_a)
      .filter(filter_b)
      .map(step_c).collect::<Vec<_>>(),
      vec![1, 15, 29, 43, 57]
  );
});

// or:

assert_eq!(
  scope(|scope| {
  v
    .iter()
    .parallel_map_scoped(scope, step_a)
    .filter(filter_b)
    .map(step_c).collect::<Vec<_>>()}).expect("handle errors properly in production code"),
    vec![1, 15, 29, 43, 57]
);

额外的 scope 参数来自 crossbeam::thread::scope,目的是强制执行内存安全。只需将迭代器链包裹在一个不会超出借用值的 scope 包装器中,一切都会顺利运行。

自定义设置

如果您需要更改设置,如缓冲区大小和线程数量

# use dpc_pariter::IteratorExt as _;
assert_eq!(
  (0..10)
    .map(|x| x + 1)
    .parallel_filter_custom(|o| o.threads(16), |x| *x == 5)
    .map(|x| x /2).collect::<Vec<_>>(),
    vec![2]
);

状态 & 计划

我仍然需要这个确切的功能,所以我清理了我的临时代码,将其放入一个合适的库中。我通常很忙,所以如果您想添加某些内容,请提交一个 PR。

我愿意将所有权和维护权转移到值得信赖的手中。

依赖项