#lock-free-queue #constant-time #batching #tasks #buffer #swapping #batch

swap-queue

一种无锁的线程所有队列,任务通过缓冲区交换整体被窃取者获取

6个版本 (2个稳定版)

1.1.0 2021年11月21日
1.0.0 2021年11月1日
0.0.6 2021年10月28日

数据结构中排名第1402

每月下载量21

MIT授权

27KB
476

Swap Queue

License Cargo Documentation CI

一种无锁的线程所有队列,任务通过缓冲区交换整体被窃取者获取。对于批处理用例,这种设计具有优点,即无论批次大小如何,所有任务都可以在恒定时间内作为一个批次被获取,而使用crossbeam_deque::Workertokio::sync::mpsc的替代品需要单独收集每个任务,并且在某些情况下缺乏明确的截止点。这种设计确保了当你在等待资源(如连接)可用时,一旦它变得可用,在处理任务批次之前不会出现进一步的延迟。虽然推送行为本身比crossbeam_deque::Worker慢,但比tokio::sync::mpsc快,但整体批处理性能比crossbeam_deque::Worker快约11-19%,比tokio::sync::mpsc快约28-45%,并且在批次之间没有慢速截止点。

示例

use swap_queue::Worker;
use tokio::{
  runtime::Handle,
  sync::oneshot::{channel, Sender},
};

// Jemalloc makes this library substantially faster
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// Worker needs to be thread local because it is !Sync
thread_local! {
  static QUEUE: Worker<(u64, Sender<u64>)> = Worker::new();
}

// This mechanism will batch optimally without overhead within an async-context because spawn will happen after things already scheduled
async fn push_echo(i: u64) -> u64 {
  {
    let (tx, rx) = channel();

    QUEUE.with(|queue| {
      // A new stealer is returned whenever the buffer is new or was empty
      if let Some(stealer) = queue.push((i, tx)) {
        Handle::current().spawn(async move {
          // Take the underlying buffer in entirety; the next push will return a new Stealer
          let batch = stealer.take().await;

          // Some sort of batched operation, such as a database query

          batch.into_iter().for_each(|(i, tx)| {
            tx.send(i).ok();
          });
        });
      }
    });

    rx
  }
  .await
  .unwrap()
}

基准测试

在ami-06391d741144b83c2上运行的基准测试

异步批处理

Benchmarks, 64 tasks Benchmarks, 128 tasks Benchmarks, 256 tasks Benchmarks, 512 tasks Benchmarks, 1024 tasks

推送

Benchmarks, 1024 tasks

批次收集

Benchmarks, 1024 tasks

在ThreadSanitizer、LeakSanitizer、Miri和Loom下进行CI测试。

依赖项

~3–31MB
~410K SLoC