8 个版本
0.3.0 | 2023 年 4 月 6 日 |
---|---|
0.2.5 | 2021 年 11 月 5 日 |
0.2.4 | 2021 年 9 月 28 日 |
0.1.0 | 2021 年 9 月 3 日 |
#20 in #message-bus
每月 29 次下载
13KB
251 代码行
消息总线
Rust 的异步消息总线
灵感来自 Actix
基本功能
- 可以通过接收器(通常是队列实现)在演员之间传递消息
- 消息通过 TypeId 区分和传递
- 消息可以广播到多个接收器(克隆)或通过接收器 ID 地址传递,平衡(取决于队列负载)或随机
- 实现了不同类型的接收器
- BufferUnordered 接收器(同步和异步)
- 同步(同步和异步)
- BatchedBufferUnordered 接收器(同步和异步)
- BatchedSynchronized(同步和异步)
- 请求/响应 API。示例请见 demo_req_resp.rs
以下是已实现的处理器类型列表
pub trait Handler<M: Message>: Send + Sync {
type Error: StdSyncSendError;
type Response: Message;
fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
type Error: StdSyncSendError;
type Response: Message;
async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait SynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError;
type Response: Message;
fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError;
type Response: Message;
async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait BatchHandler<M: Message>: Send + Sync {
type Error: StdSyncSendError + Clone;
type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
type Error: StdSyncSendError + Clone;
type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
pub trait BatchSynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError + Clone;
type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
type Error: StdSyncSendError + Clone;
type Response: Message;
type InBatch: FromIterator<M> + Send;
type OutBatch: IntoIterator<Item = Self::Response> + Send;
async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
Ok(())
}
}
-
已实现的处理器类型
- 不需要同步(处理器实现了
Send
和Sync
)- 非批处理操作
- 同步(spawn_blocking)
- 异步(spawn)
- 批处理
- 同步(spawn_blocking)
- 异步(spawn)
- 非批处理操作
- 需要同步(处理器仅实现了
Send
但没有实现Sync
)- 非批处理操作
- 同步(spawn_blocking)
- 异步(spawn)
- 批处理
- 同步(spawn_blocking)
- 异步(spawn)
- 非批处理操作
- 不需要同步(处理器实现了
-
尚未实现的处理器类型
- 需要同步且线程专用(处理器是
!Sync
和!Send
)- 非批处理操作
- 同步(spawn_blocking)
- 异步(spawn)
- 批处理
- 同步(spawn_blocking)
- 异步(spawn)
- 非批处理操作
- 需要同步且线程专用(处理器是
-
示例
use messagebus::{error::Error, receivers, AsyncHandler, Bus};
use async_trait::async_trait;
struct TmpReceiver;
#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> i32 {}", msg);
bus.send(2i64).await?;
Ok(())
}
}
#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
type Error = Error;
type Response = ();
async fn handle(&self, msg: i64, _bus: &Bus) -> Result<Self::Response, Self::Error> {
println!("---> i64 {}", msg);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (b, poller) = Bus::build()
.register(TmpReceiver)
.subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.subscribe::<i64, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
.done()
.build();
b.send(1i32).await.unwrap();
println!("flush");
b.flush().await;
println!("close");
b.close().await;
println!("closed");
poller.await;
println!("[done]");
}
依赖项
~1.5MB
~35K SLoC