2 个不稳定版本
0.2.1 | 2024 年 7 月 12 日 |
---|---|
0.2.0 |
|
0.1.0 | 2023 年 6 月 26 日 |
#127 in 并发
120KB
2K SLoC
swap-buffer-queue
缓冲型 MPSC 队列。
该库旨在成为(更好,希望如此)缓冲型消费者场景下传统 MPSC 队列的替代方案,通过将缓冲部分直接移入队列中。
它特别适合 IO 写入工作流程,请参阅 缓冲实现。
该包是 no_std
– 一些缓冲实现可能需要 alloc
包。
除了低级的 Queue
实现之外,还提供了一个更高级的 SynchronizedQueue
,其中包含阻塞和异步方法。同步功能需要 std
。
示例
use std::ops::Deref;
use swap_buffer_queue::{buffer::{IntoValueIter, VecBuffer}, Queue};
// Initialize the queue with a capacity
let queue: Queue<VecBuffer<usize>> = Queue::with_capacity(42);
// Enqueue some value
queue.try_enqueue([0]).unwrap();
// Multiple values can be enqueued at the same time
// (optimized compared to multiple enqueuing)
queue.try_enqueue([1, 2]).unwrap();
let mut values = vec![3, 4];
queue
.try_enqueue(values.drain(..).into_value_iter())
.unwrap();
// Dequeue a slice to the enqueued values
let slice = queue.try_dequeue().unwrap();
assert_eq!(slice.deref(), &[0, 1, 2, 3, 4]);
// Enqueued values can also be retrieved
assert_eq!(slice.into_iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
缓冲实现
除了简单的 ArrayBuffer
和 VecBuffer
之外,此包还提供了有用的面向写入的实现。
写入
WriteArrayBuffer
和 WriteVecBuffer
在有已知序列化大小的对象需要序列化时非常适合。实际上,对象可以直接在队列的缓冲区上进行序列化,从而避免分配。
use std::io::Write;
use swap_buffer_queue::{Queue, write::{WriteBytesSlice, WriteVecBuffer}};
// Creates a WriteVecBuffer queue with a 2-bytes header
let queue: Queue<WriteVecBuffer<2>> = Queue::with_capacity((1 << 16) - 1);
queue
.try_enqueue((256, |slice: &mut [u8]| { /* write the slice */ }))
.unwrap();
queue
.try_enqueue((42, |slice: &mut [u8]| { /* write the slice */ }))
.unwrap();
let mut slice = queue.try_dequeue().unwrap();
// Adds a header with the len of the buffer
let len = (slice.len() as u16).to_be_bytes();
slice.header().copy_from_slice(&len);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write(slice.frame()).unwrap(), 300);
写入向量
WriteVectoredArrayBuffer
和 WriteVectoredVecBuffer
允许缓冲 IoSlice
的切片,从而节省了逐个出队 io-slices 并在之后收集它们的成本。(内部使用两个缓冲区,一个用于值,一个用于 io-slices)
作为一个便利的特制,可以检索缓冲的 io-slices 的总大小。
use std::io::{Write};
use swap_buffer_queue::{Queue, write_vectored::WriteVectoredVecBuffer};
// Creates a WriteVectoredVecBuffer queue
let queue: Queue<WriteVectoredVecBuffer<Vec<u8>>, Vec<u8>> = Queue::with_capacity(100);
queue.try_enqueue([vec![0; 256]]).unwrap();
queue.try_enqueue([vec![42; 42]]).unwrap();
let mut total_size = 0u16.to_be_bytes();
let mut slice = queue.try_dequeue().unwrap();
// Adds a header with the total size of the slices
total_size.copy_from_slice(&(slice.total_size() as u16).to_be_bytes());
let mut frame = slice.frame(.., Some(&total_size), None);
// Let's pretend we have a writer
let mut writer: Vec<u8> = Default::default();
assert_eq!(writer.write_vectored(&mut frame).unwrap(), 300);
工作原理
内部,此队列使用 2 个缓冲区:一个用于入队,另一个用于出队。
当调用 Queue::try_enqueue
时,它原子性地在当前入队缓冲区中预留一个槽位。然后值将被插入到该槽位。
当调用 Queue::try_dequeue
时,两个缓冲区会被原子性地交换,所以出队缓冲区将包含之前入队的值,而新的入队值将进入另一个(空的)缓冲区。
由于两阶段入队不能是原子的,队列可能会处于中间状态,其中槽位已被预留但尚未写入。在这种情况下,出队操作将失败并需要重试。
公平性
SynchronizedQueue
的实现并不公平,即它不保证当容量可用时,最老的阻塞入队者将成功。
然而,由于所有容量同时可用,这个问题得到了相当程度的缓解,因此所有阻塞的入队者都可能成功(特别是对于单大小值)。
对于可能的大变量大小值的特殊情况,仍然可以将队列与信号量结合使用,例如 tokio::sync::Semaphore
。性能会受到一些影响,但算法足够快,可以承受这种影响。
我仍在考虑如何在算法中直接包含公平性,但这并不是一件容易的事情。
不安全
这个库使用不安全代码,原因有三
- 缓冲区被包装在
UnsafeCell
中,以便允许对出队缓冲区进行可变访问; - 缓冲区实现可能使用不安全代码,以便允许使用共享引用进行插入;
Buffer
特性需要不安全接口来保证其不变性,因为它是一个公开的特性。
为了确保算法的安全性,它使用了
- 测试(目前主要是文档测试,但需要完成)
- 基准测试
- MIRI(带有测试)
Loom 目前部分集成,但 loom 测试仍在待办事项列表中。
性能
swap-buffer-queue 非常高效——它实际上是已知最快的 MPSC 队列。
以下是 crossbeam 基准测试的分支 forked
基准测试 | crossbeam | swap-buffer-queue |
---|---|---|
bounded1_mpsc | 1.545秒 | 1.763秒 |
bounded1_spsc | 1.652秒 | 1.000秒 |
bounded_mpsc | 0.362秒 | 0.137秒 |
然而,为了达到最大性能,需要足够大的容量;否则,高争用场景可能会降低性能。 | 依赖 | ~88–280KB |
crossbeam-utils | 跨beam | bounded1_mpsc |
1.545秒
1.763秒
bounded1_spsc