#broadcast #lock-free #channel #read-write

已删除 lfq

基于环形缓冲区的无锁多生产者/多消费者广播队列

0.1.0 2019年10月13日

#50#broadcast

MIT/Apache

26KB
422

LFQ

Crates.io Documentation

基于环形缓冲区的无锁多生产者/多消费者广播队列。

广播意味着每个读者都会读取每个写入。阅读设计部分以查看权衡是否适合您的应用程序。

设计

出于速度原因,队列在初始创建后不会进行任何分配。通过在固定大小分配中模拟环形缓冲区来模拟无限大小。为了避免阻塞,写入者可以自由覆盖某些或所有读者尚未消费的数据。这意味着读者不能保证看到所有写入。因此,此队列不适合任何类似任务队列的东西。使用 crossbeam-channelbus

多消费者和广播意味着仅支持 Copy 类型。

由于流式读取者需要知道写入者是否覆盖了他们在缓冲区中的位置,每个数据单元和队列中的索引都有一个关联的写入 "纪元"。这些数据以及一个写入进行中的标记存储在一个 AtomicUsize 中。因此,分配大小向上舍入到2的幂。在约 2^(指针宽度) - 大小 次写入后,信息将在打包的原子中重叠,以不可预测的方式破坏队列。 size 指的是分配大小,而不是用户请求的大小。请注意,这发生在整数溢出之前。

写入是一个四步过程。首先,写入者争夺下一个槽位。获胜的写入者然后通过原子存储启动对缓冲区槽位的写入,执行实际写入,然后通过另一个原子存储确认。这确保了读者永远不会看到半写入的数据,即使数据大于平台上的原子操作大小。

读者将检查他们正在读取的单元格是否有与其索引匹配的纪元。他们还将拒绝任何当前正在进行的写入的单元格(如上所述的步骤2-4)。这意味着流式读取不保证获取每条消息。此外,最新写入的读取必须处理可能的不完整写入。各种选择作为公共方法实现。

元素类型必须实现Default,因为通过填充内部缓冲区以默认数据,消除了多个运行时检查。然而,这些临时数据永远不会被读取,仅存在于避免unsafe

唯一的不安全代码是内部Queue类型上的Sync实现。

示例

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use lfq::QueueClient;

let w = QueueClient::new_queue(100);
assert_eq!(w.size(), 128);

let mut r = w.clone();
let messages = w.size() * 20;

let finished = Arc::new(AtomicBool::new(false));
let stop = finished.clone();
let thread = thread::spawn(move || {
    let mut last = 0;
    while !stop.load(Ordering::Relaxed) {
        let result = r.latest();
        assert!(result >= last);
        last = result;
    }
});

for data in 0..messages {
    w.push(data);
}
finished.store(true, Ordering::Relaxed);
thread.join().unwrap();

无运行时依赖