#read-write #vec #lock-free #atomic #reader

orx-concurrent-vec

一个高效、便捷且轻量级的仅增长读写并发数据结构,允许高性能并发集合。

19 个稳定版本

新版本 2.5.0 2024 年 8 月 22 日
2.2.0 2024 年 7 月 25 日
1.10.0 2024 年 7 月 10 日
1.2.0 2024 年 3 月 28 日
0.1.0 2024 年 3 月 11 日

并发 中排名 196

Download history 75/week @ 2024-05-03 71/week @ 2024-05-10 10/week @ 2024-05-17 3/week @ 2024-05-24 211/week @ 2024-06-14 9/week @ 2024-06-21 181/week @ 2024-06-28 117/week @ 2024-07-05 45/week @ 2024-07-12 392/week @ 2024-07-19 750/week @ 2024-07-26 788/week @ 2024-08-02 450/week @ 2024-08-09 513/week @ 2024-08-16

每月下载量 2,654

MIT 许可证

97KB
291 行代码(不包括注释)

orx-concurrent-vec

orx-concurrent-vec crate orx-concurrent-vec documentation

一个高效、便捷且轻量级的仅增长读写并发数据结构,允许高性能并发集合。

  • 便捷:`ConcurrentVec` 可以简单地作为一个共享引用在线程之间安全地共享。它是一个具有特殊并发状态实现的 PinnedConcurrentCol。底层的 PinnedVec 和并发包可以相互转换。
  • 高效:`ConcurrentVec` 是一个无锁结构,适合并发、无复制和高性能增长。

示例

底层的 PinnedVec 保证使得使用共享引用安全增长变得简单,这导致以下便捷的 API,如下所示。

以下示例演示了同时使用两个集合。

  • 使用 ConcurrentVec 来收集随机间隔内进行的测量。由于在收集测量的同时,另一个线程将读取它们来计算统计数据(读/写),因此使用了 ConcurrentVec。
  • 使用 ConcurrentBag 来收集在定义的时间间隔内从测量中获得的统计数据。由于我们不需要在进程完成之前读取统计数据,因此使用了 ConcurrentBag(仅写)。
use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Debug, Default)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }

    fn average(&self) -> i32 {
        match self.count {
            0 => 0,
            _ => self.sum / self.count,
        }
    }
}

// record measurements in random intervals, roughly every 2ms (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 100 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));

            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 100 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));

            // push result to metrics bag
            rf_metrics.push(metric);

            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

let measurements: Vec<_> = measurements
    .into_inner()
    .into_iter()
    .map(|x| x.unwrap())
    .collect();
dbg!(&measurements);

let averages: Vec<_> = metrics
    .into_inner()
    .into_iter()
    .map(|x| x.average())
    .collect();
println!("averages = {:?}", &averages);

assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);

并发模型的属性

ConcurrentVec 包装了一个由 ConcurrentOption 元素组成的 PinnedVec。这种组合导致以下安全保证

  • 元素的 ConcurrentOption 包装允许线程安全地初始化,而其他线程可以安全地尝试读取数据。这对于并发读写操作是必需的。
  • 底层的 PinnedVec 确保推入元素的记忆位置不会改变。这允许无复制和安全地并发增长。

ConcurrentVec 的并发模型具有以下特性:

  • 向集合中写入位置不会阻塞其他写入,可以并发发生多个写入。
  • 每个位置正好写入一次 ⟹ 不存在写入和写入的竞态条件。
  • 同一时间只能发生一次容量增长。增长是复制的,并且不会改变已推送元素的内存位置。
  • 底层固定向量始终有效,并且可以通过 into_inner(self) 在任何时间取出。
  • 尝试在值正在写入时从位置读取将安全地返回 None。如果元素已正确初始化,其他线程将安全地读取 Some(T)
  • 换句话说,一个位置将被正好写入一次,但可以被并发读取多次。ConcurrentVec 通过防止数据竞争,确保只有完全写入后才能读取,从而不存在读取和写入的竞态条件。

基准测试

使用 push 的性能

您可以在 benches/collect_with_push.rs 中找到基准测试的详细信息。

在实验中,使用了 rayon 的并行迭代器、AppendOnlyVecConcurrentVecpush 方法来从多个线程收集结果。此外,还测试了 ConcurrentVec 的不同底层固定向量。我们观察到:

  • 默认的 Doubling 增长策略导致结果的高效并发收集。请注意,这个变体不需要任何输入来构建。
  • 另一方面,Linear 增长策略表现显著更好。请注意,此参数的值意味着底层 SplitVec 的每个片段将具有 2^12 (4096) 个元素的能力。改进的潜在原因是更少的浪费,并且可以通过对要推送的数据的轻微了解而优先考虑。
  • 最后,Fixed 增长策略最不灵活,需要完全了解硬约束的容量(如果超出将引发恐慌)。由于它没有优于 Linear,即使我们有完美的知识,我们也不一定优先考虑 Fixed
rayon/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [16.057 ms 16.390 ms 16.755 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [23.679 ms 24.480 ms 25.439 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [14.055 ms 14.287 ms 14.526 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [8.4686 ms 8.6396 ms 8.8373 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [9.8297 ms 9.9945 ms 10.151 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [43.118 ms 44.065 ms 45.143 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [110.66 ms 114.09 ms 117.94 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [61.461 ms 62.547 ms 63.790 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [37.420 ms 37.740 ms 38.060 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [43.017 ms 43.584 ms 44.160 ms]

可以通过使用 extend 方法而不是 push 来进一步提高性能。您可以在下一个子节中看到结果,并在 ConcurrentBag性能说明 中找到详细信息,该说明具有类似的特征。

使用 extend 的性能

您可以在 benches/collect_with_extend.rs 中找到基准测试的详细信息。

在这个后续实验中,唯一的区别是我们使用 extend 而不是 pushConcurrentVec。预期这种方法将解决由于伪共享导致的性能下降。实验结果表明,预期是正确的,并且似乎有重大影响。

  • 扩展而不是推送可能将增长性能加倍。
  • 通过 64 个元素或 65536 个元素的批次扩展之间没有显著差异。我们不需要精心调整的数字,足够大的批次大小似乎就足够了。
  • 并非所有场景都允许批量扩展;然而,显著的性能提升使得在可能的情况下选择扩展是更优的。
rayon/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [16.102 ms 16.379 ms 16.669 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [27.922 ms 28.611 ms 29.356 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [8.7361 ms 8.8347 ms 8.9388 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [4.2035 ms 4.2975 ms 4.4012 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [4.9670 ms 5.0928 ms 5.2217 ms]
concurrent_vec(Doubling) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [9.2441 ms 9.3988 ms 9.5594 ms]
concurrent_vec(Linear(12)) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [3.5663 ms 3.6527 ms 3.7405 ms]
concurrent_vec(Fixed) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
                        time:   [5.0839 ms 5.2169 ms 5.3576 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [47.861 ms 48.836 ms 49.843 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [125.52 ms 128.89 ms 132.41 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [42.516 ms 43.097 ms 43.715 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [20.025 ms 20.269 ms 20.521 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [25.284 ms 25.818 ms 26.375 ms]
concurrent_vec(Doubling) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [39.371 ms 39.887 ms 40.470 ms]
concurrent_vec(Linear(12)) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [17.808 ms 17.923 ms 18.046 ms]
concurrent_vec(Fixed) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
                        time:   [24.291 ms 24.702 ms 25.133 ms]

并发友元集合

ConcurrentBag ConcurrentVec ConcurrentOrderedBag
写入 通过pushextend方法确保每个元素仅写入一次 通过pushextend方法确保每个元素仅写入一次 有两点不同。首先,一个位置可以多次写入。其次,可以使用set_valueset_values方法在任何时候以任何顺序写入袋中的任意元素。这提供了极大的灵活性,同时将安全性责任移交给调用者;因此,设置方法被标记为unsafe
读取 主要是一个只写集合。已推送元素的并发读取通过unsafegetiter方法进行。调用者需要避免竞争条件。 一个读写集合。已推送的元素可以通过getiter方法安全地读取。 目前不支持。由于写入操作灵活但不可靠,很难为调用者提供所需的安全性保证。
元素的排序 由于写入操作是通过将元素添加到固定向量的末尾来进行的,即通过pushextend,因此两个多线程执行的代码可能将元素以不同的顺序收集到集合中。 由于写入操作是通过将元素添加到固定向量的末尾来进行的,即通过pushextend,因此两个多线程执行的代码可能将元素以不同的顺序收集到集合中。 这是此集合的主要目标,允许并发且正确地收集元素。虽然这并不明显;当使用ConcurrentOrderedBagConcurrentIter时,几乎可以轻易实现。
into_inner 一旦并发收集完成,袋可以安全且低成本地转换为其底层的PinnedVec<T> 一旦并发收集完成,vec可以安全地转换为其底层的PinnedVec<ConcurrentOption<T>>。注意,元素被包裹在ConcurrentOption中以提供线程安全的并发读写操作。 通过灵活的设置器进行扩展,允许写入任何位置,ConcurrentOrderedBag存在包含空隙的风险。into_inner调用提供了一些有用的指标,例如推送的元素数量是否与向量的最大索引匹配;但是,它不能保证袋无空隙。调用者需要负责通过一个unsafe调用来解包以获取底层的PinnedVec<T>

贡献

欢迎贡献!如果您发现错误,有疑问或认为可以改进,请打开一个问题或创建一个PR。

许可

此库受MIT许可。有关详细信息,请参阅LICENSE。

依赖

~575KB