25 个版本

0.5.3 2020年5月10日
0.5.0 2020年4月26日
0.4.1 2019年7月31日
0.3.8 2019年3月18日
0.3.7 2018年12月31日

#352 in 异步


用于 bitcoin-zmq

Apache-2.0/MIT

130KB
1K SLoC

无锁限界非阻塞发布/订阅队列

这是一个发布/订阅模式队列,其中发布者永远不会因为慢速订阅者而阻塞。副作用是慢速订阅者可能会错过消息。预期的用例是高吞吐量流,其中接收最新消息比接收整个流更为优先。例如市场数据流、实时流等...

底层数据结构是一个 Arc(s) 的向量,消除了复制的使用。

特性

  • 无锁读写 - 对发布者和订阅者都是无锁的。
  • 限界 - 使用固定大小的内存,最大为 sizeof(MsgObject)*(queue_size + sub_cnt + 1)。这是一个边缘情况,其中每个订阅者都持有对象的引用,而在此期间发布者已经发布了整个队列的长度。
  • 非阻塞 - 队列永远不会阻塞发布者,慢速订阅者会按比例错过数据。
  • 发布/订阅 - 每个能够跟上发布者速度的订阅者将接收到发布者发布的所有数据。
  • channel - 一个原始的 Pub/Sub 通道实现,没有线程同步和 futures 逻辑。
  • bus - 一个具有 futures::sink::Sinkfutures::stream::Stream 特性的异步 Pub/Sub 队列。

bus::Publisherchannel::Sender 用于向 bus::Subscriberchannel::Receiver 池广播数据。订阅者是可克隆的,这样许多线程或 futures 可以同时接收数据。唯一的限制是订阅者必须跟上发布者的频率。如果订阅者速度慢,它将丢失数据。

断开连接

通道上的广播和接收操作都将返回一个表示操作是否成功的 结果。操作失败通常表明通道的另一端在对应的线程中被“挂起”。

一旦通道的一半被释放,大多数操作都无法继续进行,因此会返回 错误。许多应用程序会继续从这个模块返回的结果中 展开,如果其中一个意外死亡,可能会导致线程间失败的传播。

示例

简单直接使用

extern crate bus_queue;

use bus_queue::raw_bounded;

fn main() {
    let (tx, rx) = raw_bounded(10);
    (1..15).for_each(|x| tx.broadcast(x).unwrap());

    let received: Vec<i32> = rx.map(|x| *x).collect();
    // Test that only the last 10 elements are in the received list.
    let expected: Vec<i32> = (5..15).collect();

    assert_eq!(expected, received);
}

简单异步使用

use bus_queue::bounded;
use futures::executor::block_on;
use futures::stream;
use futures::StreamExt;

fn main() {
    let (publisher, subscriber1) = bounded(10);
    let subscriber2 = subscriber1.clone();

    block_on(async move {
        stream::iter(1..15)
            .map(|i| Ok(i))
            .forward(publisher)
            .await
            .unwrap();
    });

    let received1: Vec<u32> = block_on(async { subscriber1.map(|x| *x).collect().await });
    let received2: Vec<u32> = block_on(async { subscriber2.map(|x| *x).collect().await });
    // Test that only the last 10 elements are in the received list.
    let expected = (5..15).collect::<Vec<u32>>();
    assert_eq!(received1, expected);
    assert_eq!(received2, expected);
}

依赖