#bus #message #tokio #async #future

messagebus

MessageBus allows modules to interact with each other through messages

43 releases

0.15.2 Feb 7, 2024
0.15.0 Nov 29, 2023
0.12.2 Jun 21, 2023
0.9.13 Dec 16, 2021
0.4.6 Dec 27, 2020

#3 in #bus


456 downloads per month

MIT/Apache

59KB
2K SLoC

Message Bus

Async message bus for Rust

Inspired by Actix

Basics

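  1. Messages can be delivered between actors using receivers (usually a queue implementation)
  2. Messages are distinguished and delivered by TypeId
  3. Messages are delivered either in a broadcast fashion to many receivers (cloned) or addressed by receiver id, balanced (depending on queue load) or random
  4. Different kinds of receivers are implemented:
     • BufferUnordered Receiver (sync and async)
     • Synchronized (sync and async)
     • BatchedBufferUnordered Receiver (sync and async)
     • BatchedSynchronized (sync and async)
  5. Request/response API. There is an example in demo_req_resp.rs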
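
Routing by TypeId and broadcast delivery, as described in the list above, can be sketched with the standard library alone. This is a minimal illustration, not the messagebus implementation: `MiniBus`, `subscribe`, and `send` here are hypothetical names, handlers are plain closures, and everything runs synchronously on the calling thread (the message is passed by reference rather than cloned).

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical, simplified bus: maps a message's TypeId to the handlers
/// subscribed to that type. Not the messagebus implementation.
#[derive(Default)]
pub struct MiniBus {
    handlers: HashMap<TypeId, Vec<Box<dyn Fn(&dyn Any) -> String>>>,
}

impl MiniBus {
    /// Subscribe a handler for messages of type `M`.
    pub fn subscribe<M, F>(&mut self, f: F)
    where
        M: Any,
        F: Fn(&M) -> String + 'static,
    {
        // Erase the concrete message type so handlers for different
        // message types can live in the same map.
        let erased = move |msg: &dyn Any| {
            // This closure is only stored under TypeId::of::<M>(),
            // so the downcast is expected to succeed.
            f(msg.downcast_ref::<M>().expect("routed by TypeId"))
        };
        self.handlers
            .entry(TypeId::of::<M>())
            .or_default()
            .push(Box::new(erased));
    }

    /// Broadcast `msg` to every handler registered for its type
    /// and collect the replies in subscription order.
    pub fn send<M: Any>(&self, msg: M) -> Vec<String> {
        match self.handlers.get(&TypeId::of::<M>()) {
            Some(list) => list.iter().map(|h| h(&msg)).collect(),
            None => Vec::new(),
        }
    }
}
```

With two closures subscribed for `i32` and one for `i64`, `bus.send(1i32)` reaches only the two `i32` handlers, and a message type with no subscriber yields an empty reply list.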

Here is the list of implemented handler kinds:

pub trait Handler<M: Message>: Send + Sync {
    type Error: StdSyncSendError;
    type Response: Message;

    fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError;
    type Response: Message;

    async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait SynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError;
    type Response: Message;

    fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError;
    type Response: Message;

    async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait BatchHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait BatchSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}
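
The `InBatch` and `OutBatch` associated types in the batch traits above say that a batch is built from single messages (`FromIterator<M>`) and taken apart again into single responses (`IntoIterator`). The sketch below shows that shape using only the standard library. `MiniBatchHandler`, `Doubler`, and `drive` are hypothetical stand-ins: there is no `Bus`, no `Message` bound, and the chunking loop is an assumption about what a batching receiver does, not the crate's code.

```rust
use std::iter::FromIterator;

/// Hypothetical, simplified stand-in for the shape of a batch handler:
/// no `Bus`, no `Message` bound, synchronous only.
pub trait MiniBatchHandler<M> {
    type Response;
    type Error;
    type InBatch: FromIterator<M>;
    type OutBatch: IntoIterator<Item = Self::Response>;

    fn handle(&self, batch: Self::InBatch) -> Result<Self::OutBatch, Self::Error>;
}

/// Example handler: doubles every number in the batch.
pub struct Doubler;

impl MiniBatchHandler<i32> for Doubler {
    type Response = i64;
    type Error = String;
    type InBatch = Vec<i32>;
    type OutBatch = Vec<i64>;

    fn handle(&self, batch: Vec<i32>) -> Result<Vec<i64>, String> {
        Ok(batch.into_iter().map(|x| i64::from(x) * 2).collect())
    }
}

/// Assumed driver loop: group queued messages into chunks of at most
/// `batch_size`, collect each chunk into `InBatch`, call the handler once
/// per chunk, and flatten each `OutBatch` back into single responses.
pub fn drive<M, H>(
    handler: &H,
    queued: Vec<M>,
    batch_size: usize,
) -> Result<Vec<H::Response>, H::Error>
where
    H: MiniBatchHandler<M>,
{
    let batch_size = batch_size.max(1); // a zero-sized batch would never drain the queue
    let mut responses = Vec::new();
    let mut queued = queued.into_iter().peekable();
    while queued.peek().is_some() {
        let batch: H::InBatch = queued.by_ref().take(batch_size).collect();
        responses.extend(handler.handle(batch)?);
    }
    Ok(responses)
}
```

Calling `drive(&Doubler, vec![1i32, 2, 3, 4, 5], 2)` invokes the handler three times (batches of 2, 2, and 1) and returns the five doubled values in order.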

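  6. Implemented handler kinds

    1. No synchronization needed (the handler implements Send + Sync)
      • Not batched
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
    2. Synchronization needed (the handler implements only Send, not Sync)
      • Not batched
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
  7. Handler kinds not yet implemented

    1. Synchronization needed and thread dedicated (the handler is !Sync and !Send)
      • Not batched
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
  8. Example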

use messagebus::{error::Error, receivers, AsyncHandler, Bus};
use async_trait::async_trait;

struct TmpReceiver;

#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i32 {}", msg);

        bus.send(2i64).await?;

        Ok(())
    }
}

#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i64, _bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i64 {}", msg);

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let (b, poller) = Bus::build()
        .register(TmpReceiver)
            .subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
            .subscribe::<i64, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
            .done()
        .build();

    b.send(1i32).await.unwrap();

    println!("flush");
    b.flush().await;

    println!("close");
    b.close().await;

    println!("closed");

    poller.await;
    println!("[done]");
}
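
The example above uses a handler that takes `&self` and can therefore be called from several tasks at once. For the "synchronization needed" kinds listed earlier, the handler takes `&mut self`, so calls have to be serialized. One common way to do that is a mutex around the handler. The sketch below illustrates the idea with std threads only; `Counter` and `run_synchronized` are hypothetical names, and whether messagebus uses this exact mechanism internally is an assumption.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical handler with mutable state: `handle` needs `&mut self`,
/// so two calls must never run at the same time.
pub struct Counter {
    total: i64,
}

impl Counter {
    fn handle(&mut self, msg: i64) -> i64 {
        self.total += msg;
        self.total
    }
}

/// Deliver every message from its own thread. The mutex serializes access,
/// which is the role a synchronized receiver has to play for a handler
/// that is `Send` but not safe to share.
pub fn run_synchronized(msgs: Vec<i64>) -> i64 {
    let handler = Arc::new(Mutex::new(Counter { total: 0 }));

    let workers: Vec<_> = msgs
        .into_iter()
        .map(|m| {
            let handler = Arc::clone(&handler);
            thread::spawn(move || {
                // Only one thread at a time gets the `&mut Counter`.
                handler.lock().unwrap().handle(m);
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }

    let total = handler.lock().unwrap().total;
    total
}
```

Because addition is commutative, the final total does not depend on the order in which the threads acquire the lock; a handler whose result depends on message order would need ordered delivery as well.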

Dependencies

~6–13MB
~135K SLoC