#环形缓冲区 #循环缓冲区 #无锁 #环形 #循环 #异步 #无分配

无 std mutringbuf

一个简单的无锁 SPSC 环形缓冲区,具有就地可变性

5 个不稳定版本

0.3.1 2024 年 6 月 27 日
0.3.0 2024 年 5 月 13 日
0.2.0 2024 年 5 月 6 日
0.1.3 2024 年 3 月 29 日
0.0.1-alpha.1 2024 年 3 月 15 日

#139并发

Download history 154/week @ 2024-05-05 133/week @ 2024-05-12 21/week @ 2024-05-19 1/week @ 2024-06-09 121/week @ 2024-06-23 64/week @ 2024-06-30

736 每月下载量

MIT/Apache

99KB
2K SLoC

MutRingBuf

crates.io Documentation Rust + Miri

一个简单的无锁 SPSC FIFO 环形缓冲区,具有就地可变性。

我应该使用它吗?

如果您正在寻找用于生产环境的环形缓冲区,在返回此处之前,请查看这些之一

如果您发现这个项目有任何错误,请,打开一个 问题;我将很高兴查看!

性能

根据基准测试,ringbuf 在执行某些操作时应略快于此 crate。

另一方面,根据我使用 Instants 进行的测试,mutringbuf 似乎略快。

我坦白不知道为什么,所以我的建议是尝试两者并决定,同时考虑到,对于典型的生产者-消费者使用,ringbuf 一定比这个 crate 更稳定和成熟。

此 crate 的目的是什么?

我编写了这个 crate 来在音频流上执行实时计算,您可以在 这里 找到(简单的)有意义的示例。要运行它,请转到 这里

功能

  • default: alloc
  • alloc: 使用 alloc crate,启用堆分配缓冲区
  • async: 启用异步/await 支持

用法

关于未初始化项的说明

此缓冲区可以处理未初始化的项。它们是在使用 new_zeroed 方法创建缓冲区时产生的,或者是在通过 ConsIter::popAsyncConsIter::pop 将初始化项从缓冲区移除时产生的。

正如在 ProdIter 文档页面所述,有两种方法可以将项推入缓冲区:

  • 只有当我们推入项的位置已初始化时,才能使用常规方法。
  • 当该位置未初始化时,必须使用 *_init 方法。

这是因为常规方法会隐式丢弃旧值,如果它已初始化,这很好,但如果它未初始化,那就糟糕透了。更准确地说,丢弃未初始化的值会导致未定义行为(UB),主要是 SIGSEGV。

缓冲区和迭代器的初始化

首先,必须创建一个缓冲区。

本地缓冲区应该更快,因为使用了纯整数作为索引,但在并发环境中显然不能使用。

栈分配的缓冲区

use mutringbuf::{ConcurrentStackRB, LocalStackRB};
// buffers filled with default values
let concurrent_buf = ConcurrentStackRB::<usize, 10>::default();
let local_buf = LocalStackRB::<usize, 10>::default();
// buffers built from existing arrays
let concurrent_buf = ConcurrentStackRB::from([0; 10]);
let local_buf = LocalStackRB::from([0; 10]);
// buffers with uninitialised (zeroed) items
unsafe {
    let concurrent_buf = ConcurrentStackRB::< usize, 10 >::new_zeroed();
    let local_buf = LocalStackRB::< usize, 10 >::new_zeroed();
}

堆分配的缓冲区

use mutringbuf::{ConcurrentHeapRB, LocalHeapRB};
// buffers filled with default values
let concurrent_buf: ConcurrentHeapRB<usize> = ConcurrentHeapRB::default(10);
let local_buf: LocalHeapRB<usize> = LocalHeapRB::default(10);
// buffers built from existing vec
let concurrent_buf = ConcurrentHeapRB::from(vec![0; 10]);
let local_buf = LocalHeapRB::from(vec![0; 10]);
// buffers with uninitialised (zeroed) items
unsafe {
    let concurrent_buf: ConcurrentHeapRB < usize > = ConcurrentHeapRB::new_zeroed(10);
    let local_buf: LocalHeapRB <usize > = LocalHeapRB::new_zeroed(10);
}

请注意,缓冲区使用一个位置来同步迭代器。

因此,大小为 SIZE 的缓冲区可以保持最多 SIZE - 1 个值!

然后可以使用以下两种方式使用此类缓冲区:

同步不可变

使用环形缓冲区的常规方法:生产者插入最终由消费者取出的值。

use mutringbuf::LocalHeapRB;
let buf = LocalHeapRB::from(vec![0; 10]);
let (mut prod, mut cons) = buf.split();

同步可变

与不可变情况类似,但在 prodcons 之间有一个第三个迭代器 work

此迭代器会就地修改元素。

use mutringbuf::LocalHeapRB;
let buf = LocalHeapRB::from(vec![0; 10]);
let (mut prod, mut work, mut cons) = buf.split_mut();

异步不可变

使用环形缓冲区的常规方法:生产者插入最终由消费者取出的值。

use mutringbuf::LocalHeapRB;
let buf = LocalHeapRB::from(vec![0; 10]);
let (mut as_prod, mut as_cons) = buf.split_async();

异步可变

与不可变情况类似,但在 prodcons 之间有一个第三个迭代器 work

此迭代器会就地修改元素。

use mutringbuf::LocalHeapRB;
let buf = LocalHeapRB::from(vec![0; 10]);
let (mut as_prod, mut as_work, mut as_cons) = buf.split_mut_async();

工作迭代器也可以被包装在一个 DetachedWorkIter 或一个 AsyncDetachedWorkIter 中,间接暂停消费者,以便来回探索产生的数据。


然后可以将每个迭代器传递给一个线程以执行其工作。更多信息可以在相关页面找到

请注意,无论类型如何,缓冲区都一直存在,直到最后一个迭代器消失。

测试、基准和示例

Miri 测试可以在 script 中找到。

以下命令必须从 crate 的根目录开始运行。

可以使用以下命令运行测试:

cargo test

可以使用以下命令运行基准测试:

RUSTFLAGS="--cfg bench" cargo bench

可以使用以下命令运行 CPAL 示例:

RUSTFLAGS="--cfg cpal" cargo run --example cpal

可以使用以下命令运行异步示例:

cargo run --example simple_async --features async

可以使用以下命令运行其他 example_name 示例:

cargo run --example `example_name`

依赖项

~115KB