#ring-buffer #thread #spsc #tasks #transfer #data #asynchronous

fixed_ring_buffer

一个异步 SPSC 固定容量无锁环形缓冲区,可用于在两个线程或两个异步任务之间传输数据

1 个不稳定版本

0.1.0 2021年2月20日

#1039并发

Download history 24/week @ 2024-04-01

17,076 每月下载量

MIT 许可证

59KB
748 代码行

异步无锁环形缓冲区

FixedRingBuffer 是一个异步 SPSC 固定容量无锁环形缓冲区,可用于在两个线程或两个异步任务之间传输数据。其整体功能如下

  • 无锁
  • spsc
  • 使用固定容量缓冲区,并支持缓冲区内部内存回收
  • 为读取器实现 AsyncRead 和 AsyncBufRead 特性
  • 为写入器实现 AsyncWrite 特性

快速入门

    use std::sync::Arc;
    use bytes::BufMut;
    use async_std::task;
    use std::{thread};
    use futures::io::{AsyncWriteExt, AsyncReadExt};
    use futures_lite::future;

    use crate::async_ring_buffer::{RingBufferReader, RingBufferWriter, RingBuffer};


    let ring_buffer = Arc::new(RingBuffer::new(1024));
    
    let mut reader = RingBufferReader::new(ring_buffer.clone());
    let mut writer = RingBufferWriter::new(ring_buffer.clone());

    let t1 = thread::spawn(move ||{
      let handle = task::spawn(async move {
        let mut length = 0 as usize;
        let mut contents: Vec<u8> = Vec::with_capacity(16096);
        contents.resize(16096, 0);
        loop {
          match reader.read(&mut contents).await {
            Ok(size) => {
              if size > 0 {
                length += size;
              } else {
                break;
              }
            },
            Err(e) => {
              panic!("read err = {}", e);
            },
          }
        }

        println!("length = {}", length);
      });

      task::block_on(handle);
    });

    let t2 = thread::spawn(move ||{
      let handle = task::spawn(async move {
        let mut length = 0 as usize;
        let mut contents: Vec<u8> = Vec::new();
        contents.put("example-data-data".as_bytes());
        for i in 0..102400 as usize {
          match writer.write_all(&mut contents).await {
            Ok(()) => {
              length += contents.len();
            },
            Err(e) => {
              panic!("write err = {}", e);
            },
          }
        }
        println!("length = {}", length);
      });

      task::block_on(handle);
    });

    t1.join();
    t2.join();

依赖项

~7–19MB
~239K SLoC