#options #lock-free #atomic #data-structures #self-reference

orx-concurrent-option

ConcurrentOption 是一个无锁并发读写选项类型

2 个稳定版本

1.1.0 2024年8月13日
1.0.0 2024年8月12日

678并发 中排名

Download history 230/week @ 2024-08-09 42/week @ 2024-08-16

272 每月下载量
用于 orx-concurrent-vec

MIT 许可证

115KB
822

orx-concurrent-option

orx-concurrent-option crate orx-concurrent-option documentation

ConcurrentOption 是一个线程安全且无锁的读写选项类型。

ConcurrentOption 方法分组

ConcurrentOption 方法基于标准的 Option,只在细微之处有所不同,以更好地适应并发程序。

例如,代替 fn map<U, F>(self, f: F) -> Option<U>

  • ConcurrentOption 实现 fn map<U, F>(&self, f: F) -> Option<U>,它专门用于映射引用,同时保证没有数据竞争。
  • 注意,当拥有所有权时,可以通过 maybe.exclusive_take().map(f) 容易地获得先前结果。

⬤ 需要self或&mut self的方法

这些方法通过借用检查器保证安全,并且它们的行为与原始版本类似。

为了将它们与线程安全的版本区分开来,需要&mut self的方法以 exclusive_ 为前缀。

此类方法包括 unwrapexpectexclusive_mutexclusive_take

⬤ 调用方法的安全版本

提供了线程安全的变体,可以使用共享的 &self 引用安全地调用。

一些示例包括 taketake_ifreplace 等。

这些方法保证在突变期间没有其他突变或读取操作。

⬤ 读取方法的线程安全版本

存在访问底层值以计算结果的线程安全方法变体。

一些示例包括 is_somemapand_then 等。

这些方法保证在读取数据时没有突变。

⬤ 部分线程安全的方法

返回共享引用 &T 或可变引用 &mut T 到可选值底层值的这些方法被标记为 unsafe

这些方法内部保证在不存在数据竞争的情况下创建有效的引用。从这个意义上讲,它们是线程安全的。

另一方面,由于它们返回引用,引用会泄漏到类型外部。后续的突变可能导致数据竞争,从而引起未定义行为。

一些示例方法包括 as_refas_derefinsert 等。

⬤ 允许手动控制并发的函数

ConcurrentOption 还公开了接受一个 std::sync::atomic::Ordering 并将控制权交给调用者的方法。这些方法除了状态外都带有 with_order 后缀。

这样的方法包括 stateas_ref_with_orderget_raw_with_orderclone_with_order 等。

示例

并发读取 & 写入

以下示例演示了使用多个读取和写入线程在安全读取底层数据的同时轻松突变选项的状态的便捷性。

use orx_concurrent_option::*;
use std::time::Duration;

enum MutOperation {
    InitializeIfNone,
    UpdateIfSome,
    Replace,
    Take,
    TakeIf,
}

impl MutOperation {
    fn new(i: usize) -> Self {
        match i % 5 {
            0 => Self::InitializeIfNone,
            1 => Self::UpdateIfSome,
            2 => Self::Replace,
            3 => Self::Take,
            _ => Self::TakeIf,
        }
    }
}

let num_readers = 8;
let num_writers = 8;

let values = vec![ConcurrentOption::<String>::none(); 8];

std::thread::scope(|s| {
    for _ in 0..num_readers {
        s.spawn(|| {
            for _ in 0..100 {
                std::thread::sleep(Duration::from_millis(100));
                let mut num_chars = 0;
                for maybe in &values {
                    // concurrently access the value
                    num_chars += maybe.map(|x| x.len()).unwrap_or(0);
                }
                assert!(num_chars <= 100);
            }
        });
    }

    for _ in 0..num_writers {
        s.spawn(|| {
            for i in 0..100 {
                std::thread::sleep(Duration::from_millis(100));
                let e = i % values.len();

                // concurrently update the option
                match MutOperation::new(i) {
                    MutOperation::InitializeIfNone => {
                        values[e].initialize_if_none(e.to_string());
                    }
                    MutOperation::UpdateIfSome => {
                        values[e].update_if_some(|x| *x = format!("{}!", x));
                    }
                    MutOperation::Replace => {
                        values[e].replace(e.to_string());
                    }
                    MutOperation::Take => {
                        _ = values[e].take();
                    }
                    MutOperation::TakeIf => _ = values[e].take_if(|x| x.len() < 2),
                }
                let e = i % values.len();
                _ = values[e].initialize_if_none(e.to_string());
            }
        });
    }
})

并发初始化 & 读取

选项的一个常见用例是模拟延迟初始化;而不是并发突变。换句话说,我们从一个 None 变体开始,在某个时刻我们接收到值并将我们的选项转换为 Some(value),之后它将保持为 Some(value) 直到其生命周期结束。

此场景演示了我们可以在安全地将引用泄漏到可选之外的情况。

  • ConcurrentOption 提供的所有引用在获取时都是有效的且没有数据竞争。换句话说,我们只能在值初始化后获取引用;即,选项变为 Some(value)。
  • 由于我们将在初始化后永远不会突变选项,因此我们可以安全地保持对其的引用而不用担心数据竞争。
    • 然而,进一步的突变不是我们作为调用者的承诺和责任。ConcurrentOption 无法控制泄漏的引用;因此,获取引用是通过不安全的 as_ref 方法实现的。

对于此场景,我们可以使用两个匹配的方法

  • initialize_if_none 是一个线程安全的方法,用于将选项的值初始化为给定的值。在 Some 变体上调用此方法是安全的,它不会有任何影响。此外,它确保在值完全初始化之前没有读者可以访问该值。
  • as_ref 方法返回底层值的引用,如果选项是 Some 变体。否则,如果值尚未初始化,我们将安全地接收到 None。注意,如果我们想手动模拟访问顺序,我们也可以使用 as_ref_with_orderAcquireSeqCst 顺序配合使用。
use orx_concurrent_option::*;

fn reader(maybe: &ConcurrentOption<String>) {
    let mut is_none_at_least_once = false;
    let mut is_seven_at_least_once = false;
    for _ in 0..100 {
        std::thread::sleep(std::time::Duration::from_millis(100));

        let read = unsafe { maybe.as_ref() };
        let is_none = read.is_none();
        let is_seven = read == Some(&7.to_string());

        assert!(is_none || is_seven);

        is_none_at_least_once |= is_none;
        is_seven_at_least_once |= is_seven;
    }
    assert!(is_none_at_least_once && is_seven_at_least_once);
}

fn initializer(maybe: &ConcurrentOption<String>) {
    for _ in 0..50 {
        // wait for a while to simulate a delay
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    let _ = maybe.initialize_if_none(7.to_string());

    for _ in 0..50 {
        // it is safe to call `initialize_if_none` on Some variant
        // it will do nothing
        let inserted = maybe.initialize_if_none(1_000_000.to_string());
        assert!(!inserted);
    }
}

let num_readers = 8;
let num_writers = 8;

let maybe = ConcurrentOption::<String>::none();
let maybe_ref = &maybe;

std::thread::scope(|s| {
    for _ in 0..num_readers {
        s.spawn(|| reader(maybe_ref));
    }
    for _ in 0..num_writers {
        s.spawn(|| initializer(maybe_ref));
    }
});

assert_eq!(maybe.unwrap(), 7.to_string());

贡献

欢迎贡献!如果您注意到错误,有任何问题或认为某些内容可以改进,请打开 问题 或创建一个 PR。

许可

此库受 MIT 许可证许可。有关详细信息,请参阅 LICENSE。

无运行时依赖