#cqrs #基于事件 #命令行工具 #命令总线

presage

一个用于设计基于事件系统的轻量级库

3 个版本 (重大更改)

0.3.0 2023年5月17日
0.2.0 2023年5月15日
0.1.0 2023年5月15日

#701开发工具

每月 33 次下载

MIT/Apache

38KB
496

Présage \pʁe.zaʒ\

一个用于设计基于事件系统的轻量级 Rust 库


概念

在基于事件系统中,任何可能发生的事情都用事件来建模。业务实体通过聚合来设计,聚合通过事件原子性地修改。

Présage 由领域驱动设计或命令查询责任分离等概念自由启发,但尽量保持无偏见,通过尽可能少地做出假设。具体来说,它不会将你绑定到任何持久化方法。

聚合

聚合表示在系统执行过程中可以演变的业务实体。它由一个唯一的 id 来标识。聚合总是通过原子修改来确保一致性。

要定义一个聚合,你需要创建一个结构体或枚举,它将作为聚合根并实现 Aggregate 特性

use presage::{Aggregate, Id};
use uuid::Uuid;

pub struct Todo {
    pub id: Id<Todo>,
    pub name: String,
    pub done: bool,
}

impl Aggregate for Todo {
    type Id = Uuid;
    type CreationEvent = TodoCreated;
    type UpdateEvent = TodoUpdated;
    type DeletionEvent = TodoDeleted;

    fn id(&self) -> Id<Self> {
        self.id
    }

    fn new(event: TodoCreated) -> Self {
        Self {
            id: event.id,
            name: event.name,
            done: false,
        }
    }

    fn apply(&mut self, event: TodoUpdated) {
        match event {
            TodoUpdated::Renamed { new_name, .. } => self.name = new_name,
            TodoUpdated::Done(_) => self.done = true,
        }
    }
}

你必须指定 id 的类型。它可以是任何东西,只要每个聚合都有一个唯一的值。在引用聚合时,应使用 Id<A>,因为它允许编译时检查。如果 id 的类型具有 Copy 特性,则 Id<A> 也将具有它。你需要实现 id 函数,该函数返回聚合的 id。

你还需要指定三种聚合事件类型:一种用于创建聚合,一种用于更新聚合,一种用于删除聚合(有关如何定义事件,请参阅 事件),以及一个用于创建新聚合的函数(new)和一个用于更新现有聚合的函数(apply)。

事件

事件表示过去发生的事情。在派发时使用 serde crate 进行序列化,以便可以在系统的独立部分之间共享。因此,事件必须实现 serde::Serializeserde::Deserialize

简单事件通过在结构体或枚举上实现 Event 特性来定义

#[derive(serde::Serialize, serde::Deserialize)]
struct SystemStarted(Instant);

impl presage::Event for SystemStarted {
    const NAME: &'static str = "system-started";
}

常量 NAME 识别事件的类型,且必须唯一。

为了修改系统的状态,需要专门的聚合事件。聚合事件影响单个聚合,必须通过其 id 进行引用。如果多个聚合被修改,则必须发出多个聚合事件。AggregateEvent 特性允许指定聚合事件。

use presage::{AggregateEvent, Event, Id};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub struct TodoCreated {
    pub id: Id<Todo>,
    pub name: String,
}

impl Event for TodoCreated {
    const NAME: &'static str = "todo-created";
}

impl AggregateEvent for TodoCreated {
    type Aggregate = Todo;

    fn id(&self) -> Id<Self::Aggregate> {
        self.id
    }
}

命令

命令是修改系统的请求。它们实现为包含所需信息的结构体或枚举。它与一个 命令处理器 相关联,当执行时会产生事件。

#[derive(Debug)]
pub struct CreateTodo {
    pub id: Id<Todo>,
    pub name: String,
}

impl presage::Command for CreateTodo {
    const NAME: &'static str = "create-todo";
}

处理器

一旦定义了命令和事件,就需要编写 处理器 来使系统真正执行某些操作。命令处理器接收一个命令并返回事件,而事件处理器接收一个事件并返回命令。

上下文

所有处理器都需要将可变借用上下文与它们处理的命令或事件一起传递。上下文可以是任何你需要的东西,其角色是管理输入和输出,例如在命令处理器中加载一个聚合。

定义处理器时,你可以指定在运行时使用的上下文的确切类型,或者使用泛型来指定处理器需要实现的特性。

命令处理器

命令处理器处理单个命令类型,通过其名称进行识别。它实现了 CommandHandler 特性

use presage::{CommandHandler, Error, Events, BoxedCommand};

struct CreateTodoHandler;

#[async_trait::async_trait]
impl<C, E> CommandHandler<C, E> for CreateTodoHandler
    where
        E: From<Error>,
{
    fn command_name(&self) -> &'static str {
        "create-todo"
    }

    async fn handle(&self, _context: &mut C, command: BoxedCommand) -> Result<Events, E> {
        let CreateTodo { id, name } = command.downcast()?;
        Ok(events!(TodoCreated { id, name }))
    }
}

事件处理器

事件处理器可以与多个事件类型相关联,并实现 EventHandler 特性

use presage::{Commands, Error, EventHandler, SerializedEvent};
use crate::{LoadTodoView, SaveTodoView, TodoView, TodoCreated, TodoUpdated};

struct CreateTodoViewOnTodoEvent;

#[async_trait::async_trait]
impl EventHandler<C, E> for CreateTodoViewOnTodoEvent
    where
        C: LoadTodoView + SaveTodoView,
        E: From<Error>,
{
    fn event_names(&self) -> &[&'static str] {
        &[TodoCreated::NAME, TodoUpdated::NAME]
    }

    async fn handle(&self, context: &mut C, event: &SerializedEvent) -> Result<Commands, E> {
        if event.name == TodoCreated::NAME {
            let TodoCreated { id, name } = event.deserialize()?;
            context.save(TodoView { id, name, done: false }).await?;
        } else {
            let todo_updated: TodoUpdated = event.deserialize()?;
            //
        }
        Ok(Default::default())
    }
}

为了简化命令、事件和处理器的编写,一些 derive 和属性宏可通过 derive 功能使用(默认启用)。

派生特性

CommandEventAggregateEvent 都可以派生。默认情况下,命令或事件的名称通过将类型名称转换为短横线命名法来派生(例如,TodoCreated 变为 todo-created)。如果你想指定另一个名称,可以在派生特性的类型上使用 #[presage(name = "name")] 属性。

为了正确推导 AggregateEvent 特性,需要更多信息。必须使用 #[presage] 属性指定相关聚合的类型:#[presage(Todo)]#[presage(aggregate = Todo)]。必须指定包含的字段,要么在类型上声明它,使用 #[presage(id = id_field)],要么在字段上添加 #[id] 属性。如果事件是枚举,可以使用两种方法的组合。

#[derive(Serialize, Deserialize, AggergateEvent)]
#[presage(aggregate = SomeAggregate, id = id)]
enum SomeAggregateEvent {
    // No field with the #[id] attribute, the field specified on the type is used
    Variant1 { id: Id<SomeAggregate>, /**/ },
    // The id field has a different name, we need to add the #[id] attribute
    Variant2 { #[id] id_field: Id<SomeAggregate>, /**/ },
    // Fields of a tuple variant have no name, we need to add the #[id] attribute
    Variant3(#[id] Id<SomeAggregate>, /**/),
}

处理器

可以使用 #[command_handler]#[event_handler] 属性在函数上自动创建 CommandHandlerEventHandler

对于命令处理器,函数必须有两个参数:上下文和处理命令。例如,上面定义的 CreateTodoHandler 可以写成这样:

#[command_handler]
pub async fn create_todo<C, E>(_context: &mut C, CreateTodo { id, name }: CreateTodo) -> Result<Events, E>
    where
        E: From<presage::Error>,
{
    Ok(events!(TodoCreated { id, name }))
}

#[event_handler] 可以与两种不同的函数签名一起使用。如果它只处理一种类型的事件,函数可以使用实际的事件类型作为第二个参数,否则需要在属性中指定事件名称,并使用 &SerializedEvent 作为第二个参数的类型。

#[event_handler]
pub async fn simple_event_handler<C, E>(context: &mut C, event: SomeEvent) -> Result<Commands, E>
    where
        E: From<presage::Error>,
{
    //
}

#[event_handler(events = [SomeEvent, "event-name"])]
pub async fn multiple_events_handler<C, E>(context: &mut C, event: &SerializedEvent) -> Result<Commands, E>
    where
        E: From<presage::Error>,
{
    //
}

对于两种类型的处理器,宏需要从函数签名中提取错误类型。如果你使用类型别名,可以在属性中指定错误类型。

#[command_handler(error = MyError)]
pub async fn some_handler(context: &mut MyContext, command: SomeCommand) -> MyResult<Events> {
    //
}

命令总线

命令总线是在使用 présage 运行系统时的主要入口点。它接收一个上下文和一个要执行的命令。随后持久化结果事件,然后执行任何匹配的事件处理器。这些事件处理器可以返回新的命令,这些命令也会被执行。只要发布新的命令,这个过程就会继续。注意避免无限循环!

配置

可以使用 new 函数(或 Default 实现)创建命令总线。然后可以通过提供 Configuration 来配置它。

let command_bus = CommandBus::new()
    .configure(
        Configuration::new()
            .event_handler(&some_event_handler)
            .command_handler(&some_command_handler)
    );

持久化

必须使用事件对所有系统的修改进行建模。这些修改使用 EventWriter 特性进行持久化。实际的实现可以持久化应用事件的结果,或者可以持久化事件本身。使用 présage,你可以选择你喜欢的方案,甚至可以混合两种方法。

示例

示例 文件夹包含了一个使用 présage 的简单命令行待办事项应用程序。它说明了本文件中讨论的大部分概念。

许可协议

根据以下任意一个许可协议授权:

任选其一。

贡献

除非您明确声明,否则根据 Apache-2.0 许可协议定义,您有意提交以包含在作品中的任何贡献,应按上述方式双授权,不附加任何额外条款或条件。

依赖项

~0.6–1.5MB
~33K SLoC