8个版本
0.4.2 | 2021年10月20日 |
---|---|
0.4.1 | 2021年10月11日 |
0.3.1 | 2021年10月2日 |
0.3.0 | 2021年9月29日 |
0.2.2 | 2021年9月27日 |
#456 在 并发
每月247次下载
105KB
2K SLoC
读计事件队列
快速、并发FIFO事件队列(或消息队列)。多个消费者接收每条消息。
- mpmc(多生产者多消费者)- 无锁读,锁写。
- spmc(单生产者多消费者)- 无锁读,无锁写。
写操作从不阻塞读操作。性能面向消费者。大多为连续内存布局。内存消耗不随读取者数量增长。
性能
具有非常低的CPU + 内存开销。大部分时间读者只需在每个iter()
调用中执行1次原子加载。仅此而已!
单线程。
读 - 接近 VecDeque
!写
mpmc
-push
比较慢,需要至少4个项目才能接近VecDeque
。spmc
- 与VecDeque
相同!
多线程。
读 - 每个线程的性能随着同时读取的线程数量增加而缓慢下降。(也请记住,由于 rc_event_queue
是消息队列,每个读者都读取整个队列 - 增加读取者不会使队列消耗更快)
写 - 每个线程的性能几乎呈线性下降,每个额外的同时写入线程。(由于被锁定)。不适用于 spmc
。
注意。但如果没有激烈竞争 - 性能非常接近单线程情况。
工作原理
查看 doc/principle-of-operation.md。
简而言之 - EventQueue
基于块操作。 EventQueue
不接触 EventReader
。 EventReader
总是“拉取”来自 EventQueue
的数据。 EventReader
与 EventQueue
交互的唯一方式是在遍历期间切换到下一个块时增加读取计数。
使用方法
use rc_event_queue::prelude::*;
use rc_event_queue::mpmc::{EventQueue, EventReader};
let event = EventQueue::<usize>::new();
let mut reader1 = EventReader::new(event);
let mut reader2 = EventReader::new(event);
event.push(1);
event.push(10);
event.push(100);
event.push(1000);
fn sum (mut iter: impl LendingIterator<ItemValue = usize>) -> usize {
let mut sum = 0;
while let Some(item) = iter.next() {
sum += item;
}
sum
}
assert!(sum(reader1.iter()) == 1111);
assert!(sum(reader1.iter()) == 0);
assert!(sum(reader2.iter()) == 1111);
assert!(sum(reader2.iter()) == 0);
event.extend(0..10);
assert!(sum(reader1.iter()) == 55);
assert!(sum(reader2.iter()) == 55);
clear
event.push(1);
event.push(10);
event.clear();
event.push(100);
event.push(1000);
assert!(sum(reader1.iter()) == 1100);
assert!(sum(reader2.iter()) == 1100);
clear
/truncate_front
有一些特性 - 被读取者占用的块不会立即释放。
紧急切割
如果任何读者长时间未阅读,可能会保留清理队列。这意味着队列容量会增长。在长时间运行且不可预测的系统上,您可能需要定期检查 total_capacity
,如果它增长过多,则可能需要强制裁剪/清除。
if event.total_capacity() > 100000{
// This will not free chunks occupied by readers, but will free the rest.
// This should be enough, to prevent memory leak, in case if some readers
// stop consume unexpectedly.
event.truncate_front(1000); // leave some of the latest messages to read
// If you set to Settings::MAX_CHUNK_SIZE to high value,
// this will reduce chunk size.
event.change_chunk_size(2048);
// If you DO have access to all readers (you probably don't) -
// this will move readers forward, and free the chunks occupied by readers.
// Under normal conditions, this is not necessary, since readers will jump
// forward to another chunk immediately on the next iter() call.
for reader in readers{
reader.update_position();
// reader.iter(); // this have same effect as above
}
}
即使某些读者永远停止阅读,您也只会丢失/泄露由读者直接占用的块。
优化
清理
在 Settings
中将 CLEANUP
设置为 Never
,以便推迟块分配。
use rc_event_reader::mpmc::{EventQueue, EventReader, Settings};
struct S{} impl Settings for S{
const MIN_CHUNK_SIZE: u32 = 4;
const MAX_CHUNK_SIZE: u32 = 4096;
const CLEANUP: CleanupMode = CleanupMode::Never;
}
let event = EventQueue::<usize, S>::new();
let mut reader = event.subscribe();
event.extend(0..10);
sum(reader.iter()); // With CLEANUP != Never, this would cause chunk deallocation
...
event.cleanup(); // Free used chunks
双缓冲
使用 double_buffering
功能。这将重用最大的已释放块。当 EventQueue
达到其最佳大小 - 块将仅交换,而无需分配/释放。
稳定性
EventQueue
已通过测试。 Miri 测试。 Loom 测试。参见 doc/tests.md
依赖项
~305KB