36 个稳定版本
1.22.1 | 2024年7月14日 |
---|---|
1.22.0 | 2024年6月30日 |
1.16.0 | 2024年5月23日 |
1.6.0 | 2024年4月30日 |
在 并发 中排名 650
每月下载量 368
用于 2 crates
150KB
2.5K SLoC
orx-concurrent-iter
线程安全、易用且轻量级的并发迭代器特性和高效实现。
- 易用:实现
ConcurrentIter
的迭代器可以在线程之间安全地共享。它可以被多个线程并发迭代,使用for
或while let
。它还提供了更高层次的方法,如for_each
和fold
,这些方法允许安全、简单和高效地并行处理。 - 高效 和 轻量级:此 crate 中提供的所有并发迭代器实现都扩展了原子迭代器,这些迭代器是无锁的,只依赖于原子原语。
示例
基本用法
ConcurrentIter
可以在线程之间安全地共享,并且可以并发迭代。如预期的那样,它将按顺序仅生成每个元素一次。生成的元素将在基于先到先得的方式下在线程之间共享。换句话说,线程将并发地从迭代器中拉取剩余的元素。
use orx_concurrent_iter::*;
use std::fmt::Debug;
fn fake_work<T: Debug>(_x: T) {
std::thread::sleep(std::time::Duration::from_nanos(10));
}
/// `process` elements of `iter` concurrently using `num_threads` threads
fn process_concurrently<T, I, F>(process: &F, num_threads: usize, iter: I)
where
T: Send + Sync,
F: Fn(T) + Send + Sync,
I: ConcurrentIter<Item = T>,
{
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
// concurrently iterate over values in a `for` loop
for value in iter.values() {
process(value);
}
});
}
});
}
/// executes `fake_work` concurrently on all elements of the `concurrent_iter`
fn run<T: Send + Sync + Debug>(concurrent_iter: impl ConcurrentIter<Item = T>) {
process_concurrently(&fake_work, 8, concurrent_iter)
}
// non-consuming iteration over references
let names: [String; 3] = [
String::from("foo"),
String::from("bar"),
String::from("baz"),
];
run::<&String>(names.con_iter());
let values: Vec<i32> = (0..8).map(|x| 3 * x + 1).collect();
run::<&i32>(values.con_iter());
let slice: &[i32] = values.as_slice();
run::<&i32>(slice.con_iter());
run::<i32>(slice.con_iter().cloned());
// consuming iteration over values
run::<String>(names.into_con_iter());
run::<i32>(values.into_con_iter());
// any Iterator into ConcurrentIter
let values: Vec<i32> = (0..1024).collect();
let evens = values.iter().filter(|x| *x % 2 == 0);
run::<&i32>(evens.into_con_iter());
let evens = values.iter().filter(|x| *x % 2 == 0);
run::<i32>(evens.into_con_iter().cloned());
let iter_val = values
.iter()
.filter(|x| *x % 2 == 0)
.map(|x| (7 * x + 3) as usize)
.skip(2)
.take(5);
run::<usize>(iter_val.into_con_iter());
循环方式
ConcurrentIter
实现了 next
方法,这是 Iterator::next
的并发对应物。因此,迭代器可以在多个线程中几乎像常规 Iterator
一样安全地使用。以下示例演示并解释了迭代 ConcurrentIter
的不同方式。
use orx_concurrent_iter::*;
use std::fmt::Debug;
fn process_one_by_one<T, I, F>(process: &F, num_threads: usize, iter: &I)
where
T: Send + Sync + Debug,
F: Fn(T) + Send + Sync,
I: ConcurrentIter<Item = T>,
{
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
// pull values 1 by 1
for value in iter.values() {
process(value);
}
while let Some(value) = iter.next() {
process(value);
}
// pull values and corresponding index 1 by 1
for (idx, value) in iter.ids_and_values() {
dbg!(idx);
process(value);
}
while let Some(x) = iter.next_id_and_value() {
dbg!(x.idx);
process(x.value);
}
});
}
});
}
fn process_in_chunks<T, I, F>(
process: &F,
num_threads: usize,
iter: &I,
chunk_size: usize,
) where
T: Send + Sync + Debug,
F: Fn(T) + Send + Sync,
I: ConcurrentIter<Item = T>,
{
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
// pull values in chunks of `chunk_size`
let mut chunk_iter = iter.buffered_iter(16);
while let Some(chunk) = chunk_iter.next() {
assert!(chunk.values.len() <= chunk_size);
for (i, value) in chunk.values.enumerate() {
let idx = chunk.begin_idx + i;
dbg!(idx);
process(value);
}
}
});
}
});
}
let process = |x| {
dbg!(x);
};
process_one_by_one(&process, 8, &(0..1024).con_iter());
process_in_chunks(&process, 8, &(0..1024).con_iter(), 64);
for
和while let
循环的process_one_by_one
示例展示了最基本的使用,其中线程将在完成前一个元素的处理后,从迭代器中拉取下一个元素。- 请注意,每个线程将根据它们完成循环内任务的速度,在迭代器的不同位置拉取不同的元素。因此,线程内部的对
enumerate
的调用,或者通过特定线程计数拉取的元素,**不**提供原始数据源中元素的索引。《ConcurrentIter》还通过ids_and_values
或next_id_and_value
方法提供了原始索引。 - 当循环内部要执行的工作很小(例如上面示例中的
dbg
调用)时,逐个取元素可能不是最优的。在这种情况下,更好的想法是批量取元素。在process_in_chunks
中,我们创建了一个缓冲区块迭代器,在每次next
调用时取出chunk_size
(或更少,如果剩余不足)个**连续**的元素。请注意,由chunk_iter.next()
返回的chunk
是一个具有已知len
的ExactSizeIterator
。 - 在批量迭代时,我们仍然可以访问元素的原始
idx
。chunk.begin_idx
代表返回的chunk.values
迭代器的第一个元素的原始索引。请注意,chunk.values
始终不为空;即,始终至少有一个元素,否则next
返回None
。此外,块迭代器包含连续的元素。因此,我们可以通过将chunk.begin_idx
与当前chunk
的局部索引结合来获取所有元素的原始索引,该索引是通过chunk.values.enumerate
获得的;即,let idx = chunk.begin_idx + i
。
并行折叠
将迭代的元素视为过程的输入,ConcurrentIter
方便地允许将任务分配给多个线程。下面是一个使用并发迭代器的并行折叠实现示例。
use orx_concurrent_iter::*;
fn compute(input: u64) -> u64 {
input * 2
}
fn fold(aggregated: u64, value: u64) -> u64 {
aggregated + value
}
fn par_fold(num_threads: usize, inputs: impl ConcurrentIter<Item = u64>) -> u64 {
std::thread::scope(|s| {
(0..num_threads)
.map(|_| s.spawn(|| inputs.values().map(compute).fold(0u64, fold)))
.collect::<Vec<_>>()
.into_iter()
.map(|x| x.join().expect("-_-"))
.fold(0u64, fold)
})
}
// validate
for num_threads in [1, 2, 4, 8] {
let values = (0..1024).map(|x| 2 * x);
let par_result = par_fold(num_threads, values.into_con_iter());
assert_eq!(par_result, 2 * 1023 * 1024);
}
关于实现的说明
- 并发迭代器允许简单的7行并行折叠实现。
- 并行折叠操作定义为两个折叠操作。
- 第一个
.map(_).fold(_)
定义了由num_threads
个线程执行的并行折叠操作。每个线程返回自己的聚合结果。 - 第二个
map(_).fold(_)
定义了最终的对每个线程获得的num_threads
结果执行的顺序折叠操作。
- 第一个
并行映射
并行映射也可以通过合并返回的转换后的集合,例如向量来实现。特别是对于较大的数据类型,可以更有效地将ConcurrentIter
与并发集合,如orx_concurrent_bag::ConcurrentBag
配对,这允许高效地并发收集结果而不需要复制。
use orx_concurrent_iter::*;
use orx_concurrent_bag::*;
fn map(input: u64) -> String {
input.to_string()
}
fn parallel_map(num_threads: usize, iter: impl ConcurrentIter<Item = u64>) -> SplitVec<String> {
let outputs = ConcurrentBag::new();
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for output in iter.values().map(map) {
outputs.push(output);
}
});
}
});
outputs.into_inner()
}
// test
for num_threads in [1, 2, 4, 8] {
let inputs = (0..1024).map(|x| 2 * x);
let outputs = parallel_map(num_threads, inputs.into_con_iter());
assert_eq!(1024, outputs.len());
}
请注意,由于并行化,outputs
的顺序不保证与inputs
相同。为了在输出中保留输入的顺序,可以使用带索引的迭代来相应地排序结果。作为后排序的替代,可以使用ConcurrentBag
替换为orx_concurrent_bag::ConcurrentOrderedBag
来按顺序收集。
并行查找,线程间的一点点通信
如上图所示,使用ConcurrentIter
可以方便地实现不同方法的并行化。并发迭代器线程之间隐式共享的信息只有一位:剩余的元素。在不需要迭代所有元素的场景中,我们可以使用这些信息在多个线程之间共享信息。我们可能会将这些情况称为早期返回场景。一个常见的例子是find
方法,其中我们在寻找匹配元素,希望在找到匹配项后立即终止搜索。
以下是一个find方法的并行实现。
use orx_concurrent_iter::*;
fn par_find<I, P>(iter: I, predicate: P, n_threads: usize) -> Option<(usize, I::Item)>
where
I: ConcurrentIter,
P: Fn(&I::Item) -> bool + Send + Sync,
{
std::thread::scope(|s| {
(0..n_threads)
.map(|_| {
s.spawn(|| {
for (i, x) in iter.ids_and_values() {
if predicate(&x) {
iter.skip_to_end();
return Some((i, x));
}
}
None
})
})
.collect::<Vec<_>>()
.into_iter()
.flat_map(|x| x.join().expect("(-)"))
.min_by_key(|x| x.0)
})
}
let mut names: Vec<_> = (0..8785).map(|x| x.to_string()).collect();
names[42] = "foo".to_string();
let result = par_find(names.con_iter(), |x| x.starts_with('x'), 4);
assert_eq!(result, None);
let result = par_find(names.con_iter(), |x| x.starts_with('f'), 4);
assert_eq!(result, Some((42, &"foo".to_string())));
names[43] = "foo_second_match".to_string();
let result = par_find(names.con_iter(), |x| x.starts_with('f'), 4);
assert_eq!(result, Some((42, &"foo".to_string())));
请注意,并行化find实现分为两部分
- (并行搜索) 在每个线程内部,我们遍历并发迭代器的元素,并在找到满足
predicate
的第一个值时,连同其索引一起返回。 - (顺序封装) 由于这是一个并行执行,我们可能会从多个线程接收到多个匹配项。在第二部分中,我们调查线程结果,并返回位置索引最小的那个(
min_by_key(|x| x.0)
),因为这是原始迭代器中出现的位置最靠前的元素。
到目前为止,这很简单,与并行fold实现类似。然而,区别在于额外的iter.skip_to_end()
调用。这个调用将立即消耗迭代器中剩余的所有元素。因此,当另一个线程尝试在for (i, x) in iter.ids_and_values()
中前进迭代器时,它将不会收到任何其他元素。因此,它们也会在完成处理最后一个提取的元素后立即返回。这建立了线程之间非常简单的通信,这在实现早期返回场景(如find方法)中的效率至关重要。为了展示,假设我们在上述实现中没有使用iter.skip_to_end()
。
- 在第二个示例中,迭代器有8785个元素,其中只有一个元素满足谓词,即位于42位置的"foo"。
- 4个线程中的一个,比如
A
,将找到这个元素并立即返回。 - 其他3个线程将永远不会看到这个元素,因为它是由
A
提取的。它们将遍历所有剩余的元素,并最终返回None
。 - 最终结果将是正确的。然而,这种实现将评估迭代器中的所有元素,而不管第一个匹配元素的位置在哪里。虽然并行化了,但这将是一个非常低效的实现。
特性和实现者
定义可以安全地在多个线程中并行迭代的类型的特是ConcurrentIter
。
此外,还有两个特定义了可以提供ConcurrentIter
的类型。
ConcurrentIterable
类型实现了con_iter(&self)
方法,该方法返回一个并发迭代器而不消耗自身。- 另一方面,实现了
IntoConcurrentIter
特性的类型具有into_con_iter
方法,该方法消耗并转换类型为并发迭代器。此外,还存在一个名为IterIntoConcurrentIter
的特例,它在功能上与IntoConcurrentIter
相同,但仅由常规迭代器实现,目的是为了允许对向量和数组进行特殊实现。
下表总结了此 crate 中标准类型的实现。
类型 | ConcurrentIterablecon_iter 元素类型 |
IntoConcurrentIterinto_con_iter 元素类型 |
---|---|---|
&'a [T] |
&'aT |
&'aT |
Range<Idx> |
Idx |
Idx |
Vec<T> |
&T |
T |
[T;N] |
&T |
T |
Iter: Iterator<Item = T> |
- | T |
最后,元素类型为 Clone
或 Copy
类型引用的并发迭代器具有 cloned
或 copied
方法,允许遍历复制的值。
贡献
欢迎贡献!如果您发现错误、有疑问或认为某些地方可以改进,请打开一个 问题 或创建一个 PR。
许可证
此库采用 MIT 许可证。有关详细信息,请参阅 LICENSE。