8 个版本
0.1.7 | 2020 年 12 月 20 日 |
---|---|
0.1.6 | 2019 年 9 月 20 日 |
0.1.5 | 2019 年 5 月 28 日 |
#821 在 异步
每月 221 次下载
175KB
3.5K SLoC
MultiQueue2: 快速 MPMC 广播队列
MultiQueue2 是一个快速的有界 mpmc 队列,支持广播/广播样式操作
MultiQueue 由 Sam Schetterer 开发,但已经有一段时间没有更新。我发现它非常有用,因为它实现了 futures
。然而,它有一些过时的库 API,并且使用自旋锁在很多情况下会占用 100% 的 CPU。
MultiQueue2 的新特性
这个版本试图修复这些问题。默认情况下,现在使用 condvar 块。对于 _fut_
异步通道,所有项目都会迅速挂起,而无需初始自旋锁。
此队列的使用实际上是锁的,但从技术角度和严格意义上讲并非如此。有三种类型的锁
- 自旋锁
std::thread::yield_now
- 忙等待
- Condvar 阻塞
2
是最快的,但会占用 100% 的 CPU。MultiQueue2 的默认设置(包括 _fut
通道)是它们的组合。实际上,这不会成为应用程序的瓶颈。一个可能更适合更改到 2
的用例是音频和视频转换。
用法: futures_multiqueue_with(<capacity>,<try_spins>,<yield_spins>)
capacity
是队列允许的最大项目数;当它满时,将发出 Err(Full{...})
。 try_spins
是一种高效、低延迟的阻塞等待,用于轻量级冲突解决,当您的CPU使用率高时,可以降低此数值。
yield_spins
仍然忙碌,但由 yield()
慢化,这个数值可以很小。
futures_multiqueue_with(1000,0,0)
是可能的,这将把这个混合锁转换成内核锁。
请随意测试与您的系统匹配的不同设置。
所有依赖项都已升级,所有警告都已修复并升级到 2018。
目录: 概述 | 示例 | MPMC 模式 | Futures 模式 | 基准测试 | 常见问题解答
概述
多队列基于 LMAX Disruptor 的队列设计,略有改进
- 未来流/汇(实现了
futures
特性) - 它可以动态地添加/删除生产者,并且每个 流 都可以有多个消费者
- 它在任何时候都有一对一的消费者和/或生产者时,具有快速的回退机制,并且可以在运行时检测到切换
- 它在 32 位系统上工作,没有任何性能或能力上的损失
- 在大多数情况下,可以直接在队列中写入数据,而无需复制
可以将 MultiQueue 视为一个增强版的 通道/sync_channel,具有额外的功能,即每个独立的消费者都可以接收相同的 流 数据。
选择 MultiQueue2 而不是内置通道的原因
- 支持将元素广播到多个读取器,只需在队列中单次推送即可
- 在大多数情况下,允许在队列中就地读取元素,因此您可以在不进行大量复制的情况下广播元素
- 可以作为 futures 流和汇
- 与通道不同,不会在推送/弹出时分配内存,导致具有更多可预测的延迟
- 与 sync_channel 不同,几乎是无锁的,并且在争用下表现相当不错
不选择 MultiQueue2 而选择内置通道的原因
- 真正需要无界队列,尽管您可能应该处理回压
- 需要发送者在队列满时阻塞,并且不能使用 futures API
- 不希望使用大缓冲区的内存使用量
- 需要一次性队列
- 非常频繁地添加/删除生产者/消费者
在其他情况下,MultiQueue 应该是通道的良好替代品。通常,它将像常规有界队列一样工作得很好,其性能接近手写队列的单个/多个生产者/消费者
即使不利用广播
示例
单个生产者单个流
对于队列来说,这已经是最简单的情况了。快速、一个写者、一个读者,简单易用。
extern crate multiqueue2 as multiqueue;
use std::thread;
let (send, recv) = multiqueue::mpmc_queue(10);
thread::spawn(move || {
for val in recv {
println!("Got {}", val);
}
});
for i in 0..10 {
send.try_send(i).unwrap();
}
// Drop the sender to close the queue
drop(send);
// prints
// Got 0
// Got 1
// Got 2
// etc
// some join mechanics here
单个生产者双流。
让我们将值发送到两个不同的流
extern crate multiqueue2 as multiqueue;
use std::thread;
let (send, recv) = multiqueue::broadcast_queue(4);
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
thread::spawn(move || {
for val in cur_recv {
println!("Stream {} got {}", i, val);
}
});
}
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
for i in 0..10 {
// Don't do this busy loop in real stuff unless you're really sure
loop {
if send.try_send(i).is_ok() {
break;
}
}
}
// Drop the sender to close the queue
drop(send);
// prints along the lines of
// Stream 0 got 0
// Stream 0 got 1
// Stream 1 got 0
// Stream 0 got 2
// Stream 1 got 1
// etc
// some join mechanics here
单个生产者广播,每个流有两个消费者
让我们将上面的内容改为每个流由两个消费者消费
extern crate multiqueue2 as multiqueue;
use std::thread;
let (send, recv) = multiqueue::broadcast_queue(4);
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
for j in 0..2 {
let stream_consumer = cur_recv.clone();
thread::spawn(move || {
for val in stream_consumer {
println!("Stream {} consumer {} got {}", i, j, val);
}
});
}
// cur_recv is dropped here
}
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
for i in 0..10 {
// Don't do this busy loop in real stuff unless you're really sure
loop {
if send.try_send(i).is_ok() {
break;
}
}
}
drop(send);
// prints along the lines of
// Stream 0 consumer 1 got 2
// Stream 0 consumer 0 got 0
// Stream 1 consumer 0 got 0
// Stream 0 consumer 1 got 1
// Stream 1 consumer 1 got 1
// Stream 1 consumer 0 got 2
// etc
// some join mechanics here
一些奇怪的事情
有人真正想过要使用甚至想要做更类似于的东西吗?
extern crate multiqueue2 as multiqueue;
use std::thread;
let (send, recv) = multiqueue::broadcast_queue(4);
// start like before
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
for j in 0..2 {
let stream_consumer = cur_recv.clone();
thread::spawn(move || {
for val in stream_consumer {
println!("Stream {} consumer {} got {}", i, j, val);
}
});
}
// cur_recv is dropped here
}
// On this stream, since there's only one consumer,
// the receiver can be made into a SingleReceiver
// which can view items inline in the queue
let single_recv = recv.add_stream().into_single().unwrap();
thread::spawn(move || {
for val in single_recv.iter_with(|item_ref| 10 * *item_ref) {
println!("{}", val);
}
});
// Same as above, except this time we just want to iterate until the receiver is empty
let single_recv_2 = recv.add_stream().into_single().unwrap();
thread::spawn(move || {
for val in single_recv_2.try_iter_with(|item_ref| 10 * *item_ref) {
println!("{}", val);
}
});
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
// Many senders to give all the receivers something
for _ in 0..3 {
let cur_send = send.clone();
for i in 0..10 {
thread::spawn(loop {
if cur_send.try_send(i).is_ok() {
break;
}
});
}
}
drop(send);
MPMC 模式
可能会注意到,广播队列模式要求类型必须是 Clone,而单个读取者的原地变体也要求类型必须是
Sync。这仅适用于广播队列,而不是正常的 mpmc 队列,因此还有一个 mpmc 接口。
Multiqueue2 对于任何 API 都不需要类型是 Clone 或
Sync,并且直接将项目从队列中移出而不是克隆它们。除了这一点之外,API 基本上没有区别,因此我不会在它们上占用大量篇幅。
未来模式
对于 mpmc 和广播,都支持未来模式。数据结构与正常的数据结构非常相似,只是它们实现了发送者和接收者的 Futures
Sink
/Stream
特性。这会带来一定的性能成本,这也是为什么未来类型被分开的原因。
基准测试
吞吐量
吞吐量是通过使用 condvar 阻塞锁进行基准测试的,这是队列系统的默认设置。这确保了即使对于长时间阻塞的异步项目,CPU 也能高效使用。
切换到忙等待锁可以提供另一个 30% 的吞吐量提升。
SPSC:在 1p::1c 上执行 10000000 push/pop 对的时间为每个项目 292.9397618 纳秒
SPMC:在 1p::1c_2b 上执行 10000000 push/pop 对的时间为每个项目 310.12774815 纳秒
在 1p::1c_3b 上执行 10000000 push/pop 对的时间为每个项目 317.77275306666667 纳秒
MPSC:在 2p::1c 上执行 10000000 push/pop 对的时间为每个项目 378.5664167 纳秒
MPMC:在 2p::1c_2b 上执行 10000000 push/pop 对的时间为每个项目 377.69721405 纳秒
在 2p::1c_3b 上执行 10000000 push/pop 对的时间为每个项目 414.59893453333336 纳秒
在 MacBook Pro 2018 i7,16GB Ram 上。
这里没有延迟基准测试工具,但延迟将大约是核心间通信延迟,大约在单插槽机器上的 40-70 纳秒。
随着多个生产者和多个消费者的增加,这些值会更高,因为每个生产者或消费者在完成写入或读取之前都必须执行 RMW 操作。
常见问题解答
我的类型不是 Clone,我能使用队列吗?
您可以使用队列的 MPMC 部分,但不能进行广播。
为什么发送者不能阻塞,而读取者可以?
当确实没有可做之事时,让读取者阻塞是有意义的,而发送者的情况则不然。
如果发送方被阻塞,这意味着系统已经满载,需要其他东西来处理积压的项目。
此外,它给队列带来了更大的性能损失,通知发送方的延迟在队列操作完成之前就已经发生,而通知读取器则在数据发送之后。
为什么未来发送者可以停车,尽管发送者不能被阻塞呢?
这是为了使未来API能够合理工作,因为当未来无法发送到队列时,它期望任务会被某个其他进程挂起和唤醒(如果这是错误的,请告知我!)。这样做是有道理的,因为在那段时间里,将处理其他事件而不是简单的阻塞。我可能还会添加一个仅对队列进行空转的未来API,供那些想要未来API的优雅性但不想承受性能损失的人使用。
当出现积压时,我想知道哪个流落后最多,我能做到吗?
目前,这是不可能的。一般来说,这样的问题很难具体回答,因为任何试图回答它的尝试都会与编写器更新竞争,而且也没有办法将“哪个流落后”的想法转化为程序可以执行的操作。
能否从一组多队列中选择?
不,目前还不能。为了获得良好的性能,队列中的所有项目都不是共享的。
如果一个流的消费者落后了会怎样?
队列不会在所有流都通过它之前覆盖数据点,因此对队列的写入会失败。根据你的目标,这可能是好事也可能是坏事。一方面,没有人喜欢因为某个愚蠢的慢线程而阻塞或饥饿更新。另一方面,这基本上强制实施了一种系统级积压控制。如果你想知道为什么需要这种控制,NYSE偶尔不会将合并的实时数据与个别实时数据和市场保持同步,市场就会陷入混乱。
依赖项
~3MB
~66K SLoC