#事件总线 #多生产者 #多消费者 #通道 #事件 #crossbeam #无界

double_decker

使用 crossbeam 通道构建的简单无界多生产者多消费者事件总线

3 个版本

0.0.3 2020年11月2日
0.0.2 2020年8月23日
0.0.1 2020年8月22日

6 in #crossbeam

MIT 许可证

15KB
213

double_decker

使用 crossbeam 通道构建的简单无界多生产者多消费者事件总线。

为什么选择 double_decker?

bus crate 中的 Bus 不同,double_decker::Bus 是无界的,而且众所周知,双层巴士 比普通巴士能载更多乘客 🤷‍♂️。

bus::Bus 不同,double_decker::Bus 实现了一个便宜的 Clone,这对我来说非常有用。

听起来双层巴士比普通巴士更好。这是否意味着 double_decker::Bus 比代码库中的 bus::Bus 更好?

不。

bus crate 是成熟的,并且完全无锁。这个实现既不是成熟也不是无锁的!

设计

T 必须实现 Clone,以便它可以传递给所有消费者。

当你调用 add_rx() 时,会创建一个 Sender/Receiver 对,并将 Sender 存储在一个 HashMap 后面的 RwLock 中。

broadcast() 使用 RwLock 的共享读访问,并将事件按添加顺序发送给每个 Receiver

当订阅者数量变化时,可能会发生锁竞争,因为这需要写入 RwLock 的访问权限。这发生在你调用 add_rx() 时,或者当你调用 broadcast() 时,因为一个或多个 Sender 返回 SendError,因为它已经断开连接。

bus crate 中复制的示例

单发送,多消费者示例

use double_decker::Bus;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));

多发送,多消费者示例

use double_decker::Bus;
use std::thread;

let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});

// every value should be received by both receivers
for i in 1..100 {
    // rx1
    assert_eq!(rx1.recv(), Ok(i));
    // and rx2
    assert_eq!(rx2.recv(), Ok(i));
}

j.join().unwrap();

还包括 subscribesubscribe_on_thread,这两个功能允许您通过闭包订阅广播事件,该闭包会在每次广播时被调用。其中 subscribe 是阻塞式的,而 subscribe_on_thread 则在另一个线程中调用闭包。

subscribe_on_thread 返回一个 Subscription,您应该保留它,因为当它被丢弃时线程会终止。

use double_decker::{Bus, SubscribeToReader};

let bus = Bus::<i32>::new();

// This would block
// bus.subscribe(Box::new(move |_event| {
//     // This closure is called on every broadcast
// }));

let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
    // This closure is called on every broadcast
}));

bus.broadcast(5);

许可证:MIT

依赖项

~125KB