#parallel-iterator #parallel-processing #parallel #iterator #thread #performance

orx-parallel

适用于迭代器方法组合定义的计算的,高性能且可配置的并行计算库。

5 个稳定版本

1.5.0 2024 年 8 月 22 日
1.4.0 2024 年 8 月 12 日
1.3.0 2024 年 7 月 25 日
1.1.0 2024 年 7 月 17 日
1.0.0 2024 年 7 月 10 日

#183并发

Download history 118/week @ 2024-07-08 124/week @ 2024-07-15 129/week @ 2024-07-22 31/week @ 2024-07-29 123/week @ 2024-08-12

每月 285 次下载

MIT 许可证

225KB
4K SLoC

orx-parallel

orx-parallel crate orx-parallel documentation

适用于迭代器方法组合定义的计算的,高性能且可配置的并行计算库。

通过迭代器实现并行计算

通过并行迭代器 trait Par 实现并行计算,方便地将定义为一个函数组合的迭代器中的顺序代码转换为并行代码,只需添加一个单词: parinto_par

use orx_parallel::prelude::*;

struct Input(String);
struct Output(usize);

let compute = |input: Input| Output(input.0.len());
let select = |output: &Output| output.0.is_power_of_two();

let inputs = || (0..1024).map(|x| Input(x.to_string())).collect::<Vec<_>>();

// sequential computation with regular iterator
let seq_result: usize = inputs()
    .into_iter()
    .map(compute)
    .filter(select)
    .map(|x| x.0)
    .sum();
assert_eq!(seq_result, 286);

// parallel computation with Par
let par_result = inputs()
    .into_par() // parallelize with default settings
    .map(compute)
    .filter(select)
    .map(|x| x.0)
    .sum();
assert_eq!(par_result, 286);

下面的代码块包含一些基本示例,演示了不同来源为并行计算提供引用或值作为输入。

use orx_parallel::prelude::*;
use std::collections::*;

fn test<P: Par<Item = usize>>(iter: P) {
    let result = iter.filter(|x| x % 2 == 1).map(|x| x + 1).sum();
    assert_eq!(6, result);
}

let range = 1..4;
test(range.par());

let vec = vec![1, 2, 3];
test(vec.par().copied()); // use a ref to vec
test(vec.into_par()); // consume vec

// other collections can be used similarly
let set: HashSet<_> = [1, 2, 3].into_iter().collect();
test(set.par().copied());
test(set.into_par());

let bmap: BTreeMap<_, _> = [('a', 1), ('b', 2), ('c', 3)].into_iter().collect();
test(bmap.par().map(|x| x.1).copied());
test(bmap.into_par().map(|x| x.1));

// any regular/sequential iterator can be parallelized
let iter = ["", "a", "bb", "ccc", "dddd"]
    .iter()
    .skip(1)
    .take(3)
    .map(|x| x.len());
test(iter.par());

易于配置

将工作分配给并行线程的复杂性简化为两个简单参数,易于推理

  • NumThreads 表示并行化的程度。它可以设置为以下两种变体之一
    • Auto:所有线程都将假定可用。这是一个上限;当计算不足以具有挑战性时,可能不会达到此数字。
    • Max(n):计算可以启动最多 n 个线程。NumThreads::Max(1) 等同于顺序执行。
  • ChunkSize 表示并行工作者每次空闲时将拉取并处理的元素数量。此参数旨在平衡并行化的开销和任务的异构成本。它可以设置为以下三种变体之一
    • Auto:库旨在选择最佳值以最小化计算时间。
    • Exact(c):块大小将为 c。此变体将完全控制权交给了调用者,因此最适合需要调整的计算。
    • Min(c):块大小将至少为 c。然而,执行可以拉取更多元素,这取决于输入的特征以及使用的线程数,以减少并行化开销的影响。
use orx_parallel::prelude::*;
use std::num::NonZeroUsize;

let _ = (0..42).par().sum(); // both settings at Auto

let _ = (0..42).par().num_threads(4).sum(); // at most 4 threads
let _ = (0..42).par().num_threads(1).sum(); // sequential
let _ = (0..42).par().num_threads(NumThreads::sequential()).sum(); // also sequential
let _ = (0..42).par().num_threads(0).sum(); // shorthand for NumThreads::Auto

let _ = (0..42).par().chunk_size(16).sum(); // chunks of exactly 16 elements
let c = NonZeroUsize::new(64).unwrap();
let _ = (0..42).par().chunk_size(ChunkSize::Min(c)).sum(); // min 64 elements
let _ = (0..42).par().chunk_size(0).sum(); // shorthand for ChunkSize::Auto

let _ = (0..42).par().num_threads(4).chunk_size(16).sum(); // set both params

控制这两个参数并能够轻松地、个别地配置每次计算,在各种方式上都很有用。请参见EasyConfiguration部分以获取示例。

顺序计算和并行计算的一般化

使用NumThreads::Max(1)执行并行计算相当于顺序计算,没有任何并行化开销。在这种意义上,Par是顺序计算和并行计算的一般化。

为了说明这一点,考虑以下函数,它接受一个计算定义作为Par。请注意,就像顺序迭代器一样,Par是惰性的。换句话说,它只是计算的定义。这样的computation与可以通过computation.params()访问的设置一起传递给execute方法。

然而,由于该方法拥有computation,它可以决定如何执行它。此实现将遵循给定的并行设置。除非是星期一,否则它将顺序运行。

use orx_parallel::prelude::*;
use chrono::{Datelike, Local, Weekday};
type Output = String;

fn execute<C: Par<Item = Output>>(computation: C) -> Vec<Output> {
    match Local::now().weekday() {
        Weekday::Mon => computation.num_threads(1).collect_vec(),
        _ => computation.collect_vec(),
    }
}

这个特性使我们免于两次定义相同的计算。我们经常需要编写如下代码,根据输入参数的需要顺序或并行运行。这是重复的、容易出错的且难以维护的。

use orx_parallel::prelude::*;
struct Input(String);
struct Output(usize);
fn compute(input: Input) -> Output {
    Output(input.0.len())
}
fn select(output: &Output) -> bool {
    output.0.is_power_of_two()
}

fn execute_conditionally(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
    match parallelize {
        true => inputs
            .into_iter()
            .par()
            .map(compute)
            .filter(select)
            .map(|x| x.0)
            .sum(),
        false => inputs
            .into_iter()
            .map(compute)
            .filter(select)
            .map(|x| x.0)
            .sum(),
    }
}

使用Par,我们可以有一个单一的版本,当顺序执行时不会产生任何开销。

fn execute_unified(inputs: impl Iterator<Item = Input>, parallelize: bool) -> usize {
    let num_threads = match parallelize {
        true => NumThreads::Auto,
        false => NumThreads::sequential(),
    };
    inputs
        .par()
        .num_threads(num_threads)
        .map(compute)
        .filter(select)
        .map(|x| x.0)
        .sum()
}

底层方法和性能

这个crate是ConcurrentIter的自然后续。您可能已经发现了示例中的并行映射、折叠和查找实现。特别是当与ConcurrentBagConcurrentOrderedBag等并发集合结合使用时,并行计算的实现非常直接。您可以在本节和这个讨论中找到一些细节。

基准测试很棘手,尤其是在并行环境中。尽管如此,本存储库中定义的基准测试Par来说非常有希望。其性能通常与rayon相当。在收集结果的情况下,它可以提供显著的改进,例如map |> filter |> collectflat_map |> collect等。

与rayon的关系

请参阅RelationToRayon部分,以了解orx-parallel与rayon的相似之处和不同之处。

贡献

欢迎贡献!如果您发现错误、有问题或认为某些内容可以改进,请打开一个问题或创建一个PR。

v1的目标是允许Par涵盖实际用例,如果您有无法用其表达和计算的计算,请打开一个问题。

v2的目标是提供一个更动态和智能的并行执行器,请参见并加入相关的讨论这里

许可

此库在MIT许可下发布。请参阅LICENSE以获取详细信息。

依赖

~1MB
~12K SLoC