5 个不稳定版本
使用旧的 Rust 2015
0.3.2 | 2017 年 2 月 18 日 |
---|---|
0.3.1 | 2017 年 2 月 7 日 |
0.2.1 | 2017 年 2 月 7 日 |
0.1.1 | 2017 年 1 月 31 日 |
#1010 在 异步
410 每月下载次数
用于 10 个 crates (8 个直接使用)
165KB
3K SLoC
MultiQueue:快速 MPMC 广播队列
MultiQueue 是一个快速的 mpmc 有限队列,支持广播/广播样式的操作
概述
MultiQueue 基于LMAX Disruptor的队列设计,进行了一些改进
- 它可以作为 futures 流/接收器,因此可以轻松设置高性能计算管道
- 它可以动态添加/删除生产者,并且每个 流 都可以有多个消费者
- 它具有快速回退功能,用于在只有一个消费者和/或一个生产者时,并可以在运行时检测切换
- 它在 32 位系统上运行,没有任何性能或能力上的损失
- 在大多数情况下,可以直接在队列中写入数据而无需复制
可以将 MultiQueue 视为一种类似 souped up channel/sync_channel 的东西,具有多个独立消费者,每个消费者都接收相同的 流 数据的能力。那么为什么选择 MultiQueue 而不是内置的通道呢?
- MultiQueue 支持通过单个推送将元素广播到多个读取器
- 在大多数情况下,MultiQueue 允许在队列中就地读取元素,因此可以广播元素而无需大量复制
- MultiQueue 可以充当 futures 流和接收器
- 与通道不同,MultiQueue 在推送/弹出时不会分配内存,导致延迟更可预测
- 与 sync_channel 不同,MultiQueue 实际上是无锁的1,在争用情况下表现良好
另一方面,如果您
- 真正需要一个无界队列,尽管您可能需要处理回压
- 需要发送者在队列满时阻塞,并且不能使用 futures api
- 不希望大缓冲区的内存使用
- 你需要一个单次使用队列
- 您非常频繁地添加/删除生产者/消费者
否则,在大多数情况下,MultiQueue应该是通道的良好替代品。总的来说,这将以非常良好的方式作为常规有界队列运行,性能接近手写队列的性能,即使没有利用广播功能
队列模型
从高层次看,MultiQueue的功能类似于LMAX Disruptor。有一个传入的FIFO数据流,被广播到一组订阅者,就像有多个流正在写入一样。有两个主要区别
- MultiQueue透明地支持在单生产和多生产者之间切换。
- 每个广播流可以被多个消费者共享。
最后一点使模型有点令人困惑,因为数据流和消费该流的东西之间有一个区别。更糟糕的是,每个消费者可能实际上看不到流上的每个值。相反,多个消费者可以对单个流进行操作,每个消费者都获得对某些元素的独特访问权限。
一个有用的心理模型可能是这样想,每个流实际上只是一个被推送到mpmc队列的队列,而MultiQueue结构只是在幕后将它们组合在一起。
以下是一个表示广播队列通用用例的图,其中每个消费者对流的访问都是独特的 - #代表生产者,@代表每个流的消费者,每个都有标签。线条旨在显示数据通过队列的流动。
-> # @-1
\ /
-> -> -> @-2
/ \
-> # @-3
这是一个相当标准的广播队列设置 - 对于每个发送的元素,它在该流的消费者中看到。
然而,在MultiQueue中,每个逻辑消费者实际上可能被分配到许多实际消费者,如下所示。
-> # @-1
\ /
-> -> -> @-2' (really @+@+@ each compete for a spot)
/ \
-> # @-3
如果这个图表被重新绘制,其中每个生产者发送一个有序元素(时间从左到右移动)
t=1|t=2| t=3 | t=4|
1 -> # @-1 (1, 2)
\ /
-> 2 -> 1 -> -> @-2' (really @ (1) + @ (2) + @ (nothing yet))
/ \
2 -> # @-3 (1, 2)
如果想象这是一个网络服务器,@-1和@-3的流可能在进行一些随机的网络服务工作,如日志记录或度量收集,并且可以在单个核心上完全处理工作量,@-2正在进行昂贵的处理请求工作,并分成多个工作者处理数据流。
由于那些绘图可能没有意义,这里有一些例子
示例
单生产者单流
对于队列来说,这已经很简了。快速,一个写者,一个读者,使用简单。
extern crate multiqueue;
use std::thread;
let (send, recv) = multiqueue::mpmc_queue(10);
thread::spawn(move || {
for val in recv {
println!("Got {}", val);
}
});
for i in 0..10 {
send.try_send(i).unwrap();
}
// Drop the sender to close the queue
drop(send);
// prints
// Got 0
// Got 1
// Got 2
// etc
// some join mechanics here
单生产者双流。
让我们将值发送到两个不同的流
extern crate multiqueue;
use std::thread;
let (send, recv) = multiqueue::multicast_queue(4);
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
thread::spawn(move || {
for val in cur_recv {
println!("Stream {} got {}", i, val);
}
});
}
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
for i in 0..10 {
// Don't do this busy loop in real stuff unless you're really sure
loop {
if send.try_send(i).is_ok() {
break;
}
}
}
// Drop the sender to close the queue
drop(send);
// prints along the lines of
// Stream 0 got 0
// Stream 0 got 1
// Stream 1 got 0
// Stream 0 got 2
// Stream 1 got 1
// etc
// some join mechanics here
单生产者广播,每条流两个消费者
让我们让每个流由两个消费者消费上述内容
extern crate multiqueue;
use std::thread;
let (send, recv) = multiqueue::multicast_queue(4);
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
for j in 0..2 {
let stream_consumer = cur_recv.clone();
thread::spawn(move || {
for val in stream_consumer {
println!("Stream {} consumer {} got {}", i, j, val);
}
});
}
// cur_recv is dropped here
}
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
for i in 0..10 {
// Don't do this busy loop in real stuff unless you're really sure
loop {
if send.try_send(i).is_ok() {
break;
}
}
}
drop(send);
// prints along the lines of
// Stream 0 consumer 1 got 2
// Stream 0 consumer 0 got 0
// Stream 1 consumer 0 got 0
// Stream 0 consumer 1 got 1
// Stream 1 consumer 1 got 1
// Stream 1 consumer 0 got 2
// etc
// some join mechanics here
有些奇怪
有人真的决定使用甚至想要做更多吗?
extern crate multiqueue;
use std::thread;
let (send, recv) = multiqueue::multicast_queue(4);
// start like before
for i in 0..2 { // or n
let cur_recv = recv.add_stream();
for j in 0..2 {
let stream_consumer = cur_recv.clone();
thread::spawn(move || {
for val in stream_consumer {
println!("Stream {} consumer {} got {}", i, j, val);
}
});
}
// cur_recv is dropped here
}
// On this stream, since there's only one consumer,
// the receiver can be made into a SingleReceiver
// which can view items inline in the queue
let single_recv = recv.add_stream().into_single().unwrap();
thread::spawn(move || {
for val in single_recv.iter_with(|item_ref| 10 * *item_ref) {
println!("{}", val);
}
});
// Same as above, except this time we just want to iterate until the receiver is empty
let single_recv_2 = recv.add_stream().into_single().unwrap();
thread::spawn(move || {
for val in single_recv_2.try_iter_with(|item_ref| 10 * *item_ref) {
println!("{}", val);
}
});
// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();
// Many senders to give all the receivers something
for _ in 0..3 {
let cur_send = send.clone();
for i in 0..10 {
thread::spawn(loop {
if cur_send.try_send(i).is_ok() {
break;
}
});
}
}
drop(send);
MPMC 模式
可能会注意到,广播队列模式要求类型是Clone,而单读者原地变体要求类型也是Sync。这仅适用于广播队列,不适用于正常mpmc队列,因此还有一个mpmc api。它不需要类型为Clone或Sync的任何api,并且直接将项目从队列中移动出来而不是克隆它们。除了这一点外,基本没有api差异,所以我不打算对它们进行大量介绍。
Futures 模式
对于mpmc和广播,都支持futures模式。数据结构与正常的数据结构非常相似,除了它们为发送者和接收者实现了Futures Sink/Stream traits。这会带来一些性能成本,这就是为什么futures类型是分开的。
基准测试
吞吐量
更多内容即将推出,但这里是一些在Intel(R) Xeon(R) CPU E3-1240 v5上进行的基准测试,这些测试使用了busywait方法来阻塞,但在实际应用中,在接收者中使用阻塞等待将比大多数用例更快。
单生产者单消费者:每秒50-70百万操作。在这种情况下,通道大约每秒有8-11百万操作。
# -> -> @
单生产者单消费者,广播到两个不同的流:每秒2800万操作2
@
/
# -> ->
\
@
单生产者单消费者,广播到三个不同的流:每秒2500万操作2
@
/
# -> -> -> @
\
@
多生产者单消费者:每秒900万操作。在这种情况下,通道每秒大约执行800-900万操作。
#
\
-> -> @
/
#
多生产者单消费者,广播到两个不同的流:每秒800万操作
# @
\ /
-> ->
/ \
# @
延迟
我需要重新编写延迟基准工具,但延迟大约是内核间通信延迟,单核机器上大约是40-70纳秒。如果有多个生产者和消费者,这些值会稍高,因为每个都必须在完成写入或读取之前执行RMW操作。
常见问题解答
我的类型不是Clone,我能使用队列吗?
你可以使用队列的MPMC部分,但不能进行广播。
为什么发送者不会阻塞,尽管读者可以?
如果一个读者确实没有任何事情可做,那么它阻塞是合理的,而对于发送者来说,情况则不然。如果发送者阻塞,这意味着系统已出现积压,应该采取措施(例如启动另一个从积压流中消费的工作者)!此外,这会给队列带来比我所希望的更大的性能损失,并且通知发送者的延迟发生在队列操作完成之前,而通知读者的操作发生在值发送之后。
为什么即使发送者不能阻塞,未来的发送者还可以挂起?
这是为了使futures api合理运行所必需的,因为当futures无法将数据发送到队列时,它期望任务将被挂起并由其他进程唤醒(如果这是错误的,请告诉我!)这在当时也是合理的,因为其他事件将在那段期间得到处理,而不是简单的阻塞。我可能还会为那些想要futures api的友好性但不想承受性能损失的人添加一个仅旋转队列的futures api。
当出现积压时,我想知道哪个流落后最远,我能做到吗?
目前,这是不可能的。一般来说,这类问题很难具体回答,因为任何尝试回答它都会与写入者的更新竞争,而且也没有方法将“哪个流落后”的想法转化为程序可以执行的操作。
是否可以从一组多队列中选择?
不,目前不能。由于性能是队列的关键特性,我不希望进行任何修改以允许选择,因为这可能会对性能产生负面影响。
如果一个流的消费者落后会发生什么?
队列不会覆盖直到所有流都经过它的数据点,因此对队列的写入将失败。根据你的目标,这可能是好事也可能是坏事。一方面,没有人喜欢因为某个愚蠢的慢线程而被阻塞或饥饿更新。另一方面,这基本上强制执行了一种系统级的积压控制。如果你想了解为什么需要这样的例子,纽约证券交易所偶尔不会使综合信息流与个别信息流和市场的更新同步,市场就会陷入混乱。
脚注
1. 从技术上来说,队列不是无锁的——一个已经获得写入位置的写入者如果卡住了,将会阻塞读者前进。有一个无锁的MPMC边界队列,但它需要静态已知的最大发送者数,并且我认为不能扩展到广播。在实践中,这很少会很重要。
2 这些基准测试的结果差异很大,所以我取了上限。在其他一些机器上,与spsc情况相比,它们显示了微小的性能差异,所以我认为在实践中,有效吞吐量将高得多。
依赖项
~2MB
~32K SLoC