3 个版本
0.0.3 | 2020年11月2日 |
---|---|
0.0.2 | 2020年8月23日 |
0.0.1 | 2020年8月22日 |
6 in #crossbeam
15KB
213 行
double_decker
使用 crossbeam 通道构建的简单无界多生产者多消费者事件总线。
为什么选择 double_decker?
与 bus
crate 中的 Bus
不同,double_decker::Bus
是无界的,而且众所周知,双层巴士 比普通巴士能载更多乘客 🤷♂️。
与 bus::Bus
不同,double_decker::Bus
实现了一个便宜的 Clone
,这对我来说非常有用。
听起来双层巴士比普通巴士更好。这是否意味着 double_decker::Bus
比代码库中的 bus::Bus
更好?
不。
bus
crate 是成熟的,并且完全无锁。这个实现既不是成熟也不是无锁的!
设计
T
必须实现 Clone
,以便它可以传递给所有消费者。
当你调用 add_rx()
时,会创建一个 Sender
/Receiver
对,并将 Sender
存储在一个 HashMap
后面的 RwLock
中。
broadcast()
使用 RwLock
的共享读访问,并将事件按添加顺序发送给每个 Receiver
。
当订阅者数量变化时,可能会发生锁竞争,因为这需要写入 RwLock
的访问权限。这发生在你调用 add_rx()
时,或者当你调用 broadcast()
时,因为一个或多个 Sender
返回 SendError
,因为它已经断开连接。
从 bus
crate 中复制的示例
单发送,多消费者示例
use double_decker::Bus;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));
多发送,多消费者示例
use double_decker::Bus;
use std::thread;
let mut bus = Bus::new();
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();
// start a thread that sends 1..100
let j = thread::spawn(move || {
for i in 1..100 {
bus.broadcast(i);
}
});
// every value should be received by both receivers
for i in 1..100 {
// rx1
assert_eq!(rx1.recv(), Ok(i));
// and rx2
assert_eq!(rx2.recv(), Ok(i));
}
j.join().unwrap();
还包括 subscribe
和 subscribe_on_thread
,这两个功能允许您通过闭包订阅广播事件,该闭包会在每次广播时被调用。其中 subscribe
是阻塞式的,而 subscribe_on_thread
则在另一个线程中调用闭包。
subscribe_on_thread
返回一个 Subscription
,您应该保留它,因为当它被丢弃时线程会终止。
use double_decker::{Bus, SubscribeToReader};
let bus = Bus::<i32>::new();
// This would block
// bus.subscribe(Box::new(move |_event| {
// // This closure is called on every broadcast
// }));
let _subscription = bus.subscribe_on_thread(Box::new(move |_event| {
// This closure is called on every broadcast
}));
bus.broadcast(5);
许可证:MIT
依赖项
~125KB