#bus #non-blocking #mpmc

atomic-bus

原子 MPMC 总线

1 个不稳定版本

0.1.0 2024年6月1日

#795并发

MIT/Apache

15KB
127

atomic-bus

原子总线:一个无界、无锁、多生产者、多消费者 pub/sub 实现,利用原子操作。

示例

基本发送/订阅

以下示例将启动一个发送线程和一个打印订阅线程。

代码

use atomic_bus::AtomicBus;
use std::{sync::Arc, time::Duration, thread};

// create the bus
let bus: AtomicBus<String> = AtomicBus::new();

// subscribing before spawning the sender thread guarantees all sent messages will be received
let mut subscriber = bus.subscribe_mut();

// create and spawn a sender
let sender = bus.create_sender();
let arc_message = Arc::new("all messages are an Arc".to_owned());
thread::spawn(move || {
    sender.send("hello world!".to_owned());
    sender.send(arc_message);
    sender.send("done".to_owned());
});

// spawn printing subscriber and wait for it to complete
thread::spawn(move || loop {
    match subscriber.next() {
        None => thread::sleep(Duration::from_millis(1)),
        Some(x) => {
            println!("subscriber received: {x:?}");
            if x.as_ref() == "done" {
                return;
            }
        }
    }
})
.join()
.unwrap();

输出

subscriber received: "hello world!"
subscriber received: "all messages are an Arc"
subscriber received: "done"

负载均衡订阅

以下示例将启动一个发送线程和多个订阅线程,这些线程共享同一个 AtomicSubscriber 以在多个线程之间负载均衡接收的事件。为了简化示例,订阅线程将在尝试轮询后总是休眠,以模拟负载并避免一个贪婪的消费者在其他人有机会开始之前就接收所有事件。

代码

use atomic_bus::AtomicBus;
use std::{sync::Arc, time::Duration, thread};

// create the bus
let bus: AtomicBus<String> = AtomicBus::new();

// subscribing before spawning the sender thread guarantees all sent messages will be received
let subscriber = Arc::new(bus.subscribe());

// create and spawn a sender
let sender = bus.create_sender();
thread::spawn(move || {
    for i in 0..10 {
        sender.send(format!("message #{i}"));
    }
});

// spawn printing subscriber threads that share a single AtomicSubscription
let mut handles = Vec::new();
{
    for i in 0..3 {
        let subscriber = Arc::clone(&subscriber);
        let handle = thread::spawn(move || loop {
            if let Some(x) = subscriber.next() {
                println!("subscriber {i} received: {x:?}");
                if x.as_ref() == "done" {
                    return;
                }
            }
            thread::sleep(Duration::from_millis(10));
        });
        handles.push(handle);
    }
};

输出

subscriber 0 received: "message #0"
subscriber 1 received: "message #1"
subscriber 2 received: "message #2"
subscriber 0 received: "message #3"
subscriber 1 received: "message #4"
subscriber 2 received: "message #5"
subscriber 0 received: "message #6"
subscriber 1 received: "message #7"
subscriber 2 received: "message #8"
subscriber 0 received: "message #9"

许可证:MIT OR Apache-2.0

依赖项

~185KB