19 个稳定版本
新版本 2.5.0 | 2024 年 8 月 22 日 |
---|---|
2.2.0 | 2024 年 7 月 25 日 |
1.10.0 | 2024 年 7 月 10 日 |
1.2.0 | 2024 年 3 月 28 日 |
0.1.0 | 2024 年 3 月 11 日 |
在 并发 中排名 196
每月下载量 2,654
97KB
291 行代码(不包括注释)
orx-concurrent-vec
一个高效、便捷且轻量级的仅增长读写并发数据结构,允许高性能并发集合。
- 便捷:`ConcurrentVec` 可以简单地作为一个共享引用在线程之间安全地共享。它是一个具有特殊并发状态实现的
PinnedConcurrentCol
。底层的PinnedVec
和并发包可以相互转换。 - 高效:`ConcurrentVec` 是一个无锁结构,适合并发、无复制和高性能增长。
示例
底层的 PinnedVec
保证使得使用共享引用安全增长变得简单,这导致以下便捷的 API,如下所示。
以下示例演示了同时使用两个集合。
- 使用
ConcurrentVec
来收集随机间隔内进行的测量。由于在收集测量的同时,另一个线程将读取它们来计算统计数据(读/写),因此使用了 ConcurrentVec。 - 使用
ConcurrentBag
来收集在定义的时间间隔内从测量中获得的统计数据。由于我们不需要在进程完成之前读取统计数据,因此使用了 ConcurrentBag(仅写)。
use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;
#[derive(Debug, Default)]
struct Metric {
sum: i32,
count: i32,
}
impl Metric {
fn aggregate(self, value: &i32) -> Self {
Self {
sum: self.sum + value,
count: self.count + 1,
}
}
fn average(&self) -> i32 {
match self.count {
0 => 0,
_ => self.sum / self.count,
}
}
}
// record measurements in random intervals, roughly every 2ms (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads
// collect metrics every 100 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads
std::thread::scope(|s| {
// thread to store measurements as they arrive
s.spawn(move || {
for i in 0..100 {
std::thread::sleep(Duration::from_millis(i % 5));
// collect measurements and push to measurements vec
// simply by calling `push`
rf_measurements.push(i as i32);
}
});
// thread to collect metrics every 100 milliseconds
s.spawn(move || {
for _ in 0..10 {
// safely read from measurements vec to compute the metric
let metric = rf_measurements
.iter()
.fold(Metric::default(), |x, value| x.aggregate(value));
// push result to metrics bag
rf_metrics.push(metric);
std::thread::sleep(Duration::from_millis(100));
}
});
});
let measurements: Vec<_> = measurements
.into_inner()
.into_iter()
.map(|x| x.unwrap())
.collect();
dbg!(&measurements);
let averages: Vec<_> = metrics
.into_inner()
.into_iter()
.map(|x| x.average())
.collect();
println!("averages = {:?}", &averages);
assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
并发模型的属性
ConcurrentVec 包装了一个由 ConcurrentOption
元素组成的 PinnedVec
。这种组合导致以下安全保证
- 元素的
ConcurrentOption
包装允许线程安全地初始化,而其他线程可以安全地尝试读取数据。这对于并发读写操作是必需的。 - 底层的
PinnedVec
确保推入元素的记忆位置不会改变。这允许无复制和安全地并发增长。
ConcurrentVec 的并发模型具有以下特性:
- 向集合中写入位置不会阻塞其他写入,可以并发发生多个写入。
- 每个位置正好写入一次 ⟹ 不存在写入和写入的竞态条件。
- 同一时间只能发生一次容量增长。增长是复制的,并且不会改变已推送元素的内存位置。
- 底层固定向量始终有效,并且可以通过
into_inner(self)
在任何时间取出。 - 尝试在值正在写入时从位置读取将安全地返回
None
。如果元素已正确初始化,其他线程将安全地读取Some(T)
。 - 换句话说,一个位置将被正好写入一次,但可以被并发读取多次。ConcurrentVec 通过防止数据竞争,确保只有完全写入后才能读取,从而不存在读取和写入的竞态条件。
基准测试
使用 push
的性能
您可以在 benches/collect_with_push.rs 中找到基准测试的详细信息。
在实验中,使用了 rayon
的并行迭代器、AppendOnlyVec
和 ConcurrentVec
的 push
方法来从多个线程收集结果。此外,还测试了 ConcurrentVec
的不同底层固定向量。我们观察到:
- 默认的
Doubling
增长策略导致结果的高效并发收集。请注意,这个变体不需要任何输入来构建。 - 另一方面,
Linear
增长策略表现显著更好。请注意,此参数的值意味着底层SplitVec
的每个片段将具有 2^12 (4096) 个元素的能力。改进的潜在原因是更少的浪费,并且可以通过对要推送的数据的轻微了解而优先考虑。 - 最后,
Fixed
增长策略最不灵活,需要完全了解硬约束的容量(如果超出将引发恐慌)。由于它没有优于Linear
,即使我们有完美的知识,我们也不一定优先考虑Fixed
。
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.057 ms 16.390 ms 16.755 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [23.679 ms 24.480 ms 25.439 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[16384]
time: [14.055 ms 14.287 ms 14.526 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[16384]
time: [8.4686 ms 8.6396 ms 8.8373 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[16384]
time: [9.8297 ms 9.9945 ms 10.151 ms]
rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [43.118 ms 44.065 ms 45.143 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [110.66 ms 114.09 ms 117.94 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[65536]
time: [61.461 ms 62.547 ms 63.790 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[65536]
time: [37.420 ms 37.740 ms 38.060 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[65536]
time: [43.017 ms 43.584 ms 44.160 ms]
可以通过使用 extend
方法而不是 push
来进一步提高性能。您可以在下一个子节中看到结果,并在 ConcurrentBag
的 性能说明 中找到详细信息,该说明具有类似的特征。
使用 extend
的性能
您可以在 benches/collect_with_extend.rs 中找到基准测试的详细信息。
在这个后续实验中,唯一的区别是我们使用 extend
而不是 push
与 ConcurrentVec
。预期这种方法将解决由于伪共享导致的性能下降。实验结果表明,预期是正确的,并且似乎有重大影响。
- 扩展而不是推送可能将增长性能加倍。
- 通过 64 个元素或 65536 个元素的批次扩展之间没有显著差异。我们不需要精心调整的数字,足够大的批次大小似乎就足够了。
- 并非所有场景都允许批量扩展;然而,显著的性能提升使得在可能的情况下选择扩展是更优的。
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.102 ms 16.379 ms 16.669 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [27.922 ms 28.611 ms 29.356 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [8.7361 ms 8.8347 ms 8.9388 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.2035 ms 4.2975 ms 4.4012 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.9670 ms 5.0928 ms 5.2217 ms]
concurrent_vec(Doubling) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [9.2441 ms 9.3988 ms 9.5594 ms]
concurrent_vec(Linear(12)) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [3.5663 ms 3.6527 ms 3.7405 ms]
concurrent_vec(Fixed) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [5.0839 ms 5.2169 ms 5.3576 ms]
rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [47.861 ms 48.836 ms 49.843 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [125.52 ms 128.89 ms 132.41 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [42.516 ms 43.097 ms 43.715 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [20.025 ms 20.269 ms 20.521 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [25.284 ms 25.818 ms 26.375 ms]
concurrent_vec(Doubling) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [39.371 ms 39.887 ms 40.470 ms]
concurrent_vec(Linear(12)) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [17.808 ms 17.923 ms 18.046 ms]
concurrent_vec(Fixed) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [24.291 ms 24.702 ms 25.133 ms]
并发友元集合
ConcurrentBag |
ConcurrentVec |
ConcurrentOrderedBag |
|
---|---|---|---|
写入 | 通过push 或extend 方法确保每个元素仅写入一次 |
通过push 或extend 方法确保每个元素仅写入一次 |
有两点不同。首先,一个位置可以多次写入。其次,可以使用set_value 和set_values 方法在任何时候以任何顺序写入袋中的任意元素。这提供了极大的灵活性,同时将安全性责任移交给调用者;因此,设置方法被标记为unsafe 。 |
读取 | 主要是一个只写集合。已推送元素的并发读取通过unsafe 的get 和iter 方法进行。调用者需要避免竞争条件。 |
一个读写集合。已推送的元素可以通过get 和iter 方法安全地读取。 |
目前不支持。由于写入操作灵活但不可靠,很难为调用者提供所需的安全性保证。 |
元素的排序 | 由于写入操作是通过将元素添加到固定向量的末尾来进行的,即通过push 和extend ,因此两个多线程执行的代码可能将元素以不同的顺序收集到集合中。 |
由于写入操作是通过将元素添加到固定向量的末尾来进行的,即通过push 和extend ,因此两个多线程执行的代码可能将元素以不同的顺序收集到集合中。 |
这是此集合的主要目标,允许并发且正确地收集元素。虽然这并不明显;当使用ConcurrentOrderedBag 和ConcurrentIter 时,几乎可以轻易实现。 |
into_inner |
一旦并发收集完成,袋可以安全且低成本地转换为其底层的PinnedVec<T> 。 |
一旦并发收集完成,vec可以安全地转换为其底层的PinnedVec<ConcurrentOption<T>> 。注意,元素被包裹在ConcurrentOption 中以提供线程安全的并发读写操作。 |
通过灵活的设置器进行扩展,允许写入任何位置,ConcurrentOrderedBag 存在包含空隙的风险。into_inner 调用提供了一些有用的指标,例如推送的元素数量是否与向量的最大索引匹配;但是,它不能保证袋无空隙。调用者需要负责通过一个unsafe 调用来解包以获取底层的PinnedVec<T> 。 |
贡献
欢迎贡献!如果您发现错误,有疑问或认为可以改进,请打开一个问题或创建一个PR。
许可
此库受MIT许可。有关详细信息,请参阅LICENSE。
依赖
~575KB