#event-sourcing #cqrs #es #architecture #ddd

esrs

一个 Prima.it-有意见的库,用于实现 cqrs/es

40 个版本 (16 个破坏性版本)

0.17.1 2024 年 8 月 20 日
0.17.0 2024 年 6 月 12 日
0.16.0 2024 年 5 月 8 日
0.14.0 2024 年 1 月 9 日
0.2.7 2021 年 3 月 29 日

#56异步

Download history 282/week @ 2024-05-01 525/week @ 2024-05-08 210/week @ 2024-05-15 235/week @ 2024-05-22 259/week @ 2024-05-29 352/week @ 2024-06-05 393/week @ 2024-06-12 379/week @ 2024-06-19 455/week @ 2024-06-26 280/week @ 2024-07-03 270/week @ 2024-07-10 289/week @ 2024-07-17 390/week @ 2024-07-24 373/week @ 2024-07-31 570/week @ 2024-08-07 618/week @ 2024-08-14

每月 1,972 次下载

MIT/Apache

93KB
1.5K SLoC

Event sourcing.rs

它是一个用于在 Rust 中实现 cqrs/es 的有意见的库。

一些示例代码片段可以在 example 文件夹中找到。

安装

Event Sourcing RS 在底层使用 sqlx

# Cargo.toml
[dependencies]
# postgres database
esrs = { version = "0.17", features = ["postgres"] }
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-native-tls", "uuid", "json", "chrono"] }

跟踪

每次使用投影器或对给定事件应用策略时,都会生成一个跟踪跨度。

运行示例、测试和 linting

启动 docker-compose 栈

docker compose run --service-ports web bash

运行测试。

cargo make test

运行 linters。

cargo make clippy

用法

本节介绍了将库集成到您的应用程序中,使您能够启动 CQRS/事件源实现。

事件源

事件源是一种软件架构模式,用于捕获和存储应用程序状态随时间发生的变化或事件。与持久化对象或实体的当前状态不同,事件源侧重于记录导致该状态的一系列事件。

聚合

要开始在您的代码库中实现 CQRS/事件源,首先您需要创建一个 Aggregate

在事件源的情况下,聚合是一个基本概念,它代表一组领域对象及其状态变化。它是用于在事件源系统中建模和管理数据的关键构建块之一。

因此,聚合负责处理传入的命令并生成相应的反映其状态变化的_even_。它封装了与它所管理的数据相关的特定操作的业务逻辑和规则。聚合为事件源系统中的一致性和并发控制提供了一个清晰的边界。

实现自己的聚合

pub struct Book;

impl Aggregate for Book {
    ...
}

Aggregate 特性的每个实现都必须定义特定的类型和函数,以满足其接口。

  • const NAME:常量值。它指的是用于在系统中识别特定聚合实例的唯一标识符或键。

  • type State:指的是在特定时间点特定聚合实例的当前状态。它代表了由聚合表示的实体的所有数据和属性。

  • type Command:表示对聚合实例执行特定操作或操作的请求或消息。

  • type Event:是特定聚合实例发生的重要状态变化或事件的表示。聚合事件是事件源的核心,因为它们捕获了聚合随时间所做的所有更改。

  • type Error:表示在尝试处理命令时可能发生的每次验证失败。

  • fn handle_command:负责处理和验证传入的命令,并发出相应的_even_。

  • fn apply_event:此函数负责处理单个_even_,或重新播放一批_even_,并将它们的效果应用于 Aggregate 的状态。

以下是一些示例实现

NAME:

const NAME: &'static str = "book";

State

pub struct BookState {
    pub leftover: i32,
}

impl Default for BookState {
    fn default() -> Self {
        Self { leftover: 10 }
    }
}

Command

pub enum BookCommand {
    Buy {
        num_of_copies: i32,
    },
    Return {
        num_of_copies: i32,
    },
}

Event

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
pub enum BookEvent {
    Bought {
        num_of_copies: i32,
    },
    Returned {
        num_of_copies: i32,
    }
}

Error

use thiserror::Error;

#[derive(Debug, Error)]
pub enum BookError {
    NotEnoughCopies,
}

现在让我们将所有这些放入 Aggregate 中,同时实现 handle_commandapply_events 函数。

...

impl Aggregate for Book {
    const NAME: &'static str = "book";
    type State = BookState;
    type Command = BookCommand;
    type Event = BookEvent;
    type Error = BookError;

    fn handle_command(state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            BookCommand::Buy { num_of_copies } if state.leftover < num_of_copies => Err(BookError::NotEnoughCopies),
            BookCommand::Buy { num_of_copies } => Ok(vec![BookEvent::Bought { num_of_copies }]),
            BookCommand::Return { num_of_copies } => Ok(vec![BookEvent::Returned { num_of_copies }]),
        }
    }

    fn apply_event(state: Self::State, payload: Self::Event) -> Self::State {
        match payload {
            BookEvent::Bought { num_of_copies } => BookState { leftover: state.leftover - num_of_copies },
            BookEvent::Returned { num_of_copies } => BookState { leftover: state.leftover + num_of_copies },
        }
    }
}

添加持久化层

目前,唯一的现有持久化层是 PgStore。它的作用是将所有_even_写入 postgres 中的专用表中。表名使用 Aggregate::NAME 常量值与 _events 连接构建。

!重要:每个 PgStore 都是为每个聚合全局唯一设计的。如果您需要在运行时动态构建它,请记住在构建存储时调用 without_running_migrations,以避免执行迁移。

注意:为了使用 PgStore,需要一个完全实现的 Aggregate

use sqlx::{Pool, Postgres};

let pool: Pool<Postgres> = unimplemented!();

// Building a `PgStore`
let store: PgStore<Book> = PgStoreBuilder::new(pool)
        .try_build()
        .await
        .expect("Failed to create PgStore");

let mut state: BookState = BookState::default();
// Using the store
let events: Vec<BookEvent> = Book::handle_command(&state, BookCommand::Buy { num_of_copies: 1 });

store.persist(&mut state, events).await?;

为了减轻编写所有这些代码的负担,您可以利用 AggregateManager 辅助工具。可以将 AggregateManager 视为同步的 CommandBus

使用 SchemaAggregate::Event 与数据库解耦

为了避免由 Aggregate::Event 表示的领域事件与持久层之间的强耦合,可以在 PgStore 上引入一个 Schema 类型。

此类型必须实现 SchemaPersistable。这种机制使领域事件能够更自由地发展。例如,可以通过使用架构来弃用事件变体(请参阅弃用事件示例)。此外,此机制可以用作向上提升(upcasting)的替代方案(请参阅向上提升示例)。

有关如何将架构引入现有应用的示例,请参阅引入架构示例

let manager: AggregateManager<_> = AggregateManager::new(store);
manager.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 }).await

CQRS

CQRS代表命令查询责任分离(Command Query Responsibility Segregation),这是一种软件架构模式,它将处理应用程序中读取和写入操作的责任分开。CQRS背后的核心思想是将应用程序的数据模型分为两个不同的模型:一个优化于读取操作(查询端),另一个优化于写入操作(命令端)。

同步方式

在CQRS的纯粹概念中,这种方法可能会引起争议。然而,其根本意图是在各种场景下,在操作完成后为用户提供及时反馈。

例如,它旨在通过确保读取端及时更新,消除假设的前端不断轮询后端以获取更新的需求。此外,它还适应了操作序列形成链的情况,整个任务的完成至关重要,例如SAGA模式。

总的来说,主要目标是通过对读取和写入操作使用定义良好且专业的模型来增强用户体验和系统效率,根据特定用例定制架构。

CQRS/事件源上下文中的两个主要支柱是“事件处理器”和“事务性事件处理器”。

事件处理器

根据定义,EventHandler 在最终一致的基础上运行,主要用于更新应用程序的读取端。它还常用于对其他聚合(包括所属的聚合)执行命令。

EventHandler 是不可靠的。不可靠的函数是指在任何情况下都不会失败或产生错误的函数。换句话说,它是一个总是产生有效结果,没有任何可能性引发异常、抛出错误或返回不正确数据的函数。错误处理的责任由库用户承担。

pub struct BookEventHandler;

#[async_trait::async_trait]
impl EventHandler<Book> for BookEventHandler {
    async fn handle(&self, event: &StoreEvent<BookEvent>) {
        // Implementation here
    }
}

// Where the store is built..
PgStoreBuilder::new(pool)
    // ..add your event handler
    .add_event_handler(BookEventHandler)
    .try_build()
    .await
    .expect("Failed to create PgStore");
事务性事件处理器

TransactionalEventHandlers 是一种专门的事件处理器形式,旨在在写入模型和读取模型之间保持事务一致性。

如果 TransactionalEventHandlers 中发生失败,它将触发事务的完全回滚,并随后向调用者返回错误。

!重要:强烈建议不要使用 TransactionalEventHandler 在另一个聚合上执行命令。

pub struct BookTransactionalEventHandler;

#[async_trait::async_trait]
impl TransactionalEventHandler<Book, PgStoreError, PgConnection> for BookTransactionalEventHandler {
    async fn handle(&self, event: &StoreEvent<BookEvent>, transaction: &mut PgConnection) -> Result<(), PgStoreError> {
        // Implementation here
        Ok(())
    }
}

// Where the store is built..
PgStoreBuilder::new(pool)
    // ..add your transactional event handler
    .add_transactional_event_handler(BookTransactionalEventHandler)
    .try_build()
    .await
    .expect("Failed to create PgStore");

异步方式

这种方法遵循CQRS的传统原则。当从应用程序的写入端发出事件时,相应的事件处理器异步处理它们。这种方法通过允许应用程序并发处理事件来增强响应性和可伸缩性。

事件总线

目前存在两种不同的事件总线实现,一种使用rabbit,另一种使用kafka,但可以自定义事件总线的实现。要这样做,只需实现EventBus特性。

为了这个示例,我们创建了一个强耦合的事件总线,而更通用的实现更为可取。

pub struct BookEventBus;

#[async_trait::async_trait]
impl EventBus<Book> for BookEventBus {
    async fn publish(&self, store_event: &StoreEvent<BookEvent>) {
        // Implementation herepub struct BookEventBus;

#[async_trait::async_trait]
impl EventBus<Book> for BookEventBus {
    async fn publish(&self, store_event: &StoreEvent<BooKEvent>) {
        // Implementation herre
    }
}
    }
}

随后,在构建时将EventBus集成到PgStore中。

// Where the store is built..
PgStoreBuilder::new(pool)
    // ..add your event handler
    .add_event_bus(BookEventBus)
    .try_build()
    .await
    .expect("Failed to create PgStore");
再次消费总线和处理事件...

为了最终确定异步方法,接下来的阶段是构建一个读侧。为了实现这一点,我们继续依赖于EventHandler。然而,库中缺少内置的消费者。实现消费者留给库用户自行决定。这种“消费者”的概念是从总线上消费消息,然后将它们应用到每个负责执行特定任务的EventHandler上。

由于实现非常类似于“同步方式”,我们避免了再次编写它。

后续

您可以在使用部分的完整示例中找到包含所有展示内容的示例,在readme示例中。

依赖项

~6–24MB
~359K SLoC