#queue #mpmc #concurrent-queue

sharded_queue

ShardedQueue is currently the fastest concurrent collection usable under the highest concurrency and load

ShardedQueue

Why you should use ShardedQueue

ShardedQueue is currently the fastest concurrent collection under the highest concurrency and load among the most popular solutions, such as concurrent-queue - see the benchmarks in the benches directory and run them with:

cargo bench

Installation

cargo add sharded_queue

Example

use sharded_queue::ShardedQueue;
use std::thread::available_parallelism;

fn main() {
    // How many threads can physically access [ShardedQueue]
    // simultaneously, needed for computing `shard_count`
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    let sharded_queue = ShardedQueue::new(max_concurrent_thread_count);

    sharded_queue.push_back(1);
    let item = sharded_queue.pop_front_or_spin_wait_item();
    assert_eq!(item, 1);
}

Why you may not want to use ShardedQueue

  • Unlike other concurrent queues, FIFO order is not guaranteed. While it may look like FIFO order is guaranteed, it actually is not, because the following situation is possible: multiple consumers or producers trigger a long resize of all very large shards except the last one, then enough time passes for those resizes to finish, then 1 consumer or producer triggers a long resize of the last shard, then all other threads start consuming or producing and eventually begin spinning around the last shard, and there is no guarantee which thread will acquire the spin lock first, so we can't even guarantee that ShardedQueue::pop_front_or_spin_wait_item will acquire the lock before ShardedQueue::push_back on the first try

  • ShardedQueue doesn't track length, because the logic for incrementing/decrementing it may depend on the use case: what should happen when the length goes from 1 to 0 or vice versa (in some cases, such as NonBlockingMutex in the complex example below, we don't even add an operation to the queue when the count reaches 1, but run it immediately on the same thread), and whether it may even go negative (to optimize some hot paths, e.g. in some schedulers, because restoring the count to a correct state is cheaper than forbidding it to ever go negative)

  • ShardedQueue doesn't have many features, implementing only the necessary methods ShardedQueue::pop_front_or_spin_wait_item and ShardedQueue::push_back

Benchmarks

ShardedQueue outperforms other concurrent queues. See the benchmark logic in the benches directory and reproduce the results by running:

cargo bench

| Benchmark name | Operations per thread | Concurrent threads | Average time |
|---|---|---|---|
| sharded_queue_push_and_pop_concurrently | 1_000 | 24 | 1.1344 ms |
| concurrent_queue_push_and_pop_concurrently | 1_000 | 24 | 4.8130 ms |
| crossbeam_queue_push_and_pop_concurrently | 1_000 | 24 | 5.3154 ms |
| queue_mutex_push_and_pop_concurrently | 1_000 | 24 | 6.4846 ms |
| sharded_queue_push_and_pop_concurrently | 10_000 | 24 | 8.1651 ms |
| concurrent_queue_push_and_pop_concurrently | 10_000 | 24 | 44.660 ms |
| crossbeam_queue_push_and_pop_concurrently | 10_000 | 24 | 49.234 ms |
| queue_mutex_push_and_pop_concurrently | 10_000 | 24 | 69.207 ms |
| sharded_queue_push_and_pop_concurrently | 100_000 | 24 | 77.167 ms |
| concurrent_queue_push_and_pop_concurrently | 100_000 | 24 | 445.88 ms |
| crossbeam_queue_push_and_pop_concurrently | 100_000 | 24 | 434.00 ms |
| queue_mutex_push_and_pop_concurrently | 100_000 | 24 | 476.59 ms |

Design notes

ShardedQueue is designed to be the most efficient collection under the highest concurrency and load for use in some schedulers and in NonBlockingMutex (a concurrent stack can't outperform it, because, unlike a queue, which spreads pop and push contention between its front and back, a stack both pops and pushes at the back, doubling the contention compared to a queue, while the number of atomic increments per pop or push is the same as in a queue).

ShardedQueue uses an array of queues (shards), each protected by its own Mutex. On every pop or push it atomically increments head_index or tail_index respectively, and computes the shard index for the current operation by applying the modulo operation to head_index or tail_index.

The modulo operation is optimized, because

x % 2^n == x & (2^n - 1)

so, as long as the number of queues (shards) is a power of 2, we can compute the modulo very efficiently using the formula

operation_number % shard_count == operation_number & (shard_count - 1)
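
A minimal sketch of this optimization (shard_index here is a hypothetical helper for illustration, not part of the crate's API):

fn shard_index(operation_number: usize, shard_count: usize) -> usize {
    // `shard_count - 1` is an all-ones bitmask only when
    // `shard_count` is a power of 2
    debug_assert!(shard_count.is_power_of_two());
    operation_number & (shard_count - 1)
}

fn main() {
    // The bitmask result matches plain modulo for a power-of-2 shard count
    assert_eq!(shard_index(13, 8), 13 % 8);
}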

As long as the number of queues (shards) is a power of 2 and is greater than or equal to the number of CPUs, and CPUs spend about the same time per push or pop (they do, since both are amortized O(1)), multiple CPUs physically can't access the same shard simultaneously, and we get optimal performance. The cost of synchronizing the underlying non-concurrent queues is only:

  • 1 additional atomic increment per push or pop (incrementing head_index or tail_index)
  • 1 additional compare_and_swap and 1 atomic store (an uncontended Mutex acquire and release)
  • 1 cheap bitwise operation (computing the modulo)
  • 1 get by index from the list of queues (shards)
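
To make these costs concrete, here is a hedged sketch of the push path described above (ShardedQueueSketch is an illustration of the design, not the crate's actual source):

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct ShardedQueueSketch<Item> {
    tail_index: AtomicUsize,
    // Shard count is a power of 2, each shard behind its own Mutex
    shards: Vec<Mutex<VecDeque<Item>>>,
}

impl<Item> ShardedQueueSketch<Item> {
    fn push_back(&self, item: Item) {
        // 1 additional atomic increment, to pick a shard for this operation
        let operation_number = self.tail_index.fetch_add(1, Ordering::Relaxed);
        // 1 cheap bitwise operation instead of `%`
        let shard_index = operation_number & (self.shards.len() - 1);
        // 1 get by index, then an (ideally uncontended) Mutex acquire and release
        self.shards[shard_index].lock().unwrap().push_back(item);
    }
}

fn main() {
    let queue = ShardedQueueSketch {
        tail_index: AtomicUsize::new(0),
        shards: (0..8).map(|_| Mutex::new(VecDeque::new())).collect(),
    };
    queue.push_back(1);
}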

Complex example

use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use std::fmt::{Debug, Display, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct NonBlockingMutex<'captured_variables, State: ?Sized> {
    task_count: AtomicUsize,
    task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
    unsafe_state: UnsafeCell<State>,
}

/// [NonBlockingMutex] is needed to run actions atomically without thread blocking, or context
/// switch, or spin lock contention, or rescheduling on some scheduler
///
/// Notice that it uses [ShardedQueue], which doesn't guarantee order of retrieval, hence
/// [NonBlockingMutex] doesn't guarantee order of execution either, even of already added
/// items
impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
    pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: ShardedQueue::new(max_concurrent_thread_count),
            unsafe_state: UnsafeCell::new(state),
        }
    }

    /// Please don't forget that order of execution is not guaranteed. Atomicity of operations is guaranteed,
    /// but order can be random
    pub fn run_if_first_or_schedule_on_first(
        &self,
        run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
    ) {
        if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
            self.task_queue.push_back(Box::new(run_with_state));
        } else {
            // If we acquired first lock, run should be executed immediately and run loop started
            run_with_state(unsafe { MutexGuard::new(self) });
            // Note that if `fetch_sub` != 1
            // => some thread entered the first if block in this method
            // => [ShardedQueue::push_back] is guaranteed to be called
            // => [ShardedQueue::pop_front_or_spin_wait_item] will not deadlock while it spins until it gets an item
            //
            // Notice that we run the action first, and only then decrement the count
            // with releasing (pushing) memory changes, even if it looks otherwise
            while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
                self.task_queue.pop_front_or_spin_wait_item()(unsafe { MutexGuard::new(self) });
            }
        }
    }
}

/// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// and [std::sync::MutexGuard]
///
/// these are the only places where `State: Send` matters; all other
/// functionality works fine on a single thread.
unsafe impl<'captured_variables, State: ?Sized + Send> Send
    for NonBlockingMutex<'captured_variables, State>
{
}
unsafe impl<'captured_variables, State: ?Sized + Send> Sync
    for NonBlockingMutex<'captured_variables, State>
{
}

/// Code was mostly taken from [std::sync::MutexGuard], it is expected to protect [State]
/// from moving out of synchronized loop
pub struct MutexGuard<
    'captured_variables,
    'non_blocking_mutex_ref,
    State: ?Sized + 'non_blocking_mutex_ref,
> {
    non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    /// Adding it to ensure that [MutexGuard] implements [Send] and [Sync] in same cases
    /// as [std::sync::MutexGuard] and protects [State] from going out of synchronized
    /// execution loop
    ///
    /// todo remove when this error is no longer actual
    ///  negative trait bounds are not yet fully implemented; use marker types for now [E0658]
    _phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
}

// todo uncomment when this error is no longer actual
//  negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
//     for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }
unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Sync> Sync
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
    MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    unsafe fn new(
        non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
    ) -> Self {
        Self {
            non_blocking_mutex,
            _phantom_unsend: PhantomData,
        }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> Deref
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    type Target = State;

    fn deref(&self) -> &State {
        unsafe { &*self.non_blocking_mutex.unsafe_state.get() }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> DerefMut
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn deref_mut(&mut self) -> &mut State {
        unsafe { &mut *self.non_blocking_mutex.unsafe_state.get() }
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Debug> Debug
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(&**self, f)
    }
}

impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Display> Display
    for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        (**self).fmt(f)
    }
}
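
A minimal usage sketch of the NonBlockingMutex defined above (the state type and closure are illustrative):

fn main() {
    let non_blocking_mutex = NonBlockingMutex::new(
        std::thread::available_parallelism().unwrap().get(),
        0_usize,
    );
    // Runs the closure immediately, because this thread is the first
    // to increment `task_count`; under contention the closure would be
    // queued and executed by the thread currently running the loop
    non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state| {
        *state += 1;
    });
}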
