44个版本
0.1.49 | 2023年10月12日 |
---|---|
0.1.48 | 2023年10月11日 |
0.1.42 | 2023年9月26日 |
0.1.12 | 2023年8月22日 |
0.1.7 | 2023年7月31日 |
#721 in 异步
每月437次下载
39KB
542 行
一个用于编写可靠和可扩展系统的事件驱动框架。
在较高层面,它提供了一些主要组件
- 用于具有特质的核组件的工具,
- 用于处理事件和命令的宏
事件驱动库之旅
事件驱动库由多个模块组成,这些模块提供了在Rust中实现类似消息总线应用的必要功能。在本节中,我们将简要浏览,总结主要API及其用途。
命令 & 事件
您可以使用以下方式将任何通用结构体与Command派生宏相关联
#[derive(Command)]
pub struct MakeOrder {
pub user_id: i64,
pub items: Vec<String>,
}
当您附加Command派生宏时,消息总线现在将能够理解如何以及在哪里派遣命令。
同样,您可以对事件做同样的事情
#[derive(Serialize, Deserialize, Clone, Message)]
#[internally_notifiable]
pub struct OrderFailed {
#[identifier]
pub user_id: i64,
}
#[derive(Serialize, Deserialize, Clone, Message)]
#[internally_notifiable]
pub struct OrderSucceeded{
#[identifier]
pub user_id: i64,
pub items: Vec<String>
}
请注意,使用internally_notifiable
(或externally_notifiable
)和identifier
是必须的。
internally_notifiable
是标记,让系统知道事件应该在应用内部处理externally_notifiable
是为了留下OutBox
。identifier
是为了记录聚合ID。
初始化命令处理器
命令处理器负责处理应用程序中的命令,其响应将直接发送给客户端。命令本质上是命令式的,意味着它们指定了应该做什么。
use event_driven_library::prelude::{init_command_handler, init_event_handler};
init_command_handler!(
{
MakeOrder: OrderHandler::make_order,
CancelOrder: OrderHandler::cancel_order
}
);
在上面的例子中,您可以看到MakeOrder
被映射到OrderHandler::make_order
,这是应用层的处理器。
在这个阶段,假设您想要处理 MakeOrder
命令处理的成功/失败情况。那么,您需要考虑使用事件处理器。
注册事件
Event
是 Command 或另一个 Event 处理的副作用。只要它们都消费相同类型的事件,您可以注册尽可能多的处理器,如下所示
示例
init_event_handler!(
{
OrderFaild: [
NotificationHandler::send_mail,
],
OrderSucceeded: [
DeliveryHandler::checkout_delivery_items,
InventoryHandler::change_inventory_count
]
}
);
在 MakeOrder
命令处理中,我们有两种事件 OrderFailed
或 OrderSucceeded
,它们有自己的处理程序。事件是由 MessageBus 抛出的,并由 Context 抛出的处理器中引发的。然后 MessageBus 会循环遍历处理器,除非接收到 StopSentinel
。
处理器 API 示例
处理器可以位于任何位置,只要它们接受两个参数
示例
pub async fn make_order(
cmd: MakeOrder,
context: AtomicContextManager,
) -> Result<ServiceResponse, ServiceError> {
let mut uow = UnitOfWork::<Repository<OrderAggregate>, TExecutor>::new(context).await;
let mut order_aggregate = OrderAggregate::new(cmd);
uow.repository().add(&mut task_aggregate).await?;
uow.commit::<ServiceOutBox>().await?;
Ok(().into())
}
但有时,您可能想要添加更多依赖项。为此,已经实现了依赖注入机制。因此,您也可以做类似的事情
pub async fn make_order(
cmd: MakeOrder,
context: AtomicContextManager,
payment_gateway_caller: Box<dyn Fn(String, Value) -> Future<(), ServiceError> + Send + Sync + 'static> //injected dependency
) -> Result<ServiceResponse, ServiceError> {
let mut uow = UnitOfWork::<Repository<OrderAggregate>, TExecutor>::new(context).await;
let mut order_aggregate = OrderAggregate::new(cmd,payment_gateway_caller);
uow.repository().add(&mut task_aggregate).await?;
uow.commit::<ServiceOutBox>().await?;
Ok(().into())
}
这是怎么可能的?因为我们预处理处理器,使其能够允许 DI 容器
。
依赖注入
您可以通过在自由函数顶部放置属性来简单地注册依赖项。
示例
#[dependency]
pub fn payment_gateway_caller() -> Box<dyn Fn(String, Value) -> Future<(), ServiceError> + Send + Sync + 'static> {
if cfg!(test) {
__test_payment_gateway_caller() //Dependency For Test
} else {
__actual_payment_gateway_caller() //Real Dependency
}
}
这很好,因为它可以让您摆脱语言的静态性质。
消息总线
在事件驱动库的核心是 MessageBus,它从 UnitOfWork
获取命令和引发事件,并将事件分发给正确的处理器。由于这仅在框架端完成,因此您唯一能感觉到消息总线存在的方式是调用它。其他所有事情都是神奇地完成的。
示例
#[derive(Command)]
pub struct MakeOrder { // Test Command
pub user_id: i64,
pub items: Vec<String>
}
async fn test_func(){
let bus = MessageBus::new(command_handler(), event_handler())
let command = MakeOrder{user_id:1, items:vec!["shirts","jeans"]}
match bus.handle(command).await{
Err(err)=> { // test for error case }
Ok(val)=> { // test for happy case }
}
}
}
}
消息总线错误
当命令尚未注册时,它返回一个错误 - BaseError::NotFound
请注意,与分布式事件处理不同,总线不返回事件处理的结果。
依赖项
~6.5–9MB
~144K SLoC