25 个版本
0.5.3 | 2020年5月10日 |
---|---|
0.5.0 | 2020年4月26日 |
0.4.1 | 2019年7月31日 |
0.3.8 | 2019年3月18日 |
0.3.7 | 2018年12月31日 |
#352 in 异步
用于 bitcoin-zmq
130KB
1K SLoC
无锁限界非阻塞发布/订阅队列
这是一个发布/订阅模式队列,其中发布者永远不会因为慢速订阅者而阻塞。副作用是慢速订阅者可能会错过消息。预期的用例是高吞吐量流,其中接收最新消息比接收整个流更为优先。例如市场数据流、实时流等...
底层数据结构是一个 Arc(s) 的向量,消除了复制的使用。
特性
- 无锁读写 - 对发布者和订阅者都是无锁的。
- 限界 - 使用固定大小的内存,最大为 sizeof(MsgObject)*(queue_size + sub_cnt + 1)。这是一个边缘情况,其中每个订阅者都持有对象的引用,而在此期间发布者已经发布了整个队列的长度。
- 非阻塞 - 队列永远不会阻塞发布者,慢速订阅者会按比例错过数据。
- 发布/订阅 - 每个能够跟上发布者速度的订阅者将接收到发布者发布的所有数据。
- channel - 一个原始的 Pub/Sub 通道实现,没有线程同步和 futures 逻辑。
- bus - 一个具有 futures::sink::Sink 和 futures::stream::Stream 特性的异步 Pub/Sub 队列。
bus::Publisher 和 channel::Sender 用于向 bus::Subscriber 和 channel::Receiver 池广播数据。订阅者是可克隆的,这样许多线程或 futures 可以同时接收数据。唯一的限制是订阅者必须跟上发布者的频率。如果订阅者速度慢,它将丢失数据。
断开连接
通道上的广播和接收操作都将返回一个表示操作是否成功的 结果。操作失败通常表明通道的另一端在对应的线程中被“挂起”。
一旦通道的一半被释放,大多数操作都无法继续进行,因此会返回 错误。许多应用程序会继续从这个模块返回的结果中 展开,如果其中一个意外死亡,可能会导致线程间失败的传播。
示例
简单直接使用
extern crate bus_queue;
use bus_queue::raw_bounded;
fn main() {
let (tx, rx) = raw_bounded(10);
(1..15).for_each(|x| tx.broadcast(x).unwrap());
let received: Vec<i32> = rx.map(|x| *x).collect();
// Test that only the last 10 elements are in the received list.
let expected: Vec<i32> = (5..15).collect();
assert_eq!(expected, received);
}
简单异步使用
use bus_queue::bounded;
use futures::executor::block_on;
use futures::stream;
use futures::StreamExt;
fn main() {
let (publisher, subscriber1) = bounded(10);
let subscriber2 = subscriber1.clone();
block_on(async move {
stream::iter(1..15)
.map(|i| Ok(i))
.forward(publisher)
.await
.unwrap();
});
let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
// Test that only the last 10 elements are in the received list.
let expected = (5..15).collect::<Vec<u32>>();
assert_eq!(received1, expected);
assert_eq!(received2, expected);
}