8 个版本

0.1.7 2020 年 12 月 20 日
0.1.6 2019 年 9 月 20 日
0.1.5 2019 年 5 月 28 日

#821异步

Download history 47/week @ 2024-03-15 22/week @ 2024-03-22 39/week @ 2024-03-29 32/week @ 2024-04-05 43/week @ 2024-04-12 24/week @ 2024-04-19 22/week @ 2024-04-26 38/week @ 2024-05-03 38/week @ 2024-05-10 63/week @ 2024-05-17 39/week @ 2024-05-24 70/week @ 2024-05-31 42/week @ 2024-06-07 113/week @ 2024-06-14 47/week @ 2024-06-21 6/week @ 2024-06-28

每月 221 次下载

MIT 许可证

175KB
3.5K SLoC

MultiQueue2: 快速 MPMC 广播队列

Build Status Crates.io

MultiQueue2 是一个快速的有界 mpmc 队列,支持广播/广播样式操作

MultiQueue 由 Sam Schetterer 开发,但已经有一段时间没有更新。我发现它非常有用,因为它实现了 futures。然而,它有一些过时的库 API,并且使用自旋锁在很多情况下会占用 100% 的 CPU。

MultiQueue2 的新特性

这个版本试图修复这些问题。默认情况下,现在使用 condvar 块。对于 _fut_ 异步通道,所有项目都会迅速挂起,而无需初始自旋锁。

此队列的使用实际上是锁的,但从技术角度和严格意义上讲并非如此。有三种类型的锁

  1. 自旋锁 std::thread::yield_now
  2. 忙等待
  3. Condvar 阻塞

2 是最快的,但会占用 100% 的 CPU。MultiQueue2 的默认设置(包括 _fut 通道)是它们的组合。实际上,这不会成为应用程序的瓶颈。一个可能更适合更改到 2 的用例是音频和视频转换。

用法: futures_multiqueue_with(<capacity>,<try_spins>,<yield_spins>)

capacity 是队列允许的最大项目数;当它满时,将发出 Err(Full{...})try_spins 是一种高效、低延迟的阻塞等待,用于轻量级冲突解决,当您的CPU使用率高时,可以降低此数值。
yield_spins 仍然忙碌,但由 yield() 慢化,这个数值可以很小。
futures_multiqueue_with(1000,0,0) 是可能的,这将把这个混合锁转换成内核锁。
请随意测试与您的系统匹配的不同设置。

所有依赖项都已升级,所有警告都已修复并升级到 2018。

目录: 概述 | 示例 | MPMC 模式 | Futures 模式 | 基准测试 | 常见问题解答

概述

多队列基于 LMAX Disruptor 的队列设计,略有改进

  • 未来流/汇(实现了 futures 特性)
  • 它可以动态地添加/删除生产者,并且每个 都可以有多个消费者
  • 它在任何时候都有一对一的消费者和/或生产者时,具有快速的回退机制,并且可以在运行时检测到切换
  • 它在 32 位系统上工作,没有任何性能或能力上的损失
  • 在大多数情况下,可以直接在队列中写入数据,而无需复制

可以将 MultiQueue 视为一个增强版的 通道/sync_channel,具有额外的功能,即每个独立的消费者都可以接收相同的 数据。

选择 MultiQueue2 而不是内置通道的原因

  • 支持将元素广播到多个读取器,只需在队列中单次推送即可
  • 在大多数情况下,允许在队列中就地读取元素,因此您可以在不进行大量复制的情况下广播元素
  • 可以作为 futures 流和汇
  • 与通道不同,不会在推送/弹出时分配内存,导致具有更多可预测的延迟
  • 与 sync_channel 不同,几乎是无锁的,并且在争用下表现相当不错

不选择 MultiQueue2 而选择内置通道的原因

  • 真正需要无界队列,尽管您可能应该处理回压
  • 需要发送者在队列满时阻塞,并且不能使用 futures API
  • 不希望使用大缓冲区的内存使用量
  • 需要一次性队列
  • 非常频繁地添加/删除生产者/消费者

在其他情况下,MultiQueue 应该是通道的良好替代品。通常,它将像常规有界队列一样工作得很好,其性能接近手写队列的单个/多个生产者/消费者

即使不利用广播

示例

单个生产者单个流

对于队列来说,这已经是最简单的情况了。快速、一个写者、一个读者,简单易用。

extern crate multiqueue2 as multiqueue;

use std::thread;

let (send, recv) = multiqueue::mpmc_queue(10);

thread::spawn(move || {
    for val in recv {
        println!("Got {}", val);
    }
});

for i in 0..10 {
    send.try_send(i).unwrap();
}

// Drop the sender to close the queue
drop(send);

// prints
// Got 0
// Got 1
// Got 2
// etc


// some join mechanics here

单个生产者双流。

让我们将值发送到两个不同的流

extern crate multiqueue2 as multiqueue;

use std::thread;

let (send, recv) = multiqueue::broadcast_queue(4);

for i in 0..2 { // or n
    let cur_recv = recv.add_stream();
    thread::spawn(move || {
        for val in cur_recv {
            println!("Stream {} got {}", i, val);
        }
    });
}

// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();

for i in 0..10 {
    // Don't do this busy loop in real stuff unless you're really sure
    loop {
        if send.try_send(i).is_ok() {
            break;
        }
    }
}

// Drop the sender to close the queue
drop(send);

// prints along the lines of
// Stream 0 got 0
// Stream 0 got 1
// Stream 1 got 0
// Stream 0 got 2
// Stream 1 got 1
// etc

// some join mechanics here

单个生产者广播,每个流有两个消费者

让我们将上面的内容改为每个流由两个消费者消费

extern crate multiqueue2 as multiqueue;

use std::thread;

let (send, recv) = multiqueue::broadcast_queue(4);

for i in 0..2 { // or n
    let cur_recv = recv.add_stream();
    for j in 0..2 {
        let stream_consumer = cur_recv.clone();
        thread::spawn(move || {
            for val in stream_consumer {
                println!("Stream {} consumer {} got {}", i, j, val);
            }
        });
    }
    // cur_recv is dropped here
}

// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();

for i in 0..10 {
    // Don't do this busy loop in real stuff unless you're really sure
    loop {
        if send.try_send(i).is_ok() {
            break;
        }
    }
}
drop(send);

// prints along the lines of
// Stream 0 consumer 1 got 2
// Stream 0 consumer 0 got 0
// Stream 1 consumer 0 got 0
// Stream 0 consumer 1 got 1
// Stream 1 consumer 1 got 1
// Stream 1 consumer 0 got 2
// etc

// some join mechanics here

一些奇怪的事情

有人真正想过要使用甚至想要做更类似于的东西吗?

extern crate multiqueue2 as multiqueue;

use std::thread;

let (send, recv) = multiqueue::broadcast_queue(4);

// start like before
for i in 0..2 { // or n
    let cur_recv = recv.add_stream();
    for j in 0..2 {
        let stream_consumer = cur_recv.clone();
        thread::spawn(move || {
            for val in stream_consumer {
                println!("Stream {} consumer {} got {}", i, j, val);
            }
        });
    }
    // cur_recv is dropped here
}

// On this stream, since there's only one consumer,
// the receiver can be made into a SingleReceiver
// which can view items inline in the queue
let single_recv = recv.add_stream().into_single().unwrap();

thread::spawn(move || {
    for val in single_recv.iter_with(|item_ref| 10 * *item_ref) {
        println!("{}", val);
    }
});

// Same as above, except this time we just want to iterate until the receiver is empty
let single_recv_2 = recv.add_stream().into_single().unwrap();

thread::spawn(move || {
    for val in single_recv_2.try_iter_with(|item_ref| 10 * *item_ref) {
        println!("{}", val);
    }
});

// Take notice that I drop the reader - this removes it from
// the queue, meaning that the readers in the new threads
// won't get starved by the lack of progress from recv
recv.unsubscribe();

// Many senders to give all the receivers something
for _ in 0..3 {
    let cur_send = send.clone();
    for i in 0..10 {
        thread::spawn(loop {
            if cur_send.try_send(i).is_ok() {
                break;
            }
        });
    }
}
drop(send);

MPMC 模式

可能会注意到,广播队列模式要求类型必须是 Clone,而单个读取者的原地变体也要求类型必须是 Sync。这仅适用于广播队列,而不是正常的 mpmc 队列,因此还有一个 mpmc 接口。

Multiqueue2 对于任何 API 都不需要类型是 CloneSync,并且直接将项目从队列中移出而不是克隆它们。除了这一点之外,API 基本上没有区别,因此我不会在它们上占用大量篇幅。

未来模式

对于 mpmc 和广播,都支持未来模式。数据结构与正常的数据结构非常相似,只是它们实现了发送者和接收者的 Futures Sink/Stream 特性。这会带来一定的性能成本,这也是为什么未来类型被分开的原因。

基准测试

吞吐量

吞吐量是通过使用 condvar 阻塞锁进行基准测试的,这是队列系统的默认设置。这确保了即使对于长时间阻塞的异步项目,CPU 也能高效使用。

切换到忙等待锁可以提供另一个 30% 的吞吐量提升。

SPSC:在 1p::1c 上执行 10000000 push/pop 对的时间为每个项目 292.9397618 纳秒

SPMC:在 1p::1c_2b 上执行 10000000 push/pop 对的时间为每个项目 310.12774815 纳秒 在 1p::1c_3b 上执行 10000000 push/pop 对的时间为每个项目 317.77275306666667 纳秒

MPSC:在 2p::1c 上执行 10000000 push/pop 对的时间为每个项目 378.5664167 纳秒

MPMC:在 2p::1c_2b 上执行 10000000 push/pop 对的时间为每个项目 377.69721405 纳秒 在 2p::1c_3b 上执行 10000000 push/pop 对的时间为每个项目 414.59893453333336 纳秒

在 MacBook Pro 2018 i7,16GB Ram 上。

这里没有延迟基准测试工具,但延迟将大约是核心间通信延迟,大约在单插槽机器上的 40-70 纳秒。

随着多个生产者和多个消费者的增加,这些值会更高,因为每个生产者或消费者在完成写入或读取之前都必须执行 RMW 操作。

常见问题解答

我的类型不是 Clone,我能使用队列吗?

您可以使用队列的 MPMC 部分,但不能进行广播。

为什么发送者不能阻塞,而读取者可以?

当确实没有可做之事时,让读取者阻塞是有意义的,而发送者的情况则不然。

如果发送方被阻塞,这意味着系统已经满载,需要其他东西来处理积压的项目。

此外,它给队列带来了更大的性能损失,通知发送方的延迟在队列操作完成之前就已经发生,而通知读取器则在数据发送之后。

为什么未来发送者可以停车,尽管发送者不能被阻塞呢?

这是为了使未来API能够合理工作,因为当未来无法发送到队列时,它期望任务会被某个其他进程挂起和唤醒(如果这是错误的,请告知我!)。这样做是有道理的,因为在那段时间里,将处理其他事件而不是简单的阻塞。我可能还会添加一个仅对队列进行空转的未来API,供那些想要未来API的优雅性但不想承受性能损失的人使用。

当出现积压时,我想知道哪个流落后最多,我能做到吗?

目前,这是不可能的。一般来说,这样的问题很难具体回答,因为任何试图回答它的尝试都会与编写器更新竞争,而且也没有办法将“哪个流落后”的想法转化为程序可以执行的操作。

能否从一组多队列中选择?

不,目前还不能。为了获得良好的性能,队列中的所有项目都不是共享的。

如果一个流的消费者落后了会怎样?

队列不会在所有流都通过它之前覆盖数据点,因此对队列的写入会失败。根据你的目标,这可能是好事也可能是坏事。一方面,没有人喜欢因为某个愚蠢的慢线程而阻塞或饥饿更新。另一方面,这基本上强制实施了一种系统级积压控制。如果你想知道为什么需要这种控制,NYSE偶尔不会将合并的实时数据与个别实时数据和市场保持同步,市场就会陷入混乱。

依赖项

~3MB
~66K SLoC