#iterator #parallelism #atomic #iteration

orx-concurrent-iter

线程安全、易用且轻量级的并发迭代器特性和高效实现

36 个稳定版本

1.22.1 2024年7月14日
1.22.0 2024年6月30日
1.16.0 2024年5月23日
1.6.0 2024年4月30日

并发 中排名 650

Download history 120/week @ 2024-04-18 278/week @ 2024-04-25 734/week @ 2024-05-02 996/week @ 2024-05-09 554/week @ 2024-05-16 307/week @ 2024-05-23 2/week @ 2024-05-30 3/week @ 2024-06-06 213/week @ 2024-06-13 370/week @ 2024-06-20 386/week @ 2024-06-27 19/week @ 2024-07-04 126/week @ 2024-07-11 20/week @ 2024-07-18 183/week @ 2024-07-25 38/week @ 2024-08-01

每月下载量 368
用于 2 crates

MIT 许可证

150KB
2.5K SLoC

orx-concurrent-iter

orx-concurrent-iter crate orx-concurrent-iter documentation

线程安全、易用且轻量级的并发迭代器特性和高效实现。

  • 易用:实现 ConcurrentIter 的迭代器可以在线程之间安全地共享。它可以被多个线程并发迭代,使用 forwhile let。它还提供了更高层次的方法,如 for_eachfold,这些方法允许安全、简单和高效地并行处理。
  • 高效轻量级:此 crate 中提供的所有并发迭代器实现都扩展了原子迭代器,这些迭代器是无锁的,只依赖于原子原语。

示例

基本用法

ConcurrentIter 可以在线程之间安全地共享,并且可以并发迭代。如预期的那样,它将按顺序仅生成每个元素一次。生成的元素将在基于先到先得的方式下在线程之间共享。换句话说,线程将并发地从迭代器中拉取剩余的元素。

use orx_concurrent_iter::*;
use std::fmt::Debug;

fn fake_work<T: Debug>(_x: T) {
    std::thread::sleep(std::time::Duration::from_nanos(10));
}

/// `process` elements of `iter` concurrently using `num_threads` threads
fn process_concurrently<T, I, F>(process: &F, num_threads: usize, iter: I)
where
    T: Send + Sync,
    F: Fn(T) + Send + Sync,
    I: ConcurrentIter<Item = T>,
{
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // concurrently iterate over values in a `for` loop
                for value in iter.values() {
                    process(value);
                }
            });
        }
    });
}

/// executes `fake_work` concurrently on all elements of the `concurrent_iter`
fn run<T: Send + Sync + Debug>(concurrent_iter: impl ConcurrentIter<Item = T>) {
    process_concurrently(&fake_work, 8, concurrent_iter)
}

// non-consuming iteration over references
let names: [String; 3] = [
    String::from("foo"),
    String::from("bar"),
    String::from("baz"),
];
run::<&String>(names.con_iter());

let values: Vec<i32> = (0..8).map(|x| 3 * x + 1).collect();
run::<&i32>(values.con_iter());

let slice: &[i32] = values.as_slice();
run::<&i32>(slice.con_iter());
run::<i32>(slice.con_iter().cloned());

// consuming iteration over values
run::<String>(names.into_con_iter());
run::<i32>(values.into_con_iter());

// any Iterator into ConcurrentIter
let values: Vec<i32> = (0..1024).collect();

let evens = values.iter().filter(|x| *x % 2 == 0);
run::<&i32>(evens.into_con_iter());

let evens = values.iter().filter(|x| *x % 2 == 0);
run::<i32>(evens.into_con_iter().cloned());

let iter_val = values
    .iter()
    .filter(|x| *x % 2 == 0)
    .map(|x| (7 * x + 3) as usize)
    .skip(2)
    .take(5);
run::<usize>(iter_val.into_con_iter());

循环方式

ConcurrentIter 实现了 next 方法,这是 Iterator::next 的并发对应物。因此,迭代器可以在多个线程中几乎像常规 Iterator 一样安全地使用。以下示例演示并解释了迭代 ConcurrentIter 的不同方式。

use orx_concurrent_iter::*;
use std::fmt::Debug;

fn process_one_by_one<T, I, F>(process: &F, num_threads: usize, iter: &I)
where
    T: Send + Sync + Debug,
    F: Fn(T) + Send + Sync,
    I: ConcurrentIter<Item = T>,
{
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // pull values 1 by 1
                for value in iter.values() {
                    process(value);
                }

                while let Some(value) = iter.next() {
                    process(value);
                }

                // pull values and corresponding index 1 by 1
                for (idx, value) in iter.ids_and_values() {
                    dbg!(idx);
                    process(value);
                }

                while let Some(x) = iter.next_id_and_value() {
                    dbg!(x.idx);
                    process(x.value);
                }
            });
        }
    });
}

fn process_in_chunks<T, I, F>(
    process: &F,
    num_threads: usize,
    iter: &I,
    chunk_size: usize,
) where
    T: Send + Sync + Debug,
    F: Fn(T) + Send + Sync,
    I: ConcurrentIter<Item = T>,
{
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // pull values in chunks of `chunk_size`
                let mut chunk_iter = iter.buffered_iter(16);
                while let Some(chunk) = chunk_iter.next() {
                    assert!(chunk.values.len() <= chunk_size);

                    for (i, value) in chunk.values.enumerate() {
                        let idx = chunk.begin_idx + i;

                        dbg!(idx);
                        process(value);
                    }
                }
            });
        }
    });
}

let process = |x| {
    dbg!(x);
};

process_one_by_one(&process, 8, &(0..1024).con_iter());
process_in_chunks(&process, 8, &(0..1024).con_iter(), 64);
  • forwhile let 循环的 process_one_by_one 示例展示了最基本的使用,其中线程将在完成前一个元素的处理后,从迭代器中拉取下一个元素。
  • 请注意,每个线程将根据它们完成循环内任务的速度,在迭代器的不同位置拉取不同的元素。因此,线程内部的对 enumerate 的调用,或者通过特定线程计数拉取的元素,**不**提供原始数据源中元素的索引。《ConcurrentIter》还通过 ids_and_valuesnext_id_and_value 方法提供了原始索引。
  • 当循环内部要执行的工作很小(例如上面示例中的dbg调用)时,逐个取元素可能不是最优的。在这种情况下,更好的想法是批量取元素。在process_in_chunks中,我们创建了一个缓冲区块迭代器,在每次next调用时取出chunk_size(或更少,如果剩余不足)个**连续**的元素。请注意,由chunk_iter.next()返回的chunk是一个具有已知lenExactSizeIterator
  • 在批量迭代时,我们仍然可以访问元素的原始idxchunk.begin_idx代表返回的chunk.values迭代器的第一个元素的原始索引。请注意,chunk.values始终不为空;即,始终至少有一个元素,否则next返回None。此外,块迭代器包含连续的元素。因此,我们可以通过将chunk.begin_idx与当前chunk的局部索引结合来获取所有元素的原始索引,该索引是通过chunk.values.enumerate获得的;即,let idx = chunk.begin_idx + i

并行折叠

将迭代的元素视为过程的输入,ConcurrentIter方便地允许将任务分配给多个线程。下面是一个使用并发迭代器的并行折叠实现示例。

use orx_concurrent_iter::*;

fn compute(input: u64) -> u64 {
    input * 2
}

fn fold(aggregated: u64, value: u64) -> u64 {
    aggregated + value
}

fn par_fold(num_threads: usize, inputs: impl ConcurrentIter<Item = u64>) -> u64 {
    std::thread::scope(|s| {
        (0..num_threads)
            .map(|_| s.spawn(|| inputs.values().map(compute).fold(0u64, fold)))
            .collect::<Vec<_>>()
            .into_iter()
            .map(|x| x.join().expect("-_-"))
            .fold(0u64, fold)
    })
}

// validate
for num_threads in [1, 2, 4, 8] {
    let values = (0..1024).map(|x| 2 * x);
    let par_result = par_fold(num_threads, values.into_con_iter());
    assert_eq!(par_result, 2 * 1023 * 1024);
}

关于实现的说明

  • 并发迭代器允许简单的7行并行折叠实现。
  • 并行折叠操作定义为两个折叠操作。
    • 第一个.map(_).fold(_)定义了由num_threads个线程执行的并行折叠操作。每个线程返回自己的聚合结果。
    • 第二个map(_).fold(_)定义了最终的对每个线程获得的num_threads结果执行的顺序折叠操作。

并行映射

并行映射也可以通过合并返回的转换后的集合,例如向量来实现。特别是对于较大的数据类型,可以更有效地将ConcurrentIter与并发集合,如orx_concurrent_bag::ConcurrentBag配对,这允许高效地并发收集结果而不需要复制。

use orx_concurrent_iter::*;
use orx_concurrent_bag::*;

fn map(input: u64) -> String {
    input.to_string()
}

fn parallel_map(num_threads: usize, iter: impl ConcurrentIter<Item = u64>) -> SplitVec<String> {
    let outputs = ConcurrentBag::new();
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                for output in iter.values().map(map) {
                    outputs.push(output);
                }
            });
        }
    });
    outputs.into_inner()
}

// test
for num_threads in [1, 2, 4, 8] {
    let inputs = (0..1024).map(|x| 2 * x);
    let outputs = parallel_map(num_threads, inputs.into_con_iter());
    assert_eq!(1024, outputs.len());
}

请注意,由于并行化,outputs的顺序不保证与inputs相同。为了在输出中保留输入的顺序,可以使用带索引的迭代来相应地排序结果。作为后排序的替代,可以使用ConcurrentBag替换为orx_concurrent_bag::ConcurrentOrderedBag来按顺序收集。

并行查找,线程间的一点点通信

如上图所示,使用ConcurrentIter可以方便地实现不同方法的并行化。并发迭代器线程之间隐式共享的信息只有一位:剩余的元素。在不需要迭代所有元素的场景中,我们可以使用这些信息在多个线程之间共享信息。我们可能会将这些情况称为早期返回场景。一个常见的例子是find方法,其中我们在寻找匹配元素,希望在找到匹配项后立即终止搜索。

以下是一个find方法的并行实现。

use orx_concurrent_iter::*;

fn par_find<I, P>(iter: I, predicate: P, n_threads: usize) -> Option<(usize, I::Item)>
where
    I: ConcurrentIter,
    P: Fn(&I::Item) -> bool + Send + Sync,
{
    std::thread::scope(|s| {
        (0..n_threads)
            .map(|_| {
                s.spawn(|| {
                    for (i, x) in iter.ids_and_values() {
                        if predicate(&x) {
                            iter.skip_to_end();
                            return Some((i, x));
                        }
                    }
                    None
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .flat_map(|x| x.join().expect("(-)"))
            .min_by_key(|x| x.0)
    })
}

let mut names: Vec<_> = (0..8785).map(|x| x.to_string()).collect();
names[42] = "foo".to_string();

let result = par_find(names.con_iter(), |x| x.starts_with('x'), 4);
assert_eq!(result, None);

let result = par_find(names.con_iter(), |x| x.starts_with('f'), 4);
assert_eq!(result, Some((42, &"foo".to_string())));

names[43] = "foo_second_match".to_string();
let result = par_find(names.con_iter(), |x| x.starts_with('f'), 4);
assert_eq!(result, Some((42, &"foo".to_string())));

请注意,并行化find实现分为两部分

  • (并行搜索) 在每个线程内部,我们遍历并发迭代器的元素,并在找到满足predicate的第一个值时,连同其索引一起返回。
  • (顺序封装) 由于这是一个并行执行,我们可能会从多个线程接收到多个匹配项。在第二部分中,我们调查线程结果,并返回位置索引最小的那个(min_by_key(|x| x.0)),因为这是原始迭代器中出现的位置最靠前的元素。

到目前为止,这很简单,与并行fold实现类似。然而,区别在于额外的iter.skip_to_end()调用。这个调用将立即消耗迭代器中剩余的所有元素。因此,当另一个线程尝试在for (i, x) in iter.ids_and_values()中前进迭代器时,它将不会收到任何其他元素。因此,它们也会在完成处理最后一个提取的元素后立即返回。这建立了线程之间非常简单的通信,这在实现早期返回场景(如find方法)中的效率至关重要。为了展示,假设我们在上述实现中没有使用iter.skip_to_end()

  • 在第二个示例中,迭代器有8785个元素,其中只有一个元素满足谓词,即位于42位置的"foo"。
  • 4个线程中的一个,比如A,将找到这个元素并立即返回。
  • 其他3个线程将永远不会看到这个元素,因为它是由A提取的。它们将遍历所有剩余的元素,并最终返回None
  • 最终结果将是正确的。然而,这种实现将评估迭代器中的所有元素,而不管第一个匹配元素的位置在哪里。虽然并行化了,但这将是一个非常低效的实现。

特性和实现者

定义可以安全地在多个线程中并行迭代的类型的特是ConcurrentIter

此外,还有两个特定义了可以提供ConcurrentIter的类型。

  • ConcurrentIterable类型实现了con_iter(&self)方法,该方法返回一个并发迭代器而不消耗自身。
  • 另一方面,实现了 IntoConcurrentIter 特性的类型具有 into_con_iter 方法,该方法消耗并转换类型为并发迭代器。此外,还存在一个名为 IterIntoConcurrentIter 的特例,它在功能上与 IntoConcurrentIter 相同,但仅由常规迭代器实现,目的是为了允许对向量和数组进行特殊实现。

下表总结了此 crate 中标准类型的实现。

类型 ConcurrentIterable
con_iter 元素类型
IntoConcurrentIter
into_con_iter 元素类型
&'a [T] &'aT &'aT
Range<Idx> Idx Idx
Vec<T> &T T
[T;N] &T T
Iter: Iterator<Item = T> - T

最后,元素类型为 CloneCopy 类型引用的并发迭代器具有 clonedcopied 方法,允许遍历复制的值。

贡献

欢迎贡献!如果您发现错误、有疑问或认为某些地方可以改进,请打开一个 问题 或创建一个 PR。

许可证

此库采用 MIT 许可证。有关详细信息,请参阅 LICENSE。

依赖关系