#table #primitive #reader #high #active #read-lock

active_standby

高并发读取的并发原语

28 个版本 (2 个稳定版)

2.0.0 2022年5月31日
1.0.0 2022年1月3日
0.9.2 2021年10月15日
0.8.4 2021年6月27日
0.4.3 2021年3月13日

#237并发

Download history 7/week @ 2024-04-01

每月58 次下载

MIT/Apache

160KB
3.5K SLoC

一个用于高并发读取的库。

该库以内部持有的两个(相同的)表命名

  • 活动 - 这是所有读取者查看的表。这个表永远不会被写锁,因此读取者永远不会遇到竞争。
  • 备用 - 这是写者修改的表。由于读取者在调用 .read() 时会移动到活动表,因此写者获取此表时应面临最小的竞争。

使用此 crate 有两种方式

  1. 直接与 AsLock/AsLockHandle 交互。这更灵活,因为用户可以传递任何他们想要的 struct 并按自己的选择修改它。但是,所有更新都需要通过传递函数而不是通过可变方法(UpdateTables trait)来完成。
  2. 使用由原语构建的集合,但提供了类似于 RwLock<T> 的 API;写者可以直接调用方法,而无需提供 mutator 函数。

有两种风味/模块

  1. 无锁 - 这种变体在性能提升和改变 API 以使其不那么像 RwLock 之间做出权衡。这围绕着 AsLockHandle,从概念上讲类似于 Arc<RwLock>(每个线程/任务需要单独的 AsLockHandle)。
  2. 同步 - 这围绕着使用 AsLock,它旨在感觉像 RwLock。主要区别在于,由于需要保持它们相同,因此您仍然无法直接访问底层表。

最小化竞争的代价是

  1. 内存 - 内部存在用户创建的底层类型的两个副本。这是为了允许总有一个表供读取者无竞争地访问。
  2. 处理器 - 作者必须将所有更新应用两次,一次应用到每个表中。由于读者使用active_table,因此写入者的锁争用应该小于普通RwLock,因此写入时间本身可能会降低。

示例

3种使用模式的示例:构建自己的包装器、使用预构建的集合和使用原语。这些都可以用同步和无锁的方式完成。

use std::thread::sleep;
use std::time::Duration;
use std::sync::Arc;

// Create wrapper class so that users can interact with the active_standby
// struct via a RwLock-like interface. See the implementation of the
// collections for more examples.
mod wrapper {
    use active_standby::UpdateTables;

    active_standby::generate_lockless_aslockhandle!(i32);

    struct AddOne {}

    impl<'a> UpdateTables<'a, i32, ()> for AddOne {
        fn apply_first(&mut self, table: &'a mut i32) {
            *table = *table + 1;
        }
        fn apply_second(mut self, table: &mut i32) {
            self.apply_first(table);
        }
    }

    // Client's must implement the mutable interface that they want to
    // offer users. Non mutable functions are automatic via Deref.
    impl<'w> AsLockWriteGuard<'w> {
        pub fn add_one(&mut self) {
            self.guard.update_tables(AddOne {})
        }
    }
}

pub fn run_wrapper() {
    let table = wrapper::AsLockHandle::new(0);
    let table2 = table.clone();

    let handle = std::thread::spawn(move || {
        while *table2.read() != 1 {
            sleep(Duration::from_micros(100));
        }
    });

    table.write().add_one();
    handle.join();
}

// Use a premade collection which wraps `AsLock<Vec<T>>`, to provide an
// interface akin to `RwLock<Vec<T>>`.
pub fn run_collection() {
    use active_standby::sync::collections::AsVec;

    let table = Arc::new(AsVec::default());
    let table2 = Arc::clone(&table);

    let handle = std::thread::spawn(move || {
        while *table2.read() != vec![1] {
            sleep(Duration::from_micros(100));
        }
    });

    table.write().push(1);
    handle.join();
}

// Use the raw AsLock interface to update the underlying data.
pub fn run_primitive() {
    use active_standby::sync::AsLock;

    // If the entries in your table are large, you may want to hold only
    // 1 copy shared by both tables. This is safe so long as you never
    // mutate the shared data; only remove and replace it in the table.
    let table = Arc::new(AsLock::new(vec![Arc::new(1)]));
    let table2 = Arc::clone(&table);

    let handle = std::thread::spawn(move || {
        while *table2.read() != vec![Arc::new(2)] {
            sleep(Duration::from_micros(100));
        }
    });

    table.write().update_tables_closure(|table| {
        // Update the entry in the table, not the shared value behind the
        // Arc.
        table[0] = Arc::new(2);
    });
    handle.join();
}

fn main() {
    run_wrapper();
    run_collection();
    run_primitive();
}

测试

active_standby附带了一些测试(例如,见tests/tests_script.sh)

单元测试

基准测试

loom

LLVM Sanitizers

Miri

Rudra

依赖项

~0.4-27MB
~341K SLoC