#环形缓冲区 #无锁 #FIFO #SPSC #生产者-消费者 #共享指针 #共享数据

无std 基于环形缓冲区的基于drop

ringbufcrate派生而来,用basedrop的共享指针代替Arc

2个版本

0.1.1 2022年5月30日
0.1.0 2022年5月30日

#837并发

MIT/Apache

83KB
2K SLoC

ringbuf-basedrop

Crates.io Docs.rs Github Actions License

这是从ringbufcrate派生而来,使用basedrop's Shared指针代替Arc。这确保了当所有对环形缓冲区的引用都被丢弃时,底层的Vec永远不会在实时线程中被意外地解除分配(非实时安全操作)。相反,所有分配都在拥有基于drop Collector对象的所有线程中进行清理。

这对于音频应用程序特别有用。


具有直接访问内部数据的无锁单生产者单消费者(SPSC)FIFO环形缓冲区。

概述

RingBuffer是代表环形缓冲区的初始结构。环形缓冲区可以拆分为一对ProducerConsumer

ProducerConsumer用于相应地追加/从环形缓冲区中移除元素。它们可以在线程之间安全地传输。使用ProducerConsumer的操作是无锁的 - 它们立即成功或失败,而没有阻塞或等待。

元素可以有效地逐个追加/移除,也可以一次添加多个。此外,可以直接将数据加载/存储到Read/Write实例中。最后,还有一些unsafe方法允许在追加/移除内部内存时线程安全地直接访问。

当使用nightly工具链构建时,可以通过cargo bench --features benchmark运行基准测试。

此外,该crate可以与no_std一起使用(但仍然需要alloc)。

示例

简单示例

use basedrop::Collector;
use ringbuf::RingBuffer;

let collector = Collector::new();

let rb = RingBuffer::<i32>::new(2);
let (mut prod, mut cons) = rb.split(&collector.handle());

prod.push(0).unwrap();
prod.push(1).unwrap();
assert_eq!(prod.push(2), Err(2));

assert_eq!(cons.pop().unwrap(), 0);

prod.push(2).unwrap();

assert_eq!(cons.pop().unwrap(), 1);
assert_eq!(cons.pop().unwrap(), 2);
assert_eq!(cons.pop(), None);

消息传输

这是一个更复杂的示例,用于在线程之间传输文本消息。

use std::io::Read;
use std::thread;
use std::time::Duration;

use basedrop::Collector;
use ringbuf::RingBuffer;

let collector = Collector::new();

let buf = RingBuffer::<u8>::new(10);
let (mut prod, mut cons) = buf.split(&collector.handle());

let smsg = "The quick brown fox jumps over the lazy dog";

let pjh = thread::spawn(move || {
    println!("-> sending message: '{}'", smsg);

    let zero = [0];
    let mut bytes = smsg.as_bytes().chain(&zero[..]);
    loop {
        if prod.is_full() {
            println!("-> buffer is full, waiting");
            thread::sleep(Duration::from_millis(1));
        } else {
            let n = prod.read_from(&mut bytes, None).unwrap();
            if n == 0 {
                break;
            }
            println!("-> {} bytes sent", n);
        }
    }

    println!("-> message sent");
});

let cjh = thread::spawn(move || {
    println!("<- receiving message");

    let mut bytes = Vec::<u8>::new();
    loop {
        if cons.is_empty() {
            if bytes.ends_with(&[0]) {
                break;
            } else {
                println!("<- buffer is empty, waiting");
                thread::sleep(Duration::from_millis(1));
            }
        } else {
            let n = cons.write_into(&mut bytes, None).unwrap();
            println!("<- {} bytes received", n);
        }
    }

    assert_eq!(bytes.pop().unwrap(), 0);
    let msg = String::from_utf8(bytes).unwrap();
    println!("<- message received: '{}'", msg);

    msg
});

pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(smsg, rmsg);

许可证

许可协议为以下之一

任选其一。

贡献

除非您明确声明,否则根据 Apache-2.0 许可证定义,您有意提交以包含在本工作中的任何贡献,将按照上述方式双重许可,不附加任何额外条款或条件。

依赖项

~38KB