19 个版本
使用 Rust 2015
0.1.19 | 2016年10月20日 |
---|---|
0.1.18 | 2016年9月11日 |
0.1.17 | 2016年8月31日 |
0.1.6 | 2016年7月29日 |
0.1.5 | 2016年5月21日 |
#827 在 并发 中
69 每月下载
在 3 个 Crates(2 个直接) 中使用
28KB
679 行
lossyq for Rust
lossyq
是一个具有特定特性的单生产者、单订阅者队列
- 在创建队列时,您需要决定队列需要有多大
- 队列将在创建时分配指定数量的元素(见下文说明)
- 在添加元素时,put 闭包将接收到要更新的元素的引用,因此无需为新元素分配内存
- 在添加元素时,更新者永远不会被阻塞。它不关心是否有读取器来读取该元素
- 读取器维护最后一个读取的位置,并将读取元素直到最后一个写入位置
- 如果读取器比写入器慢,那么它可能看不到在此期间写入的元素
示例
extern crate lossyq;
fn main() {
use std::thread;
// create a very small channel of 2 elements
let (mut tx, mut rx) = lossyq::spsc::channel(2, 0 as i32);
let t = thread::spawn(move|| {
for i in 1..4 {
// the tx.put() receives a lambda function that in turn
// gets a writable reference to the next element in the queue
tx.put(|v| *v = i);
}
});
t.join().unwrap();
// the receiver receives an iterator which may be
// further passed to other adapters
let sum = rx.iter().fold(0, |acc, num| acc + num);
// this should print 5 as the writer sent three items to the
// queue: [1,2,3] and the first item got overwritten by the
// last one
println!("sum={}",sum);
}
添加元素
如上例所示,put
函数接收一个闭包,该闭包又接收队列中元素的可变引用。这样我们就不需要在插入时分配内存。
读取元素
读取时,iter
函数接收一个迭代器,它具有对当前所有可读元素的引用。如果写入器向队列写入更多元素,迭代器仍然有效,只是它不会看到新写入的元素。要查看它们,需要通过新的 iter
调用创建一个新的迭代器。
最多一次投递
循环缓冲区维护读取器的位置。当我们获取迭代器时,它会增加,并且下一次 iter
调用将返回一组不同的项目(或空)。
fn at_most_once() {
let (mut tx, mut rx) = spsc::channel(20, 0 as i32);
tx.put(|v| *v = 1);
tx.put(|v| *v = 2);
tx.put(|v| *v = 3);
{
// first iterator processes the single element
// assumes I process everything else in the iterator
let mut it = rx.iter();
assert_eq!(Some(1), it.next());
}
{
// the second iterator gets nothing, since the first
// iterator received the whole range no matter if it
// has really called a next on them ot not
let mut it = rx.iter();
assert_eq!(None, it.next());
}
}
理由
让我强调一点,读者可能会丢失更新。我认为这不是问题,而是一种可以接受的属性。其他队列实现选择在队列满时扩大队列大小,或者阻塞写者直到读者从队列中处理一些数据。我认为所有这些选择都是有效的,并且它们都有后果。当我们为队列分配更多内存时,我们可能会显然耗尽它,然后我们开始交换,整个系统都受到了诅咒。另一种选择是当阻塞写者时,写者性能受限于读者。
我认为在某些情况下,我们更关注最新数据而不是处理内存不足或阻塞条件。一个例子是心跳。只要我们知道给定组件在几秒钟前是活着的,我们可能就不会关心丢失旧的心跳。
这个队列不分配内存的事实,使得其性能可预测并且可能很快。(我故意没有进行任何测量。)
当你想最大限度地减少丢失项目的机会时,你需要选择一个更大的队列大小。正确的大小取决于你的应用程序。
实现说明
这个队列的核心是CircularBuffer
数据结构。它使用原子整数操作来确保写者和读者可以并发操作。
struct CircularBuffer<T : Copy> {
seqno : AtomicUsize, // the ID of the last written item
data : Vec<T>, // (2*n)+1 preallocated elements
size : usize, // n
buffer : Vec<AtomicUsize>, // (positions+seqno)[]
read_priv : Vec<usize>, // positions belong to the reader
write_tmp : usize, // temporary position where the writer writes first
max_read : usize, // reader's last read seqno
}
当写入时
data
向量包含了2n+1
预先分配的项目。n
个项目属于读者,而n+1
个项目属于写者。谁拥有哪些元素的所有权由buffer
、read_priv
和write_tmp
成员跟踪。buffer
向量代表CircularBuffer
,其中每个元素由16位seqno
和其余指向data
向量的位置组成。write_tmp
元素也是一个指向data
向量的位置。当写者写入一个新元素时
- 它写入由
write_tmp
指向的data
元素 - 然后它计算
seqno
模size
,这是将要更新的buffer
中的位置(new_pos) - 然后
buffer[new_pos]
将被更新以包含write_tmp < 16) + (seqno % 0xffff)
- 最后,
write_tmp
将被更新为buffer[old_pos] > 16
的前一个值 - (基本上
write_tmp
和buffer
的位置将被交换)
这种设计允许写者始终写入一个读者不会触及的私有区域,然后它将buffer[new_pos]
元素原子上地交换到新写入的元素上。这允许在不妨碍读者的前提下进行写入。
当读取时
要读取数据,需要通过iter()
函数来获取迭代器。该函数将按逆序遍历buffer
,并通过原子操作交换读者的位置(由read_priv
向量持有)与buffer
组件的位置部分。在遍历过程中,它检查buffer
条目中的序列号部分是否为预期的值。如果不是,则知道写者已翻页,因此应在下一次迭代中返回给定元素,并停止。
此操作的结果是,read_priv
向量持有先前写入元素的指针,而读者将其自己的元素提供给写者以交换,这样写者就可以写入这些元素,而读者则使用自己的副本。
许可
根据您的要求,受MIT或Apache-2许可协议的约束。