#事件溯源 #CQRS #actor-model #事件

已删除 riker-es-macros

riker-es 的过程宏

0.1.0 2020年8月20日

#65#actor-model


actor-es 中使用

GPL-3.0-or-later

2KB

ActorES

这是一个旨在更实用而不是“照本宣科”的事件溯源和 CQRS 抽象。它建立在轻量级 actor 框架 Riker 之上。
警告 目前它几乎是 alpha 质量,功能尚不完整,API 仍在变化。

Overview draft

如何使用

此库为 Riker 添加了一些额外的与事件溯源相关的概念,最显著的是 实体,它代表一个处理外部命令和业务逻辑的领域。使用 Riker 的常规机制(参见 创建 actor)通过 Entity actor 创建。

let entity = actor_system.actor_of_args::<Entity<MyEntity>, _>("my-entity", SomeArgs)`;

定义您的实体

假设 MyEntity 是一个用户提供的实现 ES 特质的 struct。

struct MyEntity;

#[async-trait]
impl ES for MyEntity {
  type Args = SomeArgs; // the arguments passed to the constructor
  type Model = MyData; // The "model" is the data that is to be persisted along with its changes.
  type Cmd = MyEntityCommands; // The external command or commands(often in the form of an enum) this entity can handle.
  // type Event = (); // TODO: Similar to commands but for handling events emitted by other entities.
  type Error = MyEntityError; // Error produced by the handler functions.
  
  // Used to construct an entity, receives the `Entity` actor's context 
  // to be able to create other actors and hold their references
  fn new(_cx: &Context<CQRS<Self::Cmd>>, _args: Self::Args) -> Self {
    MyEntity
  }
  
  async fn handle_command(&mut self, cmd: Self::Cmd) -> Result<Self> {
    // do your command handling here and return a commit that will be persited
    // to the configured store(a simple memory store atm).
    
    // when entities are created for the first time
    Ok(Event::Create(MyData).into())
    // or to update an existing entity
    // Ok(Event::Change("some id".into(), MyDataUpdate).into())
  }
}

模型和重新加载数据

定义您的聚合(数据模型),它可以处理的更新以及如何应用它们。

struct MyData {
  some_id: String,
  some_field: String,
  other_field: Option<String>
}

enum MyDataUpdate {
  TheChange(String, String),
  LittleChange(String),
}

impl Model for MyData {
  type Change = MyDataUpdate;
  
  fn id(&self) -> EntityId {
    self.some_id.into()
  }
  
  fn apply_change(&mut self, change: Self::Change) {
    match change {
      MyDataUpdate::TheChange(field, other) => {
        self.some_field = field;
        self.other_field = Some(other);
      },
      MyDataUpdate::LittleChange(field) => {
        self.some_field = field;
      },
    }
  }
}

分发命令和查询

RikerES 提供了一个实体管理器,当实体被 register 时,它会创建和管理您的实体。它提供了一个更简单的 API 来发送命令和查询到正确的实体。

let mgr = Manager::new(actor_system).register::<MyEntity, _>(SomeStore::new(), SomeArgs);

let id = mgr.command(MyEntityCommands::DoSomething).await;
let data = mgr.query::<MyEntity>(id).await.unwrap();

同时(在我找到使它更自动化的方法之前),您还必须为实体和命令实现 EntityName,以便能够将命令分发到正确的实体。

impl EntityName for MyEntity {
  const NAME: &'static str = "my-entity";
}
impl EntityName for MyEntityCommands {
  const NAME: &'static str = "my-entity";
}

依赖项

~1.5MB
~35K SLoC