#channel #message #mpsc #mpmc #ring #bounded-channel

nightly talaria

高性能循环消息传递库

2 个版本

0.1.1 2024年1月15日
0.1.0 2024年1月15日

#680 in 并发

MIT 许可证

66KB
1K SLoC

talaria

talaria 是一个具有有限 FIFO 语义的高性能循环消息传递库。

[!警告] 虽然 talaria 已经经过一些正确性测试的验证,但它仍然被视为不稳定且未经证明。

talaria 分为三个主要概念

  • 通道
  • 分区
  • 预留

通道

talaria 的通道与标准库中找到的有限通道类似。尽管有一些差异。

首先,通道需要提前提供一个固定集合的对象来管理。其次,通道至少由两个分区组成。

分区

分区可以被视为通道管理的数据的“状态”。为什么不考虑分区作为数据的“所有者”呢?好吧,分区可以配置为exclusiveconcurrent,这描述了是否可以同时持有分区的多个实例。所以,当所有分区都处于“独家”模式时,可以考虑它们作为不同的“所有者”,但并发访问分区会改变这一点。数据由分区管理,但持有分区的引用(除非它是独家分区)并不保证数据仅由此上下文所有。

分区用于预留分区当前拥有的一个或多个对象。一旦这些对象不再使用(例如超出作用域),它们就会被“转移”到下一个逻辑分区。因此,对象从分区 0 到分区 1,从分区 1 到分区 2,依此类推。一旦最终分区中的对象被取消预留,它就会返回到分区 0

将分区视为循环中的“状态”可能更有帮助,而不是“发送者”和“接收者”对。事实上,这是库的原始动机——使用分区来表示对象在其生命周期中的状态,允许简单的状态转换,并在需要时允许多个所有者访问。当然,这适用于消息传递,假设分区内部数据可变。

预留

预订表示对某些数据的独家所有权。预订是从分区创建的。一旦可用,就会创建一个预订,该预订持有对请求的对象的独家所有权。这意味着一旦您持有预订,就可以安全地读取和写入这些对象。

一旦预订超出范围或被明确删除,对象的“所有权”将“转移”到下一个逻辑分区。值得注意的是,对于并发分区,可以在一个分区上同时请求多个预订。持有预订仍然保证对预订管理的对象具有独家所有权,但删除预订可能会暂时阻止执行,直到之前的预订也已传递。也就是说,预订遵循先进先出(FIFO)顺序 - 首先创建的预订必须首先删除,然后后续预订将等待直到这是可能的。

如何使用

使用talaria有三个步骤

  1. 创建一个通道
  2. 获取一个分区
  3. 获取一个预订

以下是如何使用talaria在两个线程之间无限传递“Ping/Pong”消息的示例。

use talaria::channel::Channel;

#[derive(Clone)]
enum Message {
    Ping, 
    Pong,
}

fn main() {
    // id of the partition we'll access on the main thread
    const MAIN_PARTITION: usize = 0;
    // id of the partition we'll access on worker thread
    const THREAD_PARTITION: usize = 1;
    
    let channel = Channel::builder()
        .add_exclusive_partition()
        .add_exclusive_partition()
        .build(vec![Message::Ping; 16])
        .unwrap();

    let channel_clone = channel.clone();
    
    // spin up a worker thread
    // it will do what the main thread is doing, but in reverse..
    let thread_handle = std::thread::spawn(move || {
        let mut partition = channel_clone
            .get_exclusive_partition(THREAD_PARTITION)
            .unwrap();
        
        while let Ok(mut reservation) = partition.reserve(1) {
            reservation[0] = match &reservation[0] {
                Message::Pong => Message::Ping,
                Message::Ping => panic!("unexpected message!")
            };
        }
    });

    let mut partition = channel
        .get_exclusive_partition(MAIN_PARTITION)
        .unwrap();

    // reserve an item at a time
    while let Ok(mut reservation) = partition.reserve(1) {
        reservation[0] = match &reservation[0] {
            // if the first (and only) element is a "ping" message,
            // set it to "pong" and forward it
            Message::Ping => Message::Pong,
            //  otherwise we got an unexpected message!
            Message::Pong => panic!("unexpected message!")
        };
    }
    
    thread_handle.join().unwrap();
}

基准测试

talaria使用 Criterion 进行基准测试,只需运行cargo bench即可开始基准测试。

基准测试是为独家和并发双线程分区场景编写的,以及与 std::sync::mpsccrossbeam 的有界通道等效的测试。

以下是在我的机器上(i9-9900k,64Gb 3200mhz RAM)进行的基准测试示例

独家

Exclusive Partition Benchmark

并发

Concurrent Partition Benchmark

std::sync::mpsc

std::sync::mpsc Benchmark

crossbeam

crossbeam Benchmark

正确性测试

talaria附带了一个(相对不完整)的正确性测试套件,使用loomshuttle。要运行它们,请执行以下操作

Loom

RUSTFLAGS="--cfg loom" cargotest

Shuttle

RUSTFLAGS="--cfg shuttle" cargotest

有些测试是时间限制的,以防止它们无限期运行或运行时间过长。虽然预期测试将花费几分钟,但请期待。

依赖项

~0.4–28MB
~356K SLoC