29 stable releases
2.0.1 | Aug 17, 2023 |
---|---|
2.0.0 | Aug 15, 2023 |
1.0.27 | Aug 15, 2023 |
0.0.1 | |
#143 in Concurrency
36 downloads per month
Used in 3 crates (2 directly)
35KB
79 lines
ShardedQueue
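Why you should use ShardedQueue
ShardedQueue is currently the fastest collection that can be used under the highest concurrency and load among the most popular solutions, such as concurrent-queue; see the benchmarks in the benches directory and run them with:
cargo bench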
Installation
cargo add sharded_queue
Example
use std::thread::available_parallelism;
use sharded_queue::ShardedQueue;
// How many threads can physically access ShardedQueue
// simultaneously, needed for computing `shard_count`
let max_concurrent_thread_count = available_parallelism().unwrap().get();
let sharded_queue = ShardedQueue::new(max_concurrent_thread_count);
sharded_queue.push_back(1);
let item = sharded_queue.pop_front_or_spin_wait_item();
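Why you may not want to use ShardedQueue
- Unlike other concurrent queues, ShardedQueue does not guarantee FIFO order. It may look as if FIFO order is guaranteed, but it is not. Consider this scenario: several consumers or producers trigger long resizes of very large shards, all but the last one; enough time passes for those resizes to finish; then one consumer or producer triggers a long resize of the last shard; all other threads then start consuming or producing and eventually end up spinning on the last shard. There is no guarantee which of them acquires the spin lock first, so we cannot even guarantee that ShardedQueue::pop_front_or_spin_wait_item acquires the lock before ShardedQueue::push_back on the first attempt.
- ShardedQueue does not track its length, because the logic for incrementing and decrementing the length depends on the use case, as does the logic for the transition from 1 to 0 or back. In some cases, such as NonBlockingMutex, the action is not even added to the queue when the count reaches 1 but is run immediately on the same thread. The count may even be allowed to go negative to optimize hot paths, as in some schedulers, where restoring the count to a correct state is cheaper than enforcing that it never goes negative.
- ShardedQueue has few features: only the necessary methods, ShardedQueue::pop_front_or_spin_wait_item and ShardedQueue::push_back, are implemented.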
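Because the queue leaves length tracking to the caller, a counter has to live next to it. The following is a minimal sketch of one possible policy, assuming the caller wants a non-waiting pop; CountedQueue and try_pop are hypothetical names, and a Mutex around a VecDeque stands in for the real queue so that the example needs only the standard library.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical wrapper that keeps the length outside of the queue.
/// `inner` is a stand-in for a queue that does not track its own length.
pub struct CountedQueue<T> {
    len: AtomicUsize,
    inner: Mutex<VecDeque<T>>,
}

impl<T> CountedQueue<T> {
    pub fn new() -> Self {
        Self {
            len: AtomicUsize::new(0),
            inner: Mutex::new(VecDeque::new()),
        }
    }

    pub fn push(&self, item: T) {
        // Store the item first, then publish the new length, so that a
        // successful reservation in `try_pop` always finds an item.
        self.inner.lock().unwrap().push_back(item);
        self.len.fetch_add(1, Ordering::Release);
    }

    /// Returns `None` immediately when the counter says the queue is empty.
    pub fn try_pop(&self) -> Option<T> {
        // Reserve one item: decrement the counter only if it is non-zero.
        self.len
            .fetch_update(Ordering::Acquire, Ordering::Relaxed, |len| len.checked_sub(1))
            .ok()?;
        self.inner.lock().unwrap().pop_front()
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }
}

fn main() {
    let queue = CountedQueue::new();
    queue.push("first");
    queue.push("second");
    while let Some(item) = queue.try_pop() {
        println!("{item}, {} left", queue.len());
    }
}
```

Other policies, such as the one NonBlockingMutex uses (increment first, run the action inline when the previous count was 0), need a different ordering of the counter update and the push, which is why the counter is not built into the queue.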
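Benchmarks
ShardedQueue outperforms the other concurrent queues in the benchmarks below. See the benchmark logic in the benches directory and reproduce the results by running:
cargo bench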
Benchmark name | Operation count per thread | Concurrent thread count | Average time |
---|---|---|---|
sharded_queue_push_and_pop_concurrently | 1_000 | 24 | 1.1344 ms |
concurrent_queue_push_and_pop_concurrently | 1_000 | 24 | 4.8130 ms |
crossbeam_queue_push_and_pop_concurrently | 1_000 | 24 | 5.3154 ms |
queue_mutex_push_and_pop_concurrently | 1_000 | 24 | 6.4846 ms |
sharded_queue_push_and_pop_concurrently | 10_000 | 24 | 8.1651 ms |
concurrent_queue_push_and_pop_concurrently | 10_000 | 24 | 44.660 ms |
crossbeam_queue_push_and_pop_concurrently | 10_000 | 24 | 49.234 ms |
queue_mutex_push_and_pop_concurrently | 10_000 | 24 | 69.207 ms |
sharded_queue_push_and_pop_concurrently | 100_000 | 24 | 77.167 ms |
concurrent_queue_push_and_pop_concurrently | 100_000 | 24 | 445.88 ms |
crossbeam_queue_push_and_pop_concurrently | 100_000 | 24 | 434.00 ms |
queue_mutex_push_and_pop_concurrently | 100_000 | 24 | 476.59 ms |
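To make the table easier to compare, the relative speedup can be derived from the average times. The snippet below is only arithmetic over the numbers in the table; the speedup helper is a name introduced here for illustration.

```rust
/// How many times faster `candidate_ms` is than `baseline_ms`.
fn speedup(baseline_ms: f64, candidate_ms: f64) -> f64 {
    baseline_ms / candidate_ms
}

fn main() {
    // (operations per thread, sharded_queue, concurrent_queue, crossbeam_queue, queue_mutex),
    // average times in milliseconds, copied from the table above.
    let rows = [
        (1_000, 1.1344, 4.8130, 5.3154, 6.4846),
        (10_000, 8.1651, 44.660, 49.234, 69.207),
        (100_000, 77.167, 445.88, 434.00, 476.59),
    ];
    for (operations, sharded, concurrent, crossbeam, mutex) in rows {
        println!(
            "{operations} ops/thread: {:.1}x vs concurrent_queue, {:.1}x vs crossbeam_queue, {:.1}x vs queue_mutex",
            speedup(concurrent, sharded),
            speedup(crossbeam, sharded),
            speedup(mutex, sharded),
        );
    }
}
```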
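Design explanation
ShardedQueue is designed to be used in some schedulers and in NonBlockingMutex as the most efficient collection under the highest concurrency and load. A concurrent stack cannot outperform it: a queue spreads pop and push contention between front and back, whereas a stack both pops from back and pushes to back, so it has double the contention of a queue, while the number of atomic increments per pop or push is the same as in a queue.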
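ShardedQueue uses an array of queues (shards), each protected by its own Mutex. When a pop or push happens, it atomically increments head_index or tail_index and computes the shard index for the current operation by applying the modulo operation to head_index or tail_index.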
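The paragraph above can be modeled in a few lines. This is a simplified sketch, not the crate's actual implementation: ModelShardedQueue is a hypothetical type, std::sync::Mutex is assumed for the per-shard lock, and the shard count is rounded up to a power of two so that the index can be computed with a bit mask.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Simplified model of the design described above (assumption, not the crate's code).
pub struct ModelShardedQueue<T> {
    modulo_mask: usize,
    head_index: AtomicUsize,
    tail_index: AtomicUsize,
    shards: Vec<Mutex<VecDeque<T>>>,
}

impl<T> ModelShardedQueue<T> {
    pub fn new(max_concurrent_thread_count: usize) -> Self {
        // Round up so that `index & modulo_mask == index % shard_count`.
        let shard_count = max_concurrent_thread_count.max(1).next_power_of_two();
        Self {
            modulo_mask: shard_count - 1,
            head_index: AtomicUsize::new(0),
            tail_index: AtomicUsize::new(0),
            shards: (0..shard_count).map(|_| Mutex::new(VecDeque::new())).collect(),
        }
    }

    pub fn shard_count(&self) -> usize {
        self.shards.len()
    }

    pub fn push_back(&self, item: T) {
        // Each push takes the next tail slot and maps it to a shard.
        let shard_index = self.tail_index.fetch_add(1, Ordering::Relaxed) & self.modulo_mask;
        self.shards[shard_index].lock().unwrap().push_back(item);
    }

    /// Waits until the chosen shard holds an item. The caller must make sure
    /// that a matching `push_back` has happened or will happen.
    pub fn pop_front_or_spin_wait_item(&self) -> T {
        // Each pop takes the next head slot and maps it to a shard.
        let shard_index = self.head_index.fetch_add(1, Ordering::Relaxed) & self.modulo_mask;
        loop {
            if let Some(item) = self.shards[shard_index].lock().unwrap().pop_front() {
                return item;
            }
            thread::yield_now();
        }
    }
}

fn main() {
    let queue = ModelShardedQueue::new(4);
    thread::scope(|scope| {
        for thread_index in 0..4 {
            let queue = &queue;
            scope.spawn(move || {
                for i in 0..100 {
                    queue.push_back(thread_index * 100 + i);
                }
            });
        }
    });
    // Every push has completed, so exactly 400 pops can be served.
    let sum: usize = (0..400).map(|_| queue.pop_front_or_spin_wait_item()).sum();
    println!("sum of popped items: {sum}");
}
```

Since consecutive operations land on consecutive shards, two threads only meet on the same lock when their operation numbers are congruent modulo the shard count.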
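The modulo operation is optimized using the identity
x % 2^n == x & (2^n - 1)
so, as long as the number of queues (shards) is a power of two, the modulo can be computed very efficiently with
operation_number % shard_count == operation_number & (shard_count - 1)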
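The identity is easy to check directly. In the sketch below, shard_count_for and shard_index are hypothetical helper names; rounding up with usize::next_power_of_two is one way to satisfy the power-of-two requirement and is an assumption, not necessarily what the crate does.

```rust
/// Hypothetical helper: smallest power of two that is >= the thread count.
fn shard_count_for(max_concurrent_thread_count: usize) -> usize {
    max_concurrent_thread_count.max(1).next_power_of_two()
}

/// Shard index via bit mask; valid only when `shard_count` is a power of two.
fn shard_index(operation_number: usize, shard_count: usize) -> usize {
    debug_assert!(shard_count.is_power_of_two());
    operation_number & (shard_count - 1)
}

fn main() {
    // 24 threads are rounded up to 32 shards.
    let shard_count = shard_count_for(24);
    for operation_number in [0usize, 1, 31, 32, 33, 1_000_003, usize::MAX] {
        assert_eq!(
            shard_index(operation_number, shard_count),
            operation_number % shard_count
        );
    }
    // With a count that is not a power of two the mask gives a different result.
    assert_ne!(37 & (24 - 1), 37 % 24);
    println!("mask and modulo agree for shard_count = {shard_count}");
}
```

A power-of-two shard count also keeps the shard sequence continuous when the atomic counter wraps around at usize::MAX, because the counter's range is itself a multiple of the shard count.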
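As long as the number of queues (shards) is a power of two and is greater than or equal to the number of CPUs, and the CPUs spend about the same time in push/pop (which they do, since both are amortized O(1)), multiple CPUs physically cannot access the same shard simultaneously, and we get the best possible performance. Synchronizing the underlying non-concurrent queue costs only:
- 1 additional atomic increment per push or pop (incrementing head_index or tail_index)
- 1 additional compare_and_swap and 1 atomic store (uncontended Mutex acquire and release)
- 1 cheap bit operation (to get the modulo)
- 1 lookup by index in the list of queues (shards)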
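The second item in the list, an uncontended acquire and release, can be illustrated with a minimal spin lock around a non-concurrent VecDeque. This is a sketch under assumptions: SpinLockedShard is a hypothetical type, and the crate's own lock may differ in details such as memory orderings or panic handling.

```rust
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Hypothetical single shard: a plain queue guarded by a one-flag spin lock.
pub struct SpinLockedShard<T> {
    is_locked: AtomicBool,
    queue: UnsafeCell<VecDeque<T>>,
}

// Safety: `queue` is only accessed while `is_locked` is held by the accessor.
unsafe impl<T: Send> Sync for SpinLockedShard<T> {}

impl<T> SpinLockedShard<T> {
    pub fn new() -> Self {
        Self {
            is_locked: AtomicBool::new(false),
            queue: UnsafeCell::new(VecDeque::new()),
        }
    }

    fn with_lock<R>(&self, action: impl FnOnce(&mut VecDeque<T>) -> R) -> R {
        // Acquire: exactly one compare-and-swap when the lock is uncontended.
        while self
            .is_locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            hint::spin_loop();
        }
        // Note: if `action` panics the lock stays held; acceptable for a sketch.
        let result = action(unsafe { &mut *self.queue.get() });
        // Release: one atomic store.
        self.is_locked.store(false, Ordering::Release);
        result
    }

    pub fn push_back(&self, item: T) {
        self.with_lock(|queue| queue.push_back(item))
    }

    pub fn pop_front(&self) -> Option<T> {
        self.with_lock(|queue| queue.pop_front())
    }
}

fn main() {
    let shard = SpinLockedShard::new();
    thread::scope(|scope| {
        for thread_index in 0..4 {
            let shard = &shard;
            scope.spawn(move || {
                for i in 0..1_000 {
                    shard.push_back(thread_index * 1_000 + i);
                }
            });
        }
    });
    let mut popped = 0;
    while shard.pop_front().is_some() {
        popped += 1;
    }
    assert_eq!(popped, 4_000);
}
```

In this example all four threads deliberately share one shard, so the lock is contended; with one shard per CPU, as described above, the compare-and-swap is expected to succeed on the first attempt most of the time.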
Complex example
use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct NonBlockingMutex<'captured_variables, State: ?Sized> {
    task_count: AtomicUsize,
    task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
    unsafe_state: UnsafeCell<State>,
}
/// [NonBlockingMutex] is needed to run actions atomically without thread blocking, or context
/// switch, or spin lock contention, or rescheduling on some scheduler
///
/// Notice that it uses [ShardedQueue] which doesn't guarantee order of retrieval, hence
/// [NonBlockingMutex] doesn't guarantee order of execution too, even of already added
/// items
impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
    pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: ShardedQueue::new(max_concurrent_thread_count),
            unsafe_state: UnsafeCell::new(state),
        }
    }
    /// Please don't forget that order of execution is not guaranteed. Atomicity of operations is guaranteed,
    /// but order can be random
    pub fn run_if_first_or_schedule_on_first(
        &self,
        run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
    ) {
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            self.task_queue.push_back(Box::new(run_with_state));
        } else {
            // If we acquired first lock, run should be executed immediately and run loop started
            run_with_state(unsafe { MutexGuard::new(self) });
            // Note that if `fetch_sub` != 1
            // => some thread entered first if block in method
            // => ShardedQueue::push_back is guaranteed to be called
            // => ShardedQueue::pop_front_or_spin_wait_item will not deadlock while spins until it gets item
            //
            // Notice that we run action first, and only then decrement count
            // with releasing(pushing) memory changes, even if it looks otherwise
            while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
                self.task_queue.pop_front_or_spin_wait_item()(unsafe { MutexGuard::new(self) });
            }
        }
    }
}
/// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// and [std::sync::MutexGuard]
///
/// these are the only places where `T: Send` matters; all other
/// functionality works fine on a single thread.
unsafe impl<'captured_variables, State: ?Sized + Send> Send
    for NonBlockingMutex<'captured_variables, State>
{
}
unsafe impl<'captured_variables, State: ?Sized + Send> Sync
    for NonBlockingMutex<'captured_variables, State>
{
}
/// Code was mostly taken from [std::sync::MutexGuard], it is expected to protect [State]
/// from moving out of synchronized loop
pub struct MutexGuard<
    'captured_variables,
    'non_blocking_mutex_ref,
    State: ?Sized + 'non_blocking_mutex_ref,
> {
    non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    /// Adding it to ensure that [MutexGuard] implements [Send] and [Sync] in same cases
    /// as [std::sync::MutexGuard] and protects [State] from going out of synchronized
    /// execution loop
    ///
    /// todo remove when this error is no longer actual
    ///  negative trait bounds are not yet fully implemented; use marker types for now [E0658]
    _phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
}
// todo uncomment when this error is no longer actual
// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }
unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Sync> Sync
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
}
impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
    MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    unsafe fn new(
        non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    ) -> Self {
        Self {
            non_blocking_mutex,
            _phantom_unsend: PhantomData,
        }
    }
}
impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> Deref
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    type Target = State;
    fn deref(&self) -> &State {
        unsafe { &*self.non_blocking_mutex.unsafe_state.get() }
    }
}
impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> DerefMut
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn deref_mut(&mut self) -> &mut State {
        unsafe { &mut *self.non_blocking_mutex.unsafe_state.get() }
    }
}
impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Debug> Debug
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&**self, f)
    }
}
impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Display> Display
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        (**self).fmt(f)
    }
}
Dependencies
~115KB