高性能、内置功能、适用于 Rust 的事件溯源

0.1.0 2024 年 8 月 2 日

这个 crate 支持一种称为“事件溯源”的事务处理风格。这个名字可能有些晦涩,但基本思想非常简单:而不是存储可变记录,这些记录随状态变化而更新,事件溯源系统存储一系列不可变的描述状态变化的“事件”。当系统需要知道某个给定实体的状态时,它选择与该实体相关的所有事件,并将它们(也称为折叠)成一个“聚合”,这是该实体的当前状态。换句话说,事务的当前状态实际上是从该事务记录的事件中计算出来的。



这种方法的缺点是它使得实体列表和查询更复杂:如果你存储单个事件并计算整体状态,你如何快速找到已全额退款的全部付款?大多数事件溯源系统通过将这些查询发送到包含聚合快照的单独、高度索引的数据库来处理这个问题。这些聚合通常在新事件写入事务处理数据库后异步重新计算和更新。它们最终是一致的,但列表/查询 API 在大型分布式系统中通常是这种情况。这也使得事务处理数据库的写入非常快,因为那些只是对最小索引表的插入。这种分工通常被称为“命令查询责任分离”或简称 CQRS。

⚠️ 注意:这个 crate 是函数性和测试过的,但尚未在生产环境中使用,所以使用时请自行承担风险!如果您想进行试点,请在 GitHub 上创建一个跟踪问题,我会很乐意帮助您。



  • 幂等性:当创建一个新的日志或向现有日志追加事件时,调用者可以包含一个唯一的 idempotency_key,以确保操作只发生一次,即使请求被重试。幂等性重放将返回一个包含之前记录的 LogId 和事件索引的 IdempotentReplay 错误,这样您可以轻松地检测并相应地处理它们。
  • 乐观并发:如果有多个服务实例同时尝试向同一日志追加新事件,只有一个将赢得竞争,其他将收到错误。输家可以重新减少日志以查看新事件对聚合的影响,确定其操作是否仍然相关,然后再次尝试。
  • 异步聚合缓存:当您减少日志时,结果聚合将异步写入缓存,例如Redis。随后的对 reduce() 的调用将重用该缓存的聚合,并且只检索/应用自聚合上次计算后记录的事件。这使后续的减少更快,而不会减慢您的代码。
  • 缓存策略:聚合默认情况下总是被缓存,但如果您想根据聚合属性控制缓存何时发生,您可以提供一个 CachingPolicy 的实现。例如,如果聚合的状态表明它将不再被加载,您可以选择不缓存它。
  • 事件流式传输和分页:在减少时,事件异步地从数据库流式传输而不是缓冲,以限制消耗的内存量。但库还提供了一个方便的方法,您可以使用它一次获取一页事件作为 Vector,这使得从您的服务的API中以JSON数组的形式返回它们更容易。


use std::error::Error;
use eventlogs::{LogId, LogManager, LogManagerOptions,
    AppendOptions, Aggregate, EventRecord};
use eventlogs::stores::fake::FakeEventStore;
use eventlogs::caches::fake::FakeReductionCache;
use serde::{Serialize, Deserialize};

// A typical application of event-sourcing is the tracking of payments.
// A payment is really a series of events: authorization, increment,
// reversal, capture, clearing, refund, dispute, etc. Most events
// can occur several times, but each must capture distinct properties
// (e.g., the amount refunded). The overall state of the payment can 
// then be reduced from these events.

// Let's start by defining a struct to hold the initial payment request
// properties, which would include details about the card, cardholder,
// amount requested, etc. 
// To keep things simple, amounts will be tracked in minor units with an 
// assumed single currency.
#[derive(Debug, Default, PartialEq, Clone, Serialize)]
pub struct PaymentRequest {
    amount_requested: isize,
    // ... lots of other details ...

// Now let's define our events, which are typically variants in an enum.
// Since this is just an example, we'll define only a subset with only
// the most relevant properties. Timestamps are added automatically
// by this crate, so we don't need to define them in each event.
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum PaymentEvent {
    Requested {
        request: PaymentRequest,
    Authorized {
        amount: isize,
        approval_code: Option<String>,
    Captured {
        amount: isize,
        statement_descriptor: String,
    Refunded {
        amount: isize,
        reference_number: String,

// Now let's define the "aggregate" for these events, which is the overall
// state of the payment. This is what we will reduce from the events,
// and use to decide if the current API request or operation is allowable.
// Aggregates must implement/derive Default, and implement Aggregate.
#[derive(Debug, Default, PartialEq, Clone, Serialize)]
pub struct Payment {
    request: PaymentRequest,
    amount_approved: isize,
    amount_captured: isize,
    amount_refunded: isize,

impl Payment {
    pub fn amount_outstanding(&self) -> isize {
        self.amount_approved - self.amount_captured - self.amount_refunded

// To make Payment an aggregate, implement the Aggregate trait, which 
// adds a method for applying each event to the aggregate's current state.
impl Aggregate for Payment {
    type Event = PaymentEvent;

    fn apply(&mut self, event_record: &impl EventRecord<Self::Event>) {
        match event_record.event() {
            PaymentEvent::Requested { request } => { 
                // If you don't want to clone, you could use 
                // std::mem::take() with a mutable `request`
                // as the event isn't written back to the database.
                self.request = request.clone() 
            PaymentEvent::Authorized { amount, .. } => self.amount_approved += amount,
            PaymentEvent::Captured { amount, .. } => self.amount_captured += amount,
            PaymentEvent::Refunded { amount, .. } => self.amount_refunded += amount,

async fn main() -> Result<(), Box<dyn Error>> {
    // To begin, create a LogManager, passing the event store and
    // reduction cache you want to use. This example uses testing fakes,
    // but you would use PostgresEventStore RedisReductionCache, configured
    // to point to your servers/clusters.
    let log_manager = LogManager::new(
    // Let's say we get an API request to create a new payment. We start
    // by creating a universally-unique log ID to track events related
    // to this payment. This will be serialized to a URL-safe string in
    // your API responses.
    let payment_id = LogId::new();

    // Then we create a new log, appending the Requested event.
    // To ensure this happens only once, even if your caller gets a network
    // timeout and retries their API request, have them pass a universally
    // unique idempotency key with their payment creation request. The
    // library will use that to ensure this log gets created only once.
    // See the LogManagerError::IdempotentReplay error for more details.
    let idempotency_key_from_caller = uuid::Uuid::now_v7().to_string();
    let options = AppendOptions {
        idempotency_key: Some(idempotency_key_from_caller),
    let req_event = PaymentEvent::Requested {
        request: PaymentRequest { amount_requested: 10000 },
    let log_state = log_manager.create(&payment_id, &req_event, &options).await?;

    // We then talk to the payment gateway, and get an approved authorization...
    let auth_event = PaymentEvent::Authorized {
        amount: 10000,
        approval_code: Some("xyz123".to_string()),
    // Append the authorized event to the log. You can use an idempotency key here
    // as well, but if you don't want to use them, just pass AppendOptions::default().
    // We pass the `log_state` that was returned from create() so that the library can
    // do optimistic concurrency and detect race conditions. If multiple processes try
    // to append to the same log at the same time, only one process will win, and the 
    // others will get a ConcurrentAppend error.

    // Now let's assume we shipped one of the items in the customer's order, so 
    // we get an API request to capture some of the payment. To know if this
    // is a valid request, we first reduce the log into a Payment to see if
    // the amount_outstanding is positive. The reduction will be automatically
    // cached asynchronously when we do this, so the next time we reduce, it
    // will only need to fetch new events and apply them.
    let reduction = log_manager.reduce(&payment_id).await?;
    assert!(reduction.aggregate().amount_outstanding() > 0);

    // Looks like we can do the capture, so let's record that event.
    let capture_event = PaymentEvent::Captured {
        amount: 4000,
        statement_descriptor: "Widgets, Inc".to_string(),

    // You can pass a reduction instead of a log_state as the second argument
    // if you recently reduced the log. This again helps the library do optimistic
    // concurrency to detect race conditions. The reduction is consumed here since
    // it shouldn't be treated as a current reduction after this call, regardless 
    // of the Result returned.

    // Now if we reduce the log again, we should see the affect of the Capture.
    // This will use the cached reduction from before, and select/apply only 
    // the events that were appended after that reduction was created.
    let reduction = log_manager.reduce(&payment_id).await?;
    let payment = reduction.aggregate();
    assert_eq!(payment.amount_approved, 10000);
    assert_eq!(reduction.aggregate().amount_captured, 4000);
    assert_eq!(reduction.aggregate().amount_refunded, 0);
    assert_eq!(reduction.aggregate().amount_outstanding(), 6000);

    // Now let's assume the customer changed their mind and canceled the other
    // item in their order, so we need to refund that amount.
    let refund_event = PaymentEvent::Refunded {
        amount: 6000,
        reference_number: "abc789".to_string(),

    // When we reduce, we should see that the amount outstanding is now zero.
    let reduction = log_manager.reduce(&payment_id).await?;
    let payment = reduction.aggregate();
    assert_eq!(payment.amount_approved, 10000);
    assert_eq!(reduction.aggregate().amount_captured, 4000);
    assert_eq!(reduction.aggregate().amount_refunded, 6000);
    assert_eq!(reduction.aggregate().amount_outstanding(), 0);

    // If you want to expose the raw events to your caller or
    // on show them on an admin page, you can get them a page 
    // at a time from the load() method. If you ask for one more 
    // than your page size, you'll know if there are more pages!
    let events = log_manager.load(&payment_id, 0, 101).await?;
    // In our case there should be only 4
    assert_eq!(events.len(), 4);
    let json = serde_json::to_string(&events)?;




名称 描述 默认?
postgres-store 启用PostgresEventStore
redis-cache 启用RedisReductionCache



