#单消费者 #环形缓冲区 #MPMC #消息 #多消费者

disruptor

通过环形缓冲区实现的低延迟线程间通信(受LMAX Disruptor启发)

24 个版本 (12 个稳定版本)

3.2.0 2024年8月10日
3.1.0 2024年7月21日
2.2.0 2024年6月19日
1.2.0 2024年5月22日
0.0.1 2018年10月10日

#53并发 中排名

Download history 125/week @ 2024-04-30 102/week @ 2024-05-21 282/week @ 2024-05-28 27/week @ 2024-06-04 12/week @ 2024-06-11 190/week @ 2024-06-18 2/week @ 2024-06-25 25/week @ 2024-07-02 174/week @ 2024-07-09 159/week @ 2024-07-16 105/week @ 2024-07-23 56/week @ 2024-07-30 111/week @ 2024-08-06 37/week @ 2024-08-13

每月 327 次下载

MIT 许可证

96KB
2K SLoC

Crates.io Crates.io Build codecov

Disruptor

这个库是一个用Rust编写的低延迟线程间通信库。

它深受LMAX的杰出Disruptor库的启发。

内容

入门指南

将以下内容添加到您的 Cargo.toml 文件中

disruptor = "3.2.0"

要了解如何使用该库的详细信息,请查看docs.rs/disruptor上的文档。

以下是一个演示单次和批量发布的最小示例。注意,当可能时,应始终使用批量发布以实现最佳延迟和吞吐量(请参阅下面的基准测试)。

use disruptor::*;

// The event on the ring buffer.
struct Event {
    price: f64
}

fn main() {
    // Factory closure for initializing events in the Ring Buffer.
    let factory = || { Event { price: 0.0 }};

    // Closure for processing events.
    let processor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // Your processing logic here.
    };

    let size = 64;
    let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
        .handle_events_with(processor)
        .build();

    // Publish single events into the Disruptor via the `Producer` handle.
    for i in 0..10 {
        producer.publish(|e| {
            e.price = i as f64;
        });
    }

    // Publish a batch of events into the Disruptor.
    producer.batch_publish(5, |iter| {
        for e in iter { // `iter` is guaranteed to yield 5 events.
            e.price = 42.0;
        }
    });
}// At this point, the Producer instance goes out of scope and when the
 // processor is done handling all events then the Disruptor is dropped
 // as well.

该库还支持将线程固定在核心上,以避免由上下文切换引起的延迟。一个更高级的示例,展示了这一点,以及多个生产者和多个相互依赖的消费者,可能如下所示

use disruptor::*;
use std::thread;

struct Event {
    price: f64
}

fn main() {
    let factory = || { Event { price: 0.0 }};

    // Closure for processing events.
    let h1 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // Processing logic here.
    };
    let h2 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // Some processing logic here.
    };
    let h3 = |e: &Event, sequence: Sequence, end_of_batch: bool| {
        // More processing logic here.
    };

    let mut producer1 = disruptor::build_multi_producer(64, factory, BusySpin)
        // `h2` handles events concurrently with `h1`.
        .pin_at_core(1).handle_events_with(h1)
        .pin_at_core(2).handle_events_with(h2)
            .and_then()
            // `h3` handles events after `h1` and `h2`.
            .pin_at_core(3).handle_events_with(h3)
        .build();

    // Create another producer.
    let mut producer2 = producer1.clone();

    // Publish into the Disruptor.
    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..10 {
                producer1.publish(|e| {
                    e.price = i as f64;
                });
            }
        });
        s.spawn(move || {
            for i in 10..20 {
                producer2.publish(|e| {
                    e.price = i as f64;
                });
            }
        });
    });
}// At this point, the Producers instances go out of scope and when the
 // processors are done handling all events then the Disruptor is dropped
 // as well.

如果您需要在处理器线程中存储某些状态,这些状态既不是 Send 也不是 Sync,例如一个 Rc<RefCell<i32>>,那么您可以为初始化该状态创建一个闭包,并在构建Disruptor时将其与处理闭包一起传递。然后,Disruptor将在每个事件上传递对您的状态的可变引用。以下是一个示例

use std::{cell::RefCell, rc::Rc};
use disruptor::*;

struct Event {
    price: f64
}

#[derive(Default)]
struct State {
    data: Rc<RefCell<i32>>
}

fn main() {
    let factory = || { Event { price: 0.0 }};
    let initial_state = || { State::default() };

    // Closure for processing events *with* state.
    let processor = |s: &mut State, e: &Event, _: Sequence, _: bool| {
        // Mutate your custom state:
        *s.data.borrow_mut() += 1;
    };

    let size = 64;
    let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
        .handle_events_and_state_with(processor, initial_state)
        .build();

    for i in 0..10 {
        producer.publish(|e| {
            e.price = i as f64;
        });
    }
}

功能

  • 单生产者单消费者(SPSC)。
  • 具有消费者依赖关系的单生产者多消费者(SPMC)。
  • 多生产者单消费者(MPSC)。
  • 具有消费者依赖关系的多生产者多消费者(MPMC)。
  • 忙等待策略。
  • 事件批量发布。
  • 事件批量消费。
  • 可以为事件处理器线程设置线程亲和性。
  • 设置每个事件处理器线程的线程名称。

设计选择

图书馆中的一切都关于低延迟,这极大地影响了在这个库中做出的所有选择。例如,您不能分配一个事件并将其移动到环形缓冲区中。相反,事件在启动时分配,以确保它们在内存中相邻,以提高缓存一致性。但是,您仍然可以在堆上分配一个结构体,并将所有权移动到环形缓冲区中的事件字段。只要您意识到这可能会增加延迟,因为结构体是由一个线程分配的,并由另一个线程丢弃。因此,在分配器中存在同步操作。

没有使用动态分派 - 一切都是单态的。

正确性

该库需要使用不安全功能来实现低延迟。虽然不能保证没有错误,但已使用这些方法来消除错误。

  • 不使用不安全块的最小化。
  • 高测试覆盖率。
  • 所有测试都在CI/CD中的Miri上运行。
  • 使用TLA+进行验证(请参阅verification/文件夹)。

性能

SPSC和MPSC Disruptor变体已进行基准测试并与Crossbeam进行比较。请参阅benches/spsc.rsbenches/mpsc.rs文件中的代码。

以下SPSC基准测试的结果是从在2016 Macbook Pro上运行的2.6 GHz四核Intel Core i7上运行基准测试收集的。因此,在现代Intel Xeon上,数字应该更好。此外,在Mac上无法隔离核心并固定线程,这将产生更稳定的结果。这是未来的工作。

如果您有任何改进基准测试的建议,请随时提交问题。

为了提供一个相对现实的基准测试,不仅要考虑不同大小的突发,还要考虑突发之间的可变暂停:0 ms、1 ms和10 ms。

以下延迟是每元素的平均延迟,置信区间为95%(标准criterion设置)。捕获所有延迟并计算各种百分位数(特别是最大延迟)是未来的工作。然而,我预计以下测量结果将代表您在真实应用程序中可以实现的实际性能。

突发之间无暂停

延迟

突发大小 Crossbeam Disruptor 改进
1 65 ns 32 ns 51%
10 68 ns 9 ns 87%
100 29 ns 8 ns 72%

吞吐量

突发大小 Crossbeam Disruptor 改进
1 15.2M / s 31.7M / s 109%
10 14.5M / s 117.3M / s 709%
100 34.3M / s 119.7M / s 249%

突发之间暂停1 ms

延迟

突发大小 Crossbeam Disruptor 改进
1 63 ns 33 ns 48%
10 67 ns 8 ns 88%
100 30 ns 9 ns 70%

吞吐量

突发大小 Crossbeam Disruptor 改进
1 15.9M / s 30.7M / s 93%
10 14.9M / s 117.7M / s 690%
100 33.8M / s 105.0M / s 211%

突发之间暂停10 ms

延迟

突发大小 Crossbeam Disruptor 改进
1 51 ns 32 ns 37%
10 67 ns 9 ns 87%
100 30 ns 10 ns 67%

吞吐量

突发大小 Crossbeam Disruptor 改进
1 19.5M / s 31.6M / s 62%
10 14.9M / s 114.5M / s 668%
100 33.6M / s 105.0M / s 213%

结论

Disruptor库和Crossbeam库之间显然存在差异。然而,这并不是因为Crossbeam库不是一件优秀的软件。它是。Disruptor通过交换CPU和内存资源来实现低延迟和高吞吐量,这就是它能够实现这些结果的原因。当像基准测试中那样以10和100个事件的大小发布事件批处理时,Disruptor也表现出色。

随着突发大小的增加,这两个库都得到了很大的改进,但Disruptor的性能更能抵抗突发之间的暂停,这是设计目标之一。

相关工作

有多个其他Rust项目模仿LMAX Disruptor库

  1. Turbine
  2. Disrustor

这个库支持的一个关键特性是从不同的线程创建多个生产者,这在上述库中(在撰写本文时)都没有支持。

贡献

欢迎您创建一个Pull-Request或者提出改进建议的问题。

更改仅由我自行决定,我会关注这些更改是否适合这个crate的目的和设计。

路线图

空!所有项目都已实现。

依赖项

~0.4–1.1MB
~22K SLoC