#事件驱动 #事件 #事件处理 #消息 #命令 #处理 #命令处理

event-driven-library

Rust库用于事件驱动消息处理

44个版本

0.1.49 2023年10月12日
0.1.48 2023年10月11日
0.1.42 2023年9月26日
0.1.12 2023年8月22日
0.1.7 2023年7月31日

#721 in 异步

Download history 2/week @ 2024-03-08 2/week @ 2024-03-15 188/week @ 2024-03-29 45/week @ 2024-04-05

每月437次下载

MIT许可证

39KB
542

一个用于编写可靠和可扩展系统的事件驱动框架。

在较高层面,它提供了一些主要组件

事件驱动库之旅

事件驱动库由多个模块组成,这些模块提供了在Rust中实现类似消息总线应用的必要功能。在本节中,我们将简要浏览,总结主要API及其用途。

命令 & 事件

您可以使用以下方式将任何通用结构体与Command派生宏相关联

#[derive(Command)]
pub struct MakeOrder {
    pub user_id: i64,
    pub items: Vec<String>,
}

当您附加Command派生宏时,消息总线现在将能够理解如何以及在哪里派遣命令。

同样,您可以对事件做同样的事情

#[derive(Serialize, Deserialize, Clone, Message)]
#[internally_notifiable]
pub struct OrderFailed {
    #[identifier]
    pub user_id: i64,
}

#[derive(Serialize, Deserialize, Clone, Message)]
#[internally_notifiable]
pub struct OrderSucceeded{
    #[identifier]
    pub user_id: i64,
    pub items: Vec<String>
}

请注意,使用internally_notifiable(或externally_notifiable)和identifier是必须的。

  • internally_notifiable是标记,让系统知道事件应该在应用内部处理
  • externally_notifiable是为了留下OutBox
  • identifier是为了记录聚合ID。

初始化命令处理器

命令处理器负责处理应用程序中的命令,其响应将直接发送给客户端。命令本质上是命令式的,意味着它们指定了应该做什么。

use event_driven_library::prelude::{init_command_handler, init_event_handler};


init_command_handler!(
{
   MakeOrder: OrderHandler::make_order,
   CancelOrder: OrderHandler::cancel_order
}
);

在上面的例子中,您可以看到MakeOrder被映射到OrderHandler::make_order,这是应用层的处理器。

在这个阶段,假设您想要处理 MakeOrder 命令处理的成功/失败情况。那么,您需要考虑使用事件处理器。

注册事件

EventCommand 或另一个 Event 处理的副作用。只要它们都消费相同类型的事件,您可以注册尽可能多的处理器,如下所示

示例

init_event_handler!(
{
   OrderFaild: [
           NotificationHandler::send_mail,
           ],
   OrderSucceeded: [
           DeliveryHandler::checkout_delivery_items,
           InventoryHandler::change_inventory_count
           ]
}
);

MakeOrder 命令处理中,我们有两种事件 OrderFailedOrderSucceeded,它们有自己的处理程序。事件是由 MessageBus 抛出的,并由 Context 抛出的处理器中引发的。然后 MessageBus 会循环遍历处理器,除非接收到 StopSentinel

处理器 API 示例

处理器可以位于任何位置,只要它们接受两个参数

  • msg - 要么是 Command 要么是 Event
  • context - [AtomicContextManager]

示例

pub async fn make_order(
    cmd: MakeOrder,
    context: AtomicContextManager,
) -> Result<ServiceResponse, ServiceError> {
    let mut uow = UnitOfWork::<Repository<OrderAggregate>, TExecutor>::new(context).await;

    let mut order_aggregate = OrderAggregate::new(cmd);
    uow.repository().add(&mut task_aggregate).await?;

    uow.commit::<ServiceOutBox>().await?;

    Ok(().into())
}

但有时,您可能想要添加更多依赖项。为此,已经实现了依赖注入机制。因此,您也可以做类似的事情

pub async fn make_order(
    cmd: MakeOrder,
    context: AtomicContextManager,
    payment_gateway_caller: Box<dyn Fn(String, Value) -> Future<(), ServiceError> + Send + Sync + 'static> //injected dependency
) -> Result<ServiceResponse, ServiceError> {
    let mut uow = UnitOfWork::<Repository<OrderAggregate>, TExecutor>::new(context).await;

    let mut order_aggregate = OrderAggregate::new(cmd,payment_gateway_caller);
    uow.repository().add(&mut task_aggregate).await?;

    uow.commit::<ServiceOutBox>().await?;

    Ok(().into())
}

这是怎么可能的?因为我们预处理处理器,使其能够允许 DI 容器

依赖注入

您可以通过在自由函数顶部放置属性来简单地注册依赖项。

示例

#[dependency]
pub fn payment_gateway_caller() -> Box<dyn Fn(String, Value) -> Future<(), ServiceError> + Send + Sync + 'static> {
    if cfg!(test) {
        __test_payment_gateway_caller()  //Dependency For Test
    } else {
        __actual_payment_gateway_caller() //Real Dependency
    }
}

这很好,因为它可以让您摆脱语言的静态性质。

消息总线

在事件驱动库的核心是 MessageBus,它从 UnitOfWork 获取命令和引发事件,并将事件分发给正确的处理器。由于这仅在框架端完成,因此您唯一能感觉到消息总线存在的方式是调用它。其他所有事情都是神奇地完成的。

示例

#[derive(Command)]
pub struct MakeOrder { // Test Command
    pub user_id: i64,
    pub items: Vec<String>
}

async fn test_func(){
    let bus = MessageBus::new(command_handler(), event_handler())
    let command = MakeOrder{user_id:1, items:vec!["shirts","jeans"]}
    match bus.handle(command).await{
        Err(err)=> { // test for error case }
        Ok(val)=> { // test for happy case }
    }
    }
    }   
}

消息总线错误

当命令尚未注册时,它返回一个错误 - BaseError::NotFound 请注意,与分布式事件处理不同,总线不返回事件处理的结果。

依赖项

~6.5–9MB
~144K SLoC