#message-queue #queue #lock-free-queue #lock-free #fifo-queue #mpmc #events

rc_event_queue

类似VecDeque的快速、无界、FIFO、并发读无锁消息队列

8个版本

0.4.2 2021年10月20日
0.4.1 2021年10月11日
0.3.1 2021年10月2日
0.3.0 2021年9月29日
0.2.2 2021年9月27日

#456并发

Download history 378/week @ 2024-04-06 207/week @ 2024-04-13 215/week @ 2024-04-20 283/week @ 2024-04-27 194/week @ 2024-05-04 259/week @ 2024-05-11 234/week @ 2024-05-18 182/week @ 2024-05-25 179/week @ 2024-06-01 66/week @ 2024-06-08 52/week @ 2024-06-15 60/week @ 2024-06-22 98/week @ 2024-06-29 44/week @ 2024-07-06 27/week @ 2024-07-13 74/week @ 2024-07-20

每月247次下载

MIT/Apache

105KB
2K SLoC

crates.io Docs CI

读计事件队列

快速、并发FIFO事件队列(或消息队列)。多个消费者接收每条消息。

  • mpmc(多生产者多消费者)- 无锁读,锁写。
  • spmc(单生产者多消费者)- 无锁读,无锁写。

写操作从不阻塞读操作。性能面向消费者。大多为连续内存布局。内存消耗不随读取者数量增长。

性能

具有非常低的CPU + 内存开销。大部分时间读者只需在每个iter()调用中执行1次原子加载。仅此而已!

单线程。

读 - 接近 VecDeque!写

  • mpmc - push 比较慢,需要至少4个项目才能接近 VecDeque
  • spmc - 与 VecDeque 相同!

多线程。

读 - 每个线程的性能随着同时读取的线程数量增加而缓慢下降。(也请记住,由于 rc_event_queue 是消息队列,每个读者都读取整个队列 - 增加读取者不会使队列消耗更快)

写 - 每个线程的性能几乎呈线性下降,每个额外的同时写入线程。(由于被锁定)。不适用于 spmc

注意。但如果没有激烈竞争 - 性能非常接近单线程情况。

查看mpmc基准测试.

工作原理

查看 doc/principle-of-operation.md

简而言之 - EventQueue 基于块操作。 EventQueue 不接触 EventReaderEventReader 总是“拉取”来自 EventQueue 的数据。 EventReaderEventQueue 交互的唯一方式是在遍历期间切换到下一个块时增加读取计数。

使用方法

API文档

use rc_event_queue::prelude::*;
use rc_event_queue::mpmc::{EventQueue, EventReader};

let event = EventQueue::<usize>::new();
let mut reader1 = EventReader::new(event);
let mut reader2 = EventReader::new(event);

event.push(1);
event.push(10);
event.push(100);
event.push(1000);

fn sum (mut iter: impl LendingIterator<ItemValue = usize>) -> usize {
    let mut sum = 0;
    while let Some(item) = iter.next() {
        sum += item;
    }
    sum
}

assert!(sum(reader1.iter()) == 1111);
assert!(sum(reader1.iter()) == 0);
assert!(sum(reader2.iter()) == 1111);
assert!(sum(reader2.iter()) == 0);

event.extend(0..10);
assert!(sum(reader1.iter()) == 55);
assert!(sum(reader2.iter()) == 55);

clear

event.push(1);
event.push(10);
event.clear();
event.push(100);
event.push(1000);

assert!(sum(reader1.iter()) == 1100);
assert!(sum(reader2.iter()) == 1100);

clear/truncate_front 有一些特性 - 被读取者占用的块不会立即释放。

紧急切割

如果任何读者长时间未阅读,可能会保留清理队列。这意味着队列容量会增长。在长时间运行且不可预测的系统上,您可能需要定期检查 total_capacity,如果它增长过多,则可能需要强制裁剪/清除。

if event.total_capacity() > 100000{
    // This will not free chunks occupied by readers, but will free the rest.
    // This should be enough, to prevent memory leak, in case if some readers
    // stop consume unexpectedly.
    event.truncate_front(1000);     // leave some of the latest messages to read
    
    // If you set to Settings::MAX_CHUNK_SIZE to high value,
    // this will reduce chunk size.
    event.change_chunk_size(2048);

    // If you DO have access to all readers (you probably don't) - 
    // this will move readers forward, and free the chunks occupied by readers.
    // Under normal conditions, this is not necessary, since readers will jump
    // forward to another chunk immediately on the next iter() call.
    for reader in readers{
        reader.update_position();
        // reader.iter();   // this have same effect as above
    }
}

即使某些读者永远停止阅读,您也只会丢失/泄露由读者直接占用的块。

优化

清理

Settings 中将 CLEANUP 设置为 Never,以便推迟块分配。

use rc_event_reader::mpmc::{EventQueue, EventReader, Settings};

struct S{} impl Settings for S{
    const MIN_CHUNK_SIZE: u32 = 4;
    const MAX_CHUNK_SIZE: u32 = 4096;
    const CLEANUP: CleanupMode = CleanupMode::Never;
}

let event = EventQueue::<usize, S>::new();
let mut reader = event.subscribe();

event.extend(0..10);
sum(reader.iter()); // With CLEANUP != Never, this would cause chunk deallocation

...

event.cleanup();   // Free used chunks

双缓冲

使用 double_buffering 功能。这将重用最大的已释放块。当 EventQueue 达到其最佳大小 - 块将仅交换,而无需分配/释放。

稳定性

EventQueue 已通过测试。 Miri 测试。 Loom 测试。参见 doc/tests.md

依赖项

~305KB