0.1.0 |
|
---|
#50 在 #broadcast
26KB
422 行
LFQ
基于环形缓冲区的无锁多生产者/多消费者广播队列。
广播意味着每个读者都会读取每个写入。阅读设计部分以查看权衡是否适合您的应用程序。
设计
出于速度原因,队列在初始创建后不会进行任何分配。通过在固定大小分配中模拟环形缓冲区来模拟无限大小。为了避免阻塞,写入者可以自由覆盖某些或所有读者尚未消费的数据。这意味着读者不能保证看到所有写入。因此,此队列不适合任何类似任务队列的东西。使用 crossbeam-channel
或 bus
。
多消费者和广播意味着仅支持 Copy
类型。
由于流式读取者需要知道写入者是否覆盖了他们在缓冲区中的位置,每个数据单元和队列中的索引都有一个关联的写入 "纪元"。这些数据以及一个写入进行中的标记存储在一个 AtomicUsize
中。因此,分配大小向上舍入到2的幂。在约 2^(指针宽度) - 大小
次写入后,信息将在打包的原子中重叠,以不可预测的方式破坏队列。 size
指的是分配大小,而不是用户请求的大小。请注意,这发生在整数溢出之前。
写入是一个四步过程。首先,写入者争夺下一个槽位。获胜的写入者然后通过原子存储启动对缓冲区槽位的写入,执行实际写入,然后通过另一个原子存储确认。这确保了读者永远不会看到半写入的数据,即使数据大于平台上的原子操作大小。
读者将检查他们正在读取的单元格是否有与其索引匹配的纪元。他们还将拒绝任何当前正在进行的写入的单元格(如上所述的步骤2-4)。这意味着流式读取不保证获取每条消息。此外,最新写入的读取必须处理可能的不完整写入。各种选择作为公共方法实现。
元素类型必须实现Default,因为通过填充内部缓冲区以默认数据,消除了多个运行时检查。然而,这些临时数据永远不会被读取,仅存在于避免
unsafe。
唯一的不安全代码是内部Queue
类型上的Sync
实现。
示例
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread;
use lfq::QueueClient;
let w = QueueClient::new_queue(100);
assert_eq!(w.size(), 128);
let mut r = w.clone();
let messages = w.size() * 20;
let finished = Arc::new(AtomicBool::new(false));
let stop = finished.clone();
let thread = thread::spawn(move || {
let mut last = 0;
while !stop.load(Ordering::Relaxed) {
let result = r.latest();
assert!(result >= last);
last = result;
}
});
for data in 0..messages {
w.push(data);
}
finished.store(true, Ordering::Relaxed);
thread.join().unwrap();