#queue #spsc #single-consumer #element #reader #writer #position

lossyq

这是一个固定长度的并发单生产者、单消费者队列,永远不会阻塞写入者

19 个版本

使用 Rust 2015

0.1.19 2016年10月20日
0.1.18 2016年9月11日
0.1.17 2016年8月31日
0.1.6 2016年7月29日
0.1.5 2016年5月21日

#827并发

Download history 10/week @ 2024-02-19 22/week @ 2024-02-26 14/week @ 2024-03-04 16/week @ 2024-03-11 15/week @ 2024-03-18

69 每月下载
3 个 Crates(2 个直接) 中使用

MIT/Apache

28KB
679

lossyq for Rust

lossyq 是一个具有特定特性的单生产者、单订阅者队列

  • 在创建队列时,您需要决定队列需要有多大
  • 队列将在创建时分配指定数量的元素(见下文说明)
  • 在添加元素时,put 闭包将接收到要更新的元素的引用,因此无需为新元素分配内存
  • 在添加元素时,更新者永远不会被阻塞。它不关心是否有读取器来读取该元素
  • 读取器维护最后一个读取的位置,并将读取元素直到最后一个写入位置
  • 如果读取器比写入器慢,那么它可能看不到在此期间写入的元素

示例

extern crate lossyq;

fn main() {
  use std::thread;

  // create a very small channel of 2 elements
  let (mut tx, mut rx) = lossyq::spsc::channel(2, 0 as i32);
  let t = thread::spawn(move|| {
    for i in 1..4 {
      // the tx.put() receives a lambda function that in turn
      // gets a writable reference to the next element in the queue
      tx.put(|v| *v = i);
    }
  });
  t.join().unwrap();
  // the receiver receives an iterator which may be
  // further passed to other adapters
  let sum = rx.iter().fold(0, |acc, num| acc + num);

  // this should print 5 as the writer sent three items to the
  // queue: [1,2,3] and the first item got overwritten by the
  // last one
  println!("sum={}",sum);
}

添加元素

如上例所示,put 函数接收一个闭包,该闭包又接收队列中元素的可变引用。这样我们就不需要在插入时分配内存。

读取元素

读取时,iter 函数接收一个迭代器,它具有对当前所有可读元素的引用。如果写入器向队列写入更多元素,迭代器仍然有效,只是它不会看到新写入的元素。要查看它们,需要通过新的 iter 调用创建一个新的迭代器。

最多一次投递

循环缓冲区维护读取器的位置。当我们获取迭代器时,它会增加,并且下一次 iter 调用将返回一组不同的项目(或空)。

fn at_most_once() {
  let (mut tx, mut rx) = spsc::channel(20, 0 as i32);
  tx.put(|v| *v = 1);
  tx.put(|v| *v = 2);
  tx.put(|v| *v = 3);
  {
    // first iterator processes the single element
    // assumes I process everything else in the iterator
    let mut it = rx.iter();
    assert_eq!(Some(1), it.next());
  }
  {
    // the second iterator gets nothing, since the first
    // iterator received the whole range no matter if it
    // has really called a next on them ot not
    let mut it = rx.iter();
    assert_eq!(None, it.next());
  }
}

理由

让我强调一点,读者可能会丢失更新。我认为这不是问题,而是一种可以接受的属性。其他队列实现选择在队列满时扩大队列大小,或者阻塞写者直到读者从队列中处理一些数据。我认为所有这些选择都是有效的,并且它们都有后果。当我们为队列分配更多内存时,我们可能会显然耗尽它,然后我们开始交换,整个系统都受到了诅咒。另一种选择是当阻塞写者时,写者性能受限于读者。

我认为在某些情况下,我们更关注最新数据而不是处理内存不足或阻塞条件。一个例子是心跳。只要我们知道给定组件在几秒钟前是活着的,我们可能就不会关心丢失旧的心跳。

这个队列不分配内存的事实,使得其性能可预测并且可能很快。(我故意没有进行任何测量。)

当你想最大限度地减少丢失项目的机会时,你需要选择一个更大的队列大小。正确的大小取决于你的应用程序。

实现说明

这个队列的核心是CircularBuffer数据结构。它使用原子整数操作来确保写者和读者可以并发操作。

struct CircularBuffer<T : Copy> {
  seqno       : AtomicUsize,        // the ID of the last written item
  data        : Vec<T>,             // (2*n)+1 preallocated elements
  size        : usize,              // n

  buffer      : Vec<AtomicUsize>,   // (positions+seqno)[]
  read_priv   : Vec<usize>,         // positions belong to the reader
  write_tmp   : usize,              // temporary position where the writer writes first
  max_read    : usize,              // reader's last read seqno
}

当写入时

data向量包含了2n+1预先分配的项目。n个项目属于读者,而n+1个项目属于写者。谁拥有哪些元素的所有权由bufferread_privwrite_tmp成员跟踪。buffer向量代表CircularBuffer,其中每个元素由16位seqno和其余指向data向量的位置组成。write_tmp元素也是一个指向data向量的位置。当写者写入一个新元素时

  • 它写入由write_tmp指向的data元素
  • 然后它计算seqnosize,这是将要更新的buffer中的位置(new_pos)
  • 然后buffer[new_pos]将被更新以包含write_tmp < 16) + (seqno % 0xffff)
  • 最后,write_tmp将被更新为buffer[old_pos] > 16的前一个值
  • (基本上write_tmpbuffer的位置将被交换)

这种设计允许写者始终写入一个读者不会触及的私有区域,然后它将buffer[new_pos]元素原子上地交换到新写入的元素上。这允许在不妨碍读者的前提下进行写入。

当读取时

要读取数据,需要通过iter()函数来获取迭代器。该函数将按逆序遍历buffer,并通过原子操作交换读者的位置(由read_priv向量持有)与buffer组件的位置部分。在遍历过程中,它检查buffer条目中的序列号部分是否为预期的值。如果不是,则知道写者已翻页,因此应在下一次迭代中返回给定元素,并停止。

此操作的结果是,read_priv向量持有先前写入元素的指针,而读者将其自己的元素提供给写者以交换,这样写者就可以写入这些元素,而读者则使用自己的副本。

许可

根据您的要求,受MIT或Apache-2许可协议的约束。

无运行时依赖