#队列 #批处理 #SPSC

batch_queue

具有智能批处理的单个生产者单个消费者 Rust 队列

1 个不稳定版本

0.1.0 2021 年 12 月 19 日

#1210并发

MIT/Apache

29KB
599 行(不含注释)

批处理队列

一个在生产者和消费者之间实现智能批处理的库。换句话说,这是一个尝试在休眠之前让任一侧运行一段时间(以利用 L1 缓存)的单个生产者单个消费者队列。

该队列的一个优点是所有桶都是预先分配的,除非使用 recv_batchtry_recv_batch 方法,否则在发送和接收项时不会进行分配。

示例

# tokio_test::block_on(async {
use batch_queue::{batch_queue, pipe};

let (tx, mut rx) = batch_queue::<u32, 128>(20);
const N: u32 = 10_000_000;

tokio::spawn(async move {
    let stream = futures::stream::iter(0..N);
    pipe(stream, tx).await;
});

let mut x = 0;
while let Ok(iter) = rx.recv().await {
    for y in iter {
        assert_eq!(y, x);
        x += 1;
    }
}
assert_eq!(x, N);
# })

在这里,由 recv() 返回的迭代器允许您在不进行分配的情况下消费桶内容。在我的 AMD Hetzner 箱子上,每项大约需要 6 纳秒。

工作原理

队列被建模为一个桶的箱切片,其中每个桶都是一个适当大小的 MaybeUninit 初始化项的数组和一个填充级别。读取器和写入器各自维护一个当前位置,包括桶索引以及一个循环计数器(以检测写入器是否正好领先于读取器一个完整循环)。

插入项

  • 检查写入者的桶是否可用于写入(即它不是上一个循环中读取者的桶)
  • 如果是这样,则在桶中的下一个槽位中插入;如果已满,则移动写入桶位置

声明当前批次的结束

  • 检查当前写入桶是否有元素 ⇒ 移动写入桶位置

取桶

  • 检查写入者的桶是否领先于读取者的桶 ⇒ 取走它并移动读取桶位置

依赖关系

~1–1.7MB
~34K SLoC