43个版本 (26个重大更新)

0.27.0 2024年5月6日
0.25.0 2024年4月10日
0.24.0 2024年3月27日
0.12.0 2023年12月28日
0.6.0 2023年1月31日

#795解析器实现

Download history 102/week @ 2024-04-28 302/week @ 2024-05-05 81/week @ 2024-05-12 48/week @ 2024-05-19 32/week @ 2024-05-26 22/week @ 2024-06-02 115/week @ 2024-06-09 46/week @ 2024-06-16 40/week @ 2024-06-23 38/week @ 2024-06-30 122/week @ 2024-07-07 50/week @ 2024-07-14 6/week @ 2024-07-21 173/week @ 2024-07-28 12/week @ 2024-08-04 17/week @ 2024-08-11

每月210 次下载
用于 3 crates

Apache-2.0

57KB
1K SLoC

EventSourced

Crates.io license

Rust中的事件溯源实体。

概念

EventSourced在很大程度上受到了令人惊叹的 Akka Persistence 库的启发。它提供了一个框架来实现 事件溯源CQRS

[EventSourced] 特质定义了事件类型和事件溯源实体的处理。这些可以通过类型名称和ID识别,并可以通过 EventSourcedExt::entity 扩展方法创建。可以通过 [Command] 特质定义命令,该特质包含一个命令处理函数,用于拒绝命令或返回一个事件。事件被持久化到事件日志中,然后应用于事件处理程序以返回实体的新状态。

                 ┌───────┐   ┌ ─ ─ ─ Entity─ ─ ─ ─
                 │Command│                        │
┌ ─ ─ ─ ─ ─ ─    └───────┘   │ ┌────────────────┐
    Client   │────────────────▶│ handle_command │─┼─────────┐
└ ─ ─ ─ ─ ─ ─                │ └────────────────┘           │
       ▲                           │    │         │         │ ┌─────┐
        ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─     │read               │ │Event│
                  ┌─────┐               ▼         │         ▼ └─────┘
                  │Reply│    │     ┌─────────┐       ┌ ─ ─ ─ ─ ─ ─
                  │  /  │          │  State  │    │     EventLog  │
                  │Error│    │     └─────────┘       └ ─ ─ ─ ─ ─ ─
                  └─────┘               ▲         │         │ ┌─────┐
                             │     write│                   │ │Event│
                                        │         │         │ └─────┘
                             │ ┌────────────────┐           │
                               │  handle_event  │◀┼─────────┘
                             │ └────────────────┘
                                                  │
                             └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

[EventLog] 和 [SnapshotStore] 特质定义了一个可插拔的事件日志和一个可插拔的快照存储。对于 NATSPostgres,这些在相应的crates中实现。

EventSourcedEntity::spawn 将事件溯源实体放置在给定的事件日志和快照存储中,返回一个 [EntityRef],可以廉价地克隆并用于向实体传递命令。事件和快照状态到和从字节的转换通过给定的 [Binarize] 实现完成;对于 prostserde_json,这些已经提供。在处理配置的事件数量后进行快照,以加快未来的孵化。

EntityRef::handle_command 对于拒绝的命令返回 Command::Error,对于接受的命令返回 Command::Reply,并包装在另一个 Result 中以处理技术错误。

可以通过ID或实体类型从事件日志中查询事件。这些查询可以用于构建只读侧投影。在eventsourced-projection存储库中已有对投影的早期支持。

构建项目和示例的要求

在构建项目和示例之前,请确保您已安装protobuf依赖项。这不仅对于与prost的可选字节转换是必需的,而且对于eventsourced-nats也是必需的。不使用protobuf的唯一方法是不要使用prost和不要构建eventsourced-nats。

在macOS上,可以通过Homebrew安装protobuf

brew install protobuf

反例(非故意双关语)

example目录中的counter包包含一个简单的示例:一个计数器,它处理IncDec命令,并发出/处理IncreasedDecreased事件。

#[derive(Debug)]
struct Increase(u64);

impl Command<Uuid, Event, Counter> for Increase {
    type Error = Overflow;

    type Reply = u64;

    fn handle(self, id: &Uuid, state: &Counter) -> Result<Event, Self::Error> {
        if u64::MAX - state.0 < self.0 {
            Err(Overflow)
        } else {
            Ok(Event::Increased(*id))
        }
    }

    fn reply(state: &Counter) -> Self::Reply {
        state.0
    }
}

#[derive(Debug)]
struct Overflow;

#[derive(Debug)]
struct Decrease(u64);

impl Command<Uuid, Event, Counter> for Decrease {
    type Error = Underflow;

    type Reply = Counter;

    fn handle(self, id: &Uuid, state: &Counter) -> Result<Event, Self::Error> {
        if state.0 < self.0 {
            Err(Underflow)
        } else {
            Ok(Event::Decreased(*id))
        }
    }

    fn reply(state: &Counter) -> Self::Reply {
        *state
    }
}

#[derive(Debug, PartialEq, Eq)]
struct Underflow;

#[derive(Debug, Serialize, Deserialize)]
enum Event {
    Increased(Uuid),
    Decreased(Uuid),
}

#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
struct Counter(u64);

impl Counter {
    fn handle_event(self, evt: Event) -> Self {
        match evt {
            Event::Increased(_) => Self(self.0 + 1),
            Event::Decreased(_) => Self(self.0 - 1),
        }
    }
}

还有两个counter-natscounter-postgres包,每个包都有一个二进制存储库,分别使用eventsourced-natseventsourced-postgres来处理事件日志。

...
let evt_log = evt_log.clone();
let snapshot_store = snapshot_store.clone();
let counter = Counter::default();
let counter = counter
    .spawn(
        id,
        unsafe { NonZeroUsize::new_unchecked(42) },
        evt_log,
        snapshot_store,
        convert::serde_json::binarizer(),
    )
    .await
    .context("cannot spawn entity")?;

tasks.spawn(async move {
    for n in 0..config.evt_count / 2 {
        if n > 0 && n % 2_500 == 0 {
            println!("{id}: {} events persisted", n * 2);
        }
        counter
            .handle_command(Command::Inc(n as u64))
            .await
            .context("cannot handle Inc command")
            .unwrap();
        ...
    }
});
...

有关更多详细信息,请查看示例目录。

运行counter-nats示例

对于counter-nats示例,需要安装nats-server。在macOS上,只需使用Homebrew即可。

brew install nats-server

在运行示例之前,使用jetstream功能启动nats-server。

nats-server --jetstream

然后使用以下命令运行示例

RUST_LOG=info \
    CONFIG_DIR=examples/counter-nats/config \
    cargo run \
    --release \
    --package counter-nats

请注意,您可以通过更改位于examples/counter-nats/configdefaul.toml文件或通过环境变量覆盖配置设置来更改配置,例如APP__COUNTER__EVT_COUNT=42

RUST_LOG=info \
    APP__COUNTER__EVT_COUNT=42 \
    CONFIG_DIR=examples/counter-nats/config \
    cargo run \
    --release \
    --package counter-nats

运行counter-postgres示例

对于counter-postgres示例,需要安装PostgreSQL。在macOS上,只需使用Homebrew即可。

brew install postgresql@14

在运行示例之前,启动PostgreSQL。

brew services run postgresql@14

请确保您知道以下连接参数

  • 主机
  • 端口
  • 用户
  • 密码
  • 数据库名

您可以通过更改位于examples/counter-postgres/configdefault.toml文件或通过环境变量覆盖配置设置来更改配置,例如APP__EVT_LOG__DBNAME=testAPP__COUNTER__EVT_COUNT=42

然后使用以下命令运行示例

RUST_LOG=info \
    APP__EVT_LOG__DBNAME=test \
    APP__COUNTER__EVT_COUNT=42 \
    CONFIG_DIR=examples/counter-postgres/config \
    cargo run \
    --release \
    --package counter-postgres

许可证

此代码是开源软件,根据Apache 2.0许可证授权。

依赖关系

~3.5–5.5MB
~94K SLoC