#mutex #lock #non-blocking-lock

non_blocking_mutex

NonBlockingMutex is currently the fastest way to do expensive calculations under lock, or to do cheap calculations under lock under high concurrency/load/contention

19 stable releases

3.1.3 Aug 22, 2023
3.1.2 Aug 17, 2023
2.0.5 Aug 12, 2023
1.0.7 Aug 5, 2023

#307 in Concurrency

47 downloads per month
Used in 2 crates (via sharded-thread)

MIT/Apache

52KB
241 lines

NonBlockingMutex


Why you should use NonBlockingMutex

NonBlockingMutex is currently the fastest way to do expensive calculations under lock, or to do cheap calculations under lock under high concurrency/load/contention. See the benchmarks in the `benches` directory and run them with

cargo bench

Installation

cargo add non_blocking_mutex
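
Or declare the dependency in `Cargo.toml` by hand (version taken from the releases listed above; adjust as needed):

[dependencies]
non_blocking_mutex = "3.1"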

Examples

Optimized for 1 type of NonBlockingMutexTask

use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::available_parallelism;

// How many threads can physically access [NonBlockingMutex]
// simultaneously, needed for computing `shard_count` of [ShardedQueue],
// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);
// Will infer exact type and size(0) of this [FnOnce] and
// make sized [NonBlockingMutex] which takes only this exact [FnOnce]
// without ever requiring [Box]-ing or dynamic dispatch
non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state: MutexGuard<usize>| {
    *state += 1;
});

Easy to use with any FnOnce, but may Box the task and use dynamic dispatch when the lock can not be acquired on the first try

use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will work with any [FnOnce] and is easy to use,
    // but will [Box] tasks and use dynamic dispatch
    // when can't acquire lock on first try
    let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                state_snapshot_before_increment = *state;
                *state += 1;
                state_snapshot_after_increment = *state;
            });
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                state_snapshot_before_decrement = *state;
                *state -= 1;
                state_snapshot_after_decrement = *state;
            });
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
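
Note that the assertions are guaranteed to observe the final values: `scope` joins all spawned threads before returning, so both tasks have run to completion before the snapshots are read.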

Optimized for multiple known types of NonBlockingMutexTask which capture variables

use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use non_blocking_mutex::non_blocking_mutex_task::NonBlockingMutexTask;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will infer exact type and size of struct [Task] and
    // make sized [NonBlockingMutex] which takes only [Task]
    // without ever requiring [Box]-ing or dynamic dispatch
    let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_increment_and_store_snapshots(
                    &mut state_snapshot_before_increment,
                    &mut state_snapshot_after_increment,
                ),
            );
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_decrement_and_store_snapshots(
                    &mut state_snapshot_before_decrement,
                    &mut state_snapshot_after_decrement,
                ),
            );
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);

struct SnapshotsBeforeAndAfterChangeRefs<
    'snapshot_before_change_ref,
    'snapshot_after_change_ref,
> {
    /// Where to write snapshot of `State` before applying function to `State`
    snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
    /// Where to write snapshot of `State` after applying function to `State`
    snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
}

enum TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    IncrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
    DecrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
}

struct Task<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    task_type: TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref>,
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref>
    Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn new_increment_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::IncrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    // Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    // Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }

    fn new_decrement_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::DecrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    // Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    // Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref> NonBlockingMutexTask<usize>
    for Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn run_with_state(self, mut state: MutexGuard<usize>) {
        match self.task_type {
            TaskType::IncrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state += 1;
                *snapshot_after_change_ref = *state;
            }
            TaskType::DecrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state -= 1;
                *snapshot_after_change_ref = *state;
            }
        }
    }
}
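
Dispatching through a match on `TaskType` is what lets this variant avoid allocation: `Task` is a plain Sized struct, so the mutex can store scheduled tasks inline in its queue with no Box-ing and no dynamic dispatch, at the cost of hand-writing a task type for every kind of captured state.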

Why you may not want to use NonBlockingMutex

  • NonBlockingMutex forces the first thread which enters the synchronized block to run all the tasks, including ones scheduled while it runs (so it can run indefinitely if tasks keep being added indefinitely)

  • It is harder to continue execution on the same thread after the synchronized logic completes: you need to schedule the continuation on some scheduler to resume after whichever thread was first finishes your synchronized logic, or introduce other synchronization primitives, such as channels, WaitGroup-s, or similar (see the sketch after this list)

  • NonBlockingMutex performs worse than std::sync::Mutex when concurrency/load/contention is low

  • Like std::sync::Mutex, NonBlockingMutex does not guarantee the order of execution, only the atomicity of operations
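
To illustrate the second point, here is a minimal sketch (not from the crate's documentation; it assumes only the DynamicNonBlockingMutex API shown in the examples above) of the channel approach: the task reports its result through a std::sync::mpsc channel, and the scheduling thread waits on `recv` before continuing:

use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::sync::mpsc::channel;
use std::thread::available_parallelism;

let max_concurrent_thread_count = available_parallelism().unwrap().get();
let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0usize);

// Channel through which the task reports its result,
// since the task may end up running on a different thread
let (result_sender, result_receiver) = channel();

non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(move |mut state| {
    *state += 1;
    // Ignore send errors in this sketch; the receiver outlives the task here
    let _ = result_sender.send(*state);
});

// Continue on the calling thread only after the task has actually run
let state_after_increment = result_receiver.recv().unwrap();
assert_eq!(state_after_increment, 1);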

Benchmarks

See the benchmark logic in the `benches` directory, and run the benchmarks with

cargo bench

A single fast operation in a single thread, without contention

With only 1 thread and 1 operation, DynamicNonBlockingMutex performs only slightly worse than Mutex (because DynamicNonBlockingMutex does not have to first store the operation in a ShardedQueue in a loop), while NonBlockingMutex is the best performer among the synchronized options.

| Benchmark name                                  | Time      |
|-------------------------------------------------|-----------|
| Increment once without Mutex                    | 0.228 ns  |
| Increment once under NonBlockingMutex (static)  | 8.544 ns  |
| Increment once under NonBlockingMutex (dynamic) | 9.445 ns  |
| Increment once under Mutex                      | 8.851 ns  |
| Increment once under spin Mutex                 | 10.603 ns |

Emulating expensive operations by spinning N times under lock, with many threads and highest contention

Under high contention (in our case caused by long lock hold times, but it could also be caused by a higher CPU count), NonBlockingMutex starts to outperform std::sync::Mutex.

| Benchmark name                                      | Operation count per thread | Spin count under lock | Concurrent thread count | Average time |
|-----------------------------------------------------|----------------------------|-----------------------|-------------------------|--------------|
| Concurrent increment under static NonBlockingMutex  | 1_000                      | 0                     | 24                      | 2.313 ms     |
| Concurrent increment under DynamicNonBlockingMutex  | 1_000                      | 0                     | 24                      | 3.408 ms     |
| Concurrent increment under blocking Mutex           | 1_000                      | 0                     | 24                      | 1.072 ms     |
| Concurrent increment under spin Mutex               | 1_000                      | 0                     | 24                      | 4.376 ms     |
| Concurrent increment under static NonBlockingMutex  | 10_000                     | 0                     | 24                      | 23.969 ms    |
| Concurrent increment under DynamicNonBlockingMutex  | 10_000                     | 0                     | 24                      | 42.584 ms    |
| Concurrent increment under blocking Mutex           | 10_000                     | 0                     | 24                      | 14.960 ms    |
| Concurrent increment under spin Mutex               | 10_000                     | 0                     | 24                      | 94.658 ms    |
| Concurrent increment under static NonBlockingMutex  | 1_000                      | 10                    | 24                      | 9.457 ms     |
| Concurrent increment under DynamicNonBlockingMutex  | 1_000                      | 10                    | 24                      | 12.280 ms    |
| Concurrent increment under blocking Mutex           | 1_000                      | 10                    | 24                      | 8.345 ms     |
| Concurrent increment under spin Mutex               | 1_000                      | 10                    | 24                      | 34.977 ms    |
| Concurrent increment under static NonBlockingMutex  | 10_000                     | 10                    | 24                      | 58.297 ms    |
| Concurrent increment under DynamicNonBlockingMutex  | 10_000                     | 10                    | 24                      | 70.013 ms    |
| Concurrent increment under blocking Mutex           | 10_000                     | 10                    | 24                      | 84.143 ms    |
| Concurrent increment under spin Mutex               | 10_000                     | 10                    | 24                      | 349.070 ms   |
| Concurrent increment under static NonBlockingMutex  | 1_000                      | 100                   | 24                      | 39.569 ms    |
| Concurrent increment under DynamicNonBlockingMutex  | 1_000                      | 100                   | 24                      | 44.670 ms    |
| Concurrent increment under blocking Mutex           | 1_000                      | 100                   | 24                      | 47.335 ms    |
| Concurrent increment under spin Mutex               | 1_000                      | 100                   | 24                      | 117.570 ms   |
| Concurrent increment under static NonBlockingMutex  | 10_000                     | 100                   | 24                      | 358.480 ms   |
| Concurrent increment under DynamicNonBlockingMutex  | 10_000                     | 100                   | 24                      | 378.230 ms   |
| Concurrent increment under blocking Mutex           | 10_000                     | 100                   | 24                      | 801.090 ms   |
| Concurrent increment under spin Mutex               | 10_000                     | 100                   | 24                      | 1200.400 ms  |

Design notes

The first thread calls NonBlockingMutex::run_if_first_or_schedule_on_first, which atomically increments task_count. If the thread was first (it moved task_count from 0 to 1), it immediately runs the first task, then atomically decrements task_count and checks whether task_count dropped from 1 to 0. If task_count dropped from 1 to 0, there are no more tasks and the first thread can leave the execution loop; otherwise it takes the next task from task_queue, runs it, decrements task_count after the task completes, and repeats the check, running tasks until task_count drops from 1 to 0.

Threads which are not first also atomically increment task_count, see that they were not first, push the Box-ed task onto task_queue, and return without blocking.
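
For illustration only, a simplified model of this loop might look like the sketch below. The assumptions are loud: the real crate keeps State in an UnsafeCell (exclusive access is already guaranteed by the task_count protocol) and uses a lock-free ShardedQueue, while this sketch substitutes a Mutex-wrapped VecDeque and a hypothetical SketchNonBlockingMutex type for brevity:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Simplified model of the design described above, NOT the actual implementation
struct SketchNonBlockingMutex<State> {
    task_count: AtomicUsize,
    task_queue: Mutex<VecDeque<Box<dyn FnOnce(&mut State) + Send>>>,
    state: Mutex<State>,
}

impl<State> SketchNonBlockingMutex<State> {
    fn run_if_first_or_schedule_on_first(
        &self,
        task: impl FnOnce(&mut State) + Send + 'static,
    ) {
        // Atomically increment task_count; whoever moves it from 0 to 1 is "first"
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            // Not first: schedule the task and return without blocking
            self.task_queue.lock().unwrap().push_back(Box::new(task));
            return;
        }
        // First thread: run its own task immediately
        task(&mut *self.state.lock().unwrap());
        // Decrement task_count; if it dropped from 1 to 0, there are no more
        // tasks, otherwise drain tasks scheduled meanwhile by other threads
        while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
            let next_task = loop {
                // The increment by a non-first thread can be observed
                // before its push, so spin until the task appears
                if let Some(next_task) = self.task_queue.lock().unwrap().pop_front() {
                    break next_task;
                }
            };
            next_task(&mut *self.state.lock().unwrap());
        }
    }
}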

This design lets us avoid lock contention, but it adds the roughly constant cost of Box-ing the task, pushing the Box-ed task into the concurrent task_queue, and incrementing and decrementing task_count. So when lock contention is low, NonBlockingMutex performs worse than std::sync::Mutex, but when lock contention is high (because we have more CPUs, or because we want to run expensive computations under lock), NonBlockingMutex performs better than std::sync::Mutex.

Dependencies

~145KB