2个版本
0.1.1 | 2022年5月30日 |
---|---|
0.1.0 | 2022年5月30日 |
#837 在 并发
83KB
2K SLoC
ringbuf-basedrop
这是从ringbuf
crate派生而来,使用basedrop
's Shared
指针代替Arc
。这确保了当所有对环形缓冲区的引用都被丢弃时,底层的Vec
永远不会在实时线程中被意外地解除分配(非实时安全操作)。相反,所有分配都在拥有基于drop Collector
对象的所有线程中进行清理。
这对于音频应用程序特别有用。
具有直接访问内部数据的无锁单生产者单消费者(SPSC)FIFO环形缓冲区。
概述
RingBuffer
是代表环形缓冲区的初始结构。环形缓冲区可以拆分为一对Producer
和Consumer
。
Producer
和Consumer
用于相应地追加/从环形缓冲区中移除元素。它们可以在线程之间安全地传输。使用Producer
和Consumer
的操作是无锁的 - 它们立即成功或失败,而没有阻塞或等待。
元素可以有效地逐个追加/移除,也可以一次添加多个。此外,可以直接将数据加载/存储到Read
/Write
实例中。最后,还有一些unsafe
方法允许在追加/移除内部内存时线程安全地直接访问。
当使用nightly工具链构建时,可以通过cargo bench --features benchmark
运行基准测试。
此外,该crate可以与no_std
一起使用(但仍然需要alloc
)。
示例
简单示例
use basedrop::Collector;
use ringbuf::RingBuffer;
let collector = Collector::new();
let rb = RingBuffer::<i32>::new(2);
let (mut prod, mut cons) = rb.split(&collector.handle());
prod.push(0).unwrap();
prod.push(1).unwrap();
assert_eq!(prod.push(2), Err(2));
assert_eq!(cons.pop().unwrap(), 0);
prod.push(2).unwrap();
assert_eq!(cons.pop().unwrap(), 1);
assert_eq!(cons.pop().unwrap(), 2);
assert_eq!(cons.pop(), None);
消息传输
这是一个更复杂的示例,用于在线程之间传输文本消息。
use std::io::Read;
use std::thread;
use std::time::Duration;
use basedrop::Collector;
use ringbuf::RingBuffer;
let collector = Collector::new();
let buf = RingBuffer::<u8>::new(10);
let (mut prod, mut cons) = buf.split(&collector.handle());
let smsg = "The quick brown fox jumps over the lazy dog";
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
let zero = [0];
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
let n = prod.read_from(&mut bytes, None).unwrap();
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
msg
});
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
许可证
许可协议为以下之一
- Apache License,版本2.0 (LICENSE-APACHE 或 https://apache.ac.cn/licenses/LICENSE-2.0)
- 麻省理工学院许可证(LICENSE-MIT 或 http://opensource.org/licenses/MIT)
任选其一。
贡献
除非您明确声明,否则根据 Apache-2.0 许可证定义,您有意提交以包含在本工作中的任何贡献,将按照上述方式双重许可,不附加任何额外条款或条件。
依赖项
~38KB