10 个稳定版本
新 2.4.0 | 2024年8月22日 |
---|---|
2.3.0 | 2024年8月12日 |
2.2.0 | 2024年7月25日 |
1.3.0 | 2024年6月30日 |
1.0.0 | 2024年4月12日 |
#623 in 并发
2,144 每月下载量
用于 orx-parallel
60KB
296 行
orx-concurrent-ordered-bag
一个高效、便捷且轻量级的只增并发数据结构,允许高性能和有序的并发集合。
- 便捷:
ConcurrentOrderedBag
可以安全地作为共享引用在多个线程间共享。它是一个具有特殊并发状态实现的PinnedConcurrentCol
。底层的PinnedVec
和并发袋可以相互转换。这个集合的主要目标是实现高效的并行操作,同时具有非常简单的实现。 - 高效:
ConcurrentOrderedBag
是一个无锁结构,适合并发、免拷贝和高性能增长,同时允许按所需顺序收集结果。
安全要求
与 ConcurrentBag
和 ConcurrentVec
不同,向 CollectionOrderedBag
添加元素是通过 unsafe
设置器方法实现的,它允许在任何顺序的袋中写入任何位置。为了安全地使用袋子,调用者应满足以下两个安全要求
- 每个位置只写入一次,因此不存在竞争条件。
- 在调用
into_inner
以获取收集元素的底层向量时,袋中不得包含任何空隙。- 设
m
为我们写入元素的位置的最大索引。 - 袋子假定向量的长度等于
m + 1
。 - 因此,它期望恰好写入
m + 1
个元素到袋子中。 - 如果也满足第一个条件;那么,这个条件足以得出袋子没有空隙并且可以展开的结论。
- 设
在某些情况下,满足这两个条件很容易,而在其他情况下则更困难。在复杂情况下,一个不错的方法是将 ConcurrentOrderedBag
与一个 ConcurrentIter
配对,以极大地减轻复杂性和安全风险,请参见下面的并行映射示例。
示例
手动示例
在下面的示例中,我们将计算分配到两个线程:第一个线程处理索引为偶数的输入,第二个线程处理索引为奇数的输入。这满足了上面提到的安全要求。
use orx_concurrent_ordered_bag::*;
let n = 1024;
let evens_odds = ConcurrentOrderedBag::new();
// just take a reference and share among threads
let bag = &evens_odds;
std::thread::scope(|s| {
s.spawn(move || {
for i in (0..n).filter(|x| x % 2 == 0) {
unsafe { bag.set_value(i, i as i32) };
}
});
s.spawn(move || {
for i in (0..n).filter(|x| x % 2 == 1) {
unsafe { bag.set_value(i, -(i as i32)) };
}
});
});
let vec = unsafe { evens_odds.into_inner().unwrap_only_if_counts_match() };
assert_eq!(vec.len(), n);
for i in 0..n {
if i % 2 == 0 {
assert_eq!(vec[i], i as i32);
} else {
assert_eq!(vec[i], -(i as i32));
}
}
请注意,只要满足无间隙和一次写入保证,ConcurrentOrderedBag
在写入顺序方面非常灵活。考虑以下示例。我们仅启动一个线程来两次写入集合的末尾。然后我们启动许多其他线程来填充集合的开头。这可以正常工作,无需任何锁或等待。
use orx_concurrent_ordered_bag::*;
let n = 1024;
let num_additional_threads = 4;
let bag = ConcurrentOrderedBag::new();
let con_bag = &bag;
std::thread::scope(|s| {
s.spawn(move || {
// start writing to the end
unsafe { con_bag.set_value(n - 1, 42) };
});
for thread in 0..num_additional_threads {
s.spawn(move || {
// then fill the rest concurrently from the beginning
for i in (0..(n - 1)).filter(|i| i % num_additional_threads == thread) {
unsafe { con_bag.set_value(i, i as i32) };
}
});
}
});
let vec = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
assert_eq!(vec.len(), n);
for i in 0..(n - 1) {
assert_eq!(vec[i], i as i32);
}
assert_eq!(vec[n - 1], 42);
这些示例代表了一些可以在线程之间简单分割工作并满足安全要求的情况。在一般情况下,需要特别注意才能满足安全要求。通过在输入端将 ConcurrentOrderedBag
与 ConcurrentIter
配对,可以显著避免这种复杂性和安全风险。
使用 ConcurrentIter
的并行映射
并行映射操作是我们关心收集元素顺序的情况之一,因此,ConcurrentBag
就不再适用了。另一方面,使用 ConcurrentOrderedBag
和 ConcurrentIter
可以实现一个非常简单但高效的实现。
use orx_concurrent_ordered_bag::*;
use orx_concurrent_iter::*;
fn parallel_map<In, Out, Map, Inputs>(
num_threads: usize,
inputs: Inputs,
map: &Map,
) -> ConcurrentOrderedBag<Out>
where
Inputs: ConcurrentIter<Item = In>,
Map: Fn(In) -> Out + Send + Sync,
Out: Send + Sync,
{
let outputs = ConcurrentOrderedBag::new();
let inputs = &inputs;
let out = &outputs;
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
while let Some(next) = inputs.next_id_and_value() {
unsafe { out.set_value(next.idx, map(next.value)) };
}
});
}
});
outputs
}
let len = 2465;
let input: Vec<_> = (0..len).map(|x| x.to_string()).collect();
let bag = parallel_map(4, input.into_con_iter(), &|x| x.to_string().len());
let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
assert_eq!(output.len(), len);
for (i, value) in output.iter().enumerate() {
assert_eq!(value, &i.to_string().len());
}
如您所见,无需手动工作或关注即可满足安全要求。迭代器中的每个元素都按顺序处理和写入,就像在顺序实现中一样。
使用 ConcurrentIter
的并行映射
对上述并行映射实现进行进一步性能改进的方法是将任务在多个线程中分块分配。这种方法的目标是避免伪共享,您可以在 此处 了解更多详情。这可以通过将 set_values
方法与 ConcurrentOrderedBag
配对,而不是与 ConcurrentIter 配对来实现。
use orx_concurrent_ordered_bag::*;
use orx_concurrent_iter::*;
fn parallel_map<In, Out, Map, Inputs>(
num_threads: usize,
inputs: Inputs,
map: &Map,
chunk_size: usize,
) -> ConcurrentOrderedBag<Out>
where
Inputs: ConcurrentIter<Item = In>,
Map: Fn(In) -> Out + Send + Sync,
Out: Send + Sync,
{
let outputs = ConcurrentOrderedBag::new();
let inputs = &inputs;
let out = &outputs;
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
while let Some(next) = inputs.next_chunk(chunk_size) {
unsafe { out.set_values(next.begin_idx, next.values.map(map)) };
}
});
}
});
outputs
}
let len = 2465;
let input: Vec<_> = (0..len).map(|x| x.to_string()).collect();
let bag = parallel_map(4, input.into_con_iter(), &|x| x.to_string().len(), 64);
let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
for (i, value) in output.iter().enumerate() {
assert_eq!(value, &i.to_string().len());
}
并发状态和属性
并发状态简单地通过原子容量进行建模。此状态与 PinnedConcurrentCol
的组合导致以下属性
- 写入集合的位置不会阻止其他写入,可以同时发生多个写入。
- 调用者需要保证每个位置只写入一次。
- ⇒ 调用者负责避免写 & 写冲突。
- 一次只能发生一个增长。
- 读取只能在将袋子转换为底层的
PinnedVec
之后进行。 - ⇒ 不存在读 & 写冲突。
并发友元集合
ConcurrentBag |
ConcurrentVec |
ConcurrentOrderedBag |
|
---|---|---|---|
写入 | 通过 push 或 extend 方法确保每个元素只写入一次 |
通过 push 或 extend 方法确保每个元素只写入一次 |
有两个不同之处。首先,一个位置可以多次写入。其次,可以使用 set_value 和 set_values 方法在任意时刻以任意顺序写入袋中的任意元素。这提供了极大的灵活性,同时将安全责任移交给调用者;因此,设置方法都是 unsafe 。 |
读取 | 主要是只写集合。通过 unsafe get 和 iter 方法进行已推送元素的并发读取。调用者需要避免冲突。 |
写入和读取集合。已推入的元素可以通过 get 和 iter 方法安全地读取。 |
目前不支持。由于写入操作的灵活但不可靠的特性,作为调用者很难提供所需的安全性保证。 |
元素排序 | 由于写入操作是通过通过 push 和 extend 将元素添加到固定向量的末尾来完成的,因此对将元素收集到袋子中的代码的多线程执行可能会导致收集的元素顺序不同。 |
由于写入操作是通过通过 push 和 extend 将元素添加到固定向量的末尾来完成的,因此对将元素收集到袋子中的代码的多线程执行可能会导致收集的元素顺序不同。 |
这是这个集合的主要目标,允许同时以正确的顺序收集元素。虽然这并不简单;当与 ConcurrentOrderedBag 一起使用 ConcurrentIter 时,几乎可以轻松实现。 |
into_inner |
一旦完成并发收集,可以安全且成本低廉地将袋子转换为它底层的 PinnedVec<T> 。 |
一旦完成并发收集,向量可以安全地转换为它底层的 PinnedVec<ConcurrentOption<T>> 。注意,元素被包装在 ConcurrentOption 中,以便提供线程安全的并发读写操作。 |
通过灵活的设置器进行增长,允许写入任何位置,ConcurrentOrderedBag 有包含间隙的风险。into_inner 调用提供了一些有用的度量,例如是否推入的元素数量与向量的最大索引匹配;然而,它不能保证袋子是无间隙的。调用者需要承担责任,通过 unsafe 调用来解包以获取底层的 PinnedVec<T> 。 |
许可
此库根据 MIT 许可证授权。有关详细信息,请参阅 LICENSE。
依赖关系
~460KB