3 个版本
| 0.0.3 | 2020年11月2日 |
|---|---|
| 0.0.2 | 2020年8月23日 |
| 0.0.1 | 2020年8月22日 |
6 in #crossbeam
15KB
213 行
double_decker
使用 crossbeam 通道构建的简单无界多生产者多消费者事件总线。
为什么选择 double_decker?
与 bus crate 中的 Bus 不同,double_decker::Bus 是无界的,而且众所周知,双层巴士 比普通巴士能载更多乘客 🤷♂️。
与 bus::Bus 不同,double_decker::Bus 实现了一个便宜的 Clone,这对我来说非常有用。
听起来双层巴士比普通巴士更好。这是否意味着 double_decker::Bus 比代码库中的 bus::Bus 更好?
不。
bus crate 是成熟的,并且完全无锁。这个实现既不是成熟也不是无锁的!
设计
T 必须实现 Clone,以便它可以传递给所有消费者。
当你调用 add_rx() 时,会创建一个 Sender/Receiver 对,并将 Sender 存储在一个 HashMap 后面的 RwLock 中。
broadcast() 使用 RwLock 的共享读访问,并将事件按添加顺序发送给每个 Receiver。
当订阅者数量变化时,可能会发生锁竞争,因为这需要写入 RwLock 的访问权限。这发生在你调用 add_rx() 时,或者当你调用 broadcast() 时,因为一个或多个 Sender 返回 SendError,因为它已经断开连接。
从 bus crate 中复制的示例
单发送,多消费者示例
use double_decker::Bus;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));
多发送,多消费者示例
use double_decker::Bus;
use std::thread;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// start a thread that sends 1..100
let j = thread::spawn(move || {
for i in 1..100 {
bus.broadcast(i);
}
});
// every value should be received by both receivers
for i in 1..100 {
// rx1
assert_eq!(rx1.recv(), Ok(i));
// and rx2
assert_eq!(rx2.recv(), Ok(i));
}
j.join().unwrap();
还包括 subscribe 和 subscribe_on_thread,这两个功能允许您通过闭包订阅广播事件,该闭包会在每次广播时被调用。其中 subscribe 是阻塞式的,而 subscribe_on_thread 则在另一个线程中调用闭包。
subscribe_on_thread 返回一个 Subscription,您应该保留它,因为当它被丢弃时线程会终止。
use double_decker::{Bus, SubscribeToReader};
let bus = Bus::<i32>::new();
// This would block
// bus.subscribe(Box::new(move |_event| {
// // This closure is called on every broadcast
// }));
let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
// This closure is called on every broadcast
}));
bus.broadcast(5);
许可证:MIT
依赖项
~125KB