# NonBlockingMutex

## Why you should use NonBlockingMutex

`NonBlockingMutex` is currently the fastest way to do expensive calculations under lock, or to do cheap calculations under lock when concurrency/load/contention is very high - see the benchmarks in the `benches` directory and run them with `cargo bench`.
## Install

```sh
cargo add non_blocking_mutex
```
## Examples

### Optimized for 1 type of `NonBlockingMutexTask`
```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::available_parallelism;

// How many threads can physically access [NonBlockingMutex]
// simultaneously, needed for computing `shard_count` of [ShardedQueue],
// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

// Will infer exact type and size(0) of this [FnOnce] and
// make sized [NonBlockingMutex] which takes only this exact [FnOnce]
// without ever requiring [Box]-ing or dynamic dispatch
non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state: MutexGuard<usize>| {
    *state += 1;
});
```
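Because the mutex is monomorphized over a single task type, the same closure can be scheduled from many threads without any `Box`-ing. Here is a minimal multi-threaded sketch (assuming, as in the crate's benchmarks, that `NonBlockingMutex` can be shared across scoped threads; the loop reuses one non-capturing closure so every task has the same sized type):

```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::{available_parallelism, scope};

let max_concurrent_thread_count = available_parallelism().unwrap().get();
let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0usize);

scope(|scope| {
    for _ in 0..max_concurrent_thread_count {
        scope.spawn(|| {
            // Same closure expression on every thread => same task type,
            // so no [Box]-ing or dynamic dispatch is ever needed
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                |mut state: MutexGuard<usize>| {
                    *state += 1;
                },
            );
        });
    }
});
```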
### Easy to use with any `FnOnce`, but may `Box` tasks and use dynamic dispatch when the lock can't be acquired on first try
```rust
use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will work with any [FnOnce] and is easy to use,
    // but will [Box] tasks and use dynamic dispatch
    // when can't acquire lock on first try
    let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_increment) = *state;
                *state += 1;
                *(&mut state_snapshot_after_increment) = *state;
            });

            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_decrement) = *state;
                *state -= 1;
                *(&mut state_snapshot_after_decrement) = *state;
            });
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
```
### Optimized for multiple known types of `NonBlockingMutexTask` which capture variables
```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use non_blocking_mutex::non_blocking_mutex_task::NonBlockingMutexTask;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will infer exact type and size of struct [Task] and
    // make sized [NonBlockingMutex] which takes only [Task]
    // without ever requiring [Box]-ing or dynamic dispatch
    let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_increment_and_store_snapshots(
                    &mut state_snapshot_before_increment,
                    &mut state_snapshot_after_increment,
                ),
            );

            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_decrement_and_store_snapshots(
                    &mut state_snapshot_before_decrement,
                    &mut state_snapshot_after_decrement,
                ),
            );
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);

struct SnapshotsBeforeAndAfterChangeRefs<
    'snapshot_before_change_ref,
    'snapshot_after_change_ref,
> {
    /// Where to write snapshot of `State` before applying function to `State`
    snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
    /// Where to write snapshot of `State` after applying function to `State`
    snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
}

enum TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    IncrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
    DecrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
}

struct Task<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    task_type: TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref>,
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref>
    Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn new_increment_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::IncrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    snapshot_before_change_ref,
                    snapshot_after_change_ref,
                },
            ),
        }
    }

    fn new_decrement_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::DecrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    snapshot_before_change_ref,
                    snapshot_after_change_ref,
                },
            ),
        }
    }
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref> NonBlockingMutexTask<usize>
    for Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn run_with_state(self, mut state: MutexGuard<usize>) {
        match self.task_type {
            TaskType::IncrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state += 1;
                *snapshot_after_change_ref = *state;
            }
            TaskType::DecrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state -= 1;
                *snapshot_after_change_ref = *state;
            }
        }
    }
}
```
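Grouping both behaviours into a single `Task` enum is what keeps this variant fast: `NonBlockingMutex` stays monomorphized over one sized type, and dispatch between increment and decrement happens through a plain `match` instead of `Box`-ing and a vtable.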
## Why you may not want to use NonBlockingMutex

- `NonBlockingMutex` forces the first thread to enter the synchronized block to run all tasks, including ones added while it is running, so it can potentially run forever if tasks keep being added forever
- It is more difficult to continue execution on the same thread after the synchronized logic has run: to continue after the synchronized logic ends on another thread, you need to schedule the continuation on some scheduler, or introduce other synchronization primitives like channels, `WaitGroup`-s, or similar (a channel-based continuation is sketched below)
- `NonBlockingMutex` performs worse than `std::sync::Mutex` when concurrency/load/contention is low
- Like `std::sync::Mutex`, `NonBlockingMutex` doesn't guarantee the order of execution, only the atomicity of operations
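As a minimal sketch of the channel-based continuation mentioned above (the task may run on the calling thread or on whichever thread is currently draining the task queue, so the result is handed back through a channel rather than a return value):

```rust
use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::sync::mpsc::channel;
use std::thread::available_parallelism;

let max_concurrent_thread_count = available_parallelism().unwrap().get();
let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0usize);

let (sender, receiver) = channel();

non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(move |mut state| {
    *state += 1;
    // Hand the result back, since we can't return it to the caller directly
    let _ = sender.send(*state);
});

// Blocks until the task has actually run, wherever it ran
assert_eq!(receiver.recv().unwrap(), 1);
```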
## Benchmarks

See the benchmark logic in the `benches` directory and reproduce the results by running

```sh
cargo bench
```
### 1 fast operation in a single thread without contention

`DynamicNonBlockingMutex` runs only slightly slower than `Mutex` when there is just 1 thread and 1 operation (since `DynamicNonBlockingMutex` doesn't need to first store operations in a `ShardedQueue` in a loop), while `NonBlockingMutex` shows excellent performance among the synchronized options even with just 1 thread and 1 operation.
| Benchmark name | Time |
|---|---|
| increment once without mutex | 0.228 ns |
| increment once under NonBlockingMutex (static) | 8.544 ns |
| increment once under NonBlockingMutex (dynamic) | 9.445 ns |
| increment once under blocking Mutex | 8.851 ns |
| increment once under spin Mutex | 10.603 ns |
### Emulating an expensive operation by spinning N times under lock with many threads and highest contention

Under higher contention (in our case because of long locking, but it could also come from a higher CPU count), `NonBlockingMutex` starts performing better than `std::sync::Mutex`.
| Benchmark name | Operations per thread | Spins under lock | Concurrent threads | Average time |
|---|---|---|---|---|
| concurrent increment under static NonBlockingMutex | 1_000 | 0 | 24 | 2.313 ms |
| concurrent increment under dynamic NonBlockingMutex | 1_000 | 0 | 24 | 3.408 ms |
| concurrent increment under blocking Mutex | 1_000 | 0 | 24 | 1.072 ms |
| concurrent increment under spin Mutex | 1_000 | 0 | 24 | 4.376 ms |
| concurrent increment under static NonBlockingMutex | 10_000 | 0 | 24 | 23.969 ms |
| concurrent increment under dynamic NonBlockingMutex | 10_000 | 0 | 24 | 42.584 ms |
| concurrent increment under blocking Mutex | 10_000 | 0 | 24 | 14.960 ms |
| concurrent increment under spin Mutex | 10_000 | 0 | 24 | 94.658 ms |
| concurrent increment under static NonBlockingMutex | 1_000 | 10 | 24 | 9.457 ms |
| concurrent increment under dynamic NonBlockingMutex | 1_000 | 10 | 24 | 12.280 ms |
| concurrent increment under blocking Mutex | 1_000 | 10 | 24 | 8.345 ms |
| concurrent increment under spin Mutex | 1_000 | 10 | 24 | 34.977 ms |
| concurrent increment under static NonBlockingMutex | 10_000 | 10 | 24 | 58.297 ms |
| concurrent increment under dynamic NonBlockingMutex | 10_000 | 10 | 24 | 70.013 ms |
| concurrent increment under blocking Mutex | 10_000 | 10 | 24 | 84.143 ms |
| concurrent increment under spin Mutex | 10_000 | 10 | 24 | 349.070 ms |
| concurrent increment under static NonBlockingMutex | 1_000 | 100 | 24 | 39.569 ms |
| concurrent increment under dynamic NonBlockingMutex | 1_000 | 100 | 24 | 44.670 ms |
| concurrent increment under blocking Mutex | 1_000 | 100 | 24 | 47.335 ms |
| concurrent increment under spin Mutex | 1_000 | 100 | 24 | 117.570 ms |
| concurrent increment under static NonBlockingMutex | 10_000 | 100 | 24 | 358.480 ms |
| concurrent increment under dynamic NonBlockingMutex | 10_000 | 100 | 24 | 378.230 ms |
| concurrent increment under blocking Mutex | 10_000 | 100 | 24 | 801.090 ms |
| concurrent increment under spin Mutex | 10_000 | 100 | 24 | 1200.400 ms |
## Design notes

The first thread, when calling `NonBlockingMutex::run_if_first_or_schedule_on_first`, atomically increments `task_count`, and, if it was the first to increment `task_count` from 0 to 1, it immediately executes the first task, then atomically decrements `task_count` and checks whether `task_count` was decremented from 1 to 0. If `task_count` was decremented from 1 to 0, there are no more tasks and the first thread can leave the execution loop; otherwise the first thread acquires the next task from the `task_queue`, runs it, decrements `task_count` after it has run, and repeats the check whether `task_count` was decremented from 1 to 0, running tasks until none are left.

Threads that are not first also atomically increment `task_count`, see that they are not first, `Box` the task, and push the task `Box` into the `task_queue`.

This design lets us avoid lock contention, but adds the roughly constant overhead of `Box`-ing tasks, pushing task `Box`-es into the concurrent `task_queue`, and incrementing and decrementing `task_count`. So when lock contention is low, `NonBlockingMutex` performs worse than `std::sync::Mutex`, but when lock contention is high (because we have more CPUs, or because we want to do expensive calculations under lock), `NonBlockingMutex` performs better than `std::sync::Mutex`.
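A minimal sketch of that `task_count` protocol (not the crate's actual code: tasks are always `Box`-ed here for brevity, a `Mutex<VecDeque>` stands in for the lock-free `ShardedQueue`, and the shared state is elided):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Sketch {
    task_count: AtomicUsize,
    // Stand-in for the lock-free ShardedQueue the real crate uses
    task_queue: Mutex<VecDeque<Box<dyn FnOnce() + Send>>>,
}

impl Sketch {
    fn run_if_first_or_schedule_on_first(&self, task: Box<dyn FnOnce() + Send>) {
        // Atomically increment task_count; we are "first" only if we saw 0
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            // Not first: enqueue the task for the first thread to run
            self.task_queue.lock().unwrap().push_back(task);
            return;
        }
        // First thread: run our own task immediately, then drain the queue
        task();
        // Decrement after each task; leave the loop once the count hits 0
        while self.task_count.fetch_sub(1, Ordering::AcqRel) != 1 {
            let next_task = loop {
                // Producers increment task_count before pushing, so the
                // queue may briefly look empty; spin until the task arrives
                if let Some(task) = self.task_queue.lock().unwrap().pop_front() {
                    break task;
                }
            };
            next_task();
        }
    }
}
```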