#event-sourcing #events #logs #event-log #api-response

eventlogs

高性能、内置功能、适用于 Rust 的事件溯源

1 个不稳定版本

0.1.0 2024 年 8 月 2 日

357数据库接口 中排名

Download history 48/week @ 2024-07-27 43/week @ 2024-08-03

91 每月下载量

MIT 许可证

91KB
1.5K SLoC

高性能、内置功能、适用于 Rust 的事件溯源

CI

这个 crate 支持一种称为“事件溯源”的事务处理风格。这个名字可能有些晦涩,但基本思想非常简单:而不是存储可变记录,这些记录随状态变化而更新,事件溯源系统存储一系列不可变的描述状态变化的“事件”。当系统需要知道某个给定实体的状态时,它选择与该实体相关的所有事件,并将它们(也称为折叠)成一个“聚合”,这是该实体的当前状态。换句话说,事务的当前状态实际上是从该事务记录的事件中计算出来的。

由于它们计算所用的事件是不可变的,并且新事件总是追加到日志的末尾,因此可以积极缓存聚合。还可以通过选择并应用创建后记录的所有事件来快速更新缓存的聚合。

这种方法不仅提供了关于给定实体状态如何形成的完整审计跟踪,而且记录发生多次的相同实体的事件的简单方法。例如,一笔付款可能被部分捕获或退款多次,但每次这些事件都会有自己的货币金额和参考号的独特属性。

这种方法的缺点是它使得实体列表和查询更复杂:如果你存储单个事件并计算整体状态,你如何快速找到已全额退款的全部付款?大多数事件溯源系统通过将这些查询发送到包含聚合快照的单独、高度索引的数据库来处理这个问题。这些聚合通常在新事件写入事务处理数据库后异步重新计算和更新。它们最终是一致的,但列表/查询 API 在大型分布式系统中通常是这种情况。这也使得事务处理数据库的写入非常快,因为那些只是对最小索引表的插入。这种分工通常被称为“命令查询责任分离”或简称 CQRS。

⚠️ 注意:这个 crate 是函数性和测试过的,但尚未在生产环境中使用,所以使用时请自行承担风险!如果您想进行试点,请在 GitHub 上创建一个跟踪问题,我会很乐意帮助您。

内置功能

这是一个包含所有功能的库,提供在高吞吐量分布式系统中通常需要的特性。

  • 幂等性:当创建一个新的日志或向现有日志追加事件时,调用者可以包含一个唯一的 idempotency_key,以确保操作只发生一次,即使请求被重试。幂等性重放将返回一个包含之前记录的 LogId 和事件索引的 IdempotentReplay 错误,这样您可以轻松地检测并相应地处理它们。
  • 乐观并发:如果有多个服务实例同时尝试向同一日志追加新事件,只有一个将赢得竞争,其他将收到错误。输家可以重新减少日志以查看新事件对聚合的影响,确定其操作是否仍然相关,然后再次尝试。
  • 异步聚合缓存:当您减少日志时,结果聚合将异步写入缓存,例如Redis。随后的对 reduce() 的调用将重用该缓存的聚合,并且只检索/应用自聚合上次计算后记录的事件。这使后续的减少更快,而不会减慢您的代码。
  • 缓存策略:聚合默认情况下总是被缓存,但如果您想根据聚合属性控制缓存何时发生,您可以提供一个 CachingPolicy 的实现。例如,如果聚合的状态表明它将不再被加载,您可以选择不缓存它。
  • 事件流式传输和分页:在减少时,事件异步地从数据库流式传输而不是缓冲,以限制消耗的内存量。但库还提供了一个方便的方法,您可以使用它一次获取一页事件作为 Vector,这使得从您的服务的API中以JSON数组的形式返回它们更容易。

示例用法

use std::error::Error;
use eventlogs::{LogId, LogManager, LogManagerOptions,
    AppendOptions, Aggregate, EventRecord};
use eventlogs::stores::fake::FakeEventStore;
use eventlogs::caches::fake::FakeReductionCache;
use serde::{Serialize, Deserialize};

// A typical application of event-sourcing is the tracking of payments.
// A payment is really a series of events: authorization, increment,
// reversal, capture, clearing, refund, dispute, etc. Most events
// can occur several times, but each must capture distinct properties
// (e.g., the amount refunded). The overall state of the payment can 
// then be reduced from these events.

// Let's start by defining a struct to hold the initial payment request
// properties, which would include details about the card, cardholder,
// amount requested, etc. 
// To keep things simple, amounts will be tracked in minor units with an 
// assumed single currency.
#[derive(Debug, Default, PartialEq, Clone, Serialize)]
pub struct PaymentRequest {
    amount_requested: isize,
    // ... lots of other details ...
}

// Now let's define our events, which are typically variants in an enum.
// Since this is just an example, we'll define only a subset with only
// the most relevant properties. Timestamps are added automatically
// by this crate, so we don't need to define them in each event.
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum PaymentEvent {
    Requested {
        request: PaymentRequest,
    },
    Authorized {
        amount: isize,
        approval_code: Option<String>,
    },
    Captured {
        amount: isize,
        statement_descriptor: String,
    },
    Refunded {
        amount: isize,
        reference_number: String,
    },
}

// Now let's define the "aggregate" for these events, which is the overall
// state of the payment. This is what we will reduce from the events,
// and use to decide if the current API request or operation is allowable.
// Aggregates must implement/derive Default, and implement Aggregate.
#[derive(Debug, Default, PartialEq, Clone, Serialize)]
pub struct Payment {
    request: PaymentRequest,
    amount_approved: isize,
    amount_captured: isize,
    amount_refunded: isize,
}

impl Payment {
    pub fn amount_outstanding(&self) -> isize {
        self.amount_approved - self.amount_captured - self.amount_refunded
    }
}

// To make Payment an aggregate, implement the Aggregate trait, which 
// adds a method for applying each event to the aggregate's current state.
impl Aggregate for Payment {
    type Event = PaymentEvent;

    fn apply(&mut self, event_record: &impl EventRecord<Self::Event>) {
        match event_record.event() {
            PaymentEvent::Requested { request } => { 
                // If you don't want to clone, you could use 
                // std::mem::take() with a mutable `request`
                // as the event isn't written back to the database.
                self.request = request.clone() 
            }
            PaymentEvent::Authorized { amount, .. } => self.amount_approved += amount,
            PaymentEvent::Captured { amount, .. } => self.amount_captured += amount,
            PaymentEvent::Refunded { amount, .. } => self.amount_refunded += amount,
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // To begin, create a LogManager, passing the event store and
    // reduction cache you want to use. This example uses testing fakes,
    // but you would use PostgresEventStore RedisReductionCache, configured
    // to point to your servers/clusters.
    let log_manager = LogManager::new(
        FakeEventStore::<PaymentEvent>::new(),
        FakeReductionCache::<Payment>::new(),
    );
    
    // Let's say we get an API request to create a new payment. We start
    // by creating a universally-unique log ID to track events related
    // to this payment. This will be serialized to a URL-safe string in
    // your API responses.
    let payment_id = LogId::new();

    // Then we create a new log, appending the Requested event.
    // To ensure this happens only once, even if your caller gets a network
    // timeout and retries their API request, have them pass a universally
    // unique idempotency key with their payment creation request. The
    // library will use that to ensure this log gets created only once.
    // See the LogManagerError::IdempotentReplay error for more details.
    let idempotency_key_from_caller = uuid::Uuid::now_v7().to_string();
    let options = AppendOptions {
        idempotency_key: Some(idempotency_key_from_caller),
        ..Default::default()
    };
    let req_event = PaymentEvent::Requested {
        request: PaymentRequest { amount_requested: 10000 },
    };
    let log_state = log_manager.create(&payment_id, &req_event, &options).await?;

    // We then talk to the payment gateway, and get an approved authorization...
    let auth_event = PaymentEvent::Authorized {
        amount: 10000,
        approval_code: Some("xyz123".to_string()),
    };
    
    // Append the authorized event to the log. You can use an idempotency key here
    // as well, but if you don't want to use them, just pass AppendOptions::default().
    // We pass the `log_state` that was returned from create() so that the library can
    // do optimistic concurrency and detect race conditions. If multiple processes try
    // to append to the same log at the same time, only one process will win, and the 
    // others will get a ConcurrentAppend error.
    log_manager.append(
        &payment_id, 
        log_state, 
        &auth_event, 
        &AppendOptions::default()).await?;

    // Now let's assume we shipped one of the items in the customer's order, so 
    // we get an API request to capture some of the payment. To know if this
    // is a valid request, we first reduce the log into a Payment to see if
    // the amount_outstanding is positive. The reduction will be automatically
    // cached asynchronously when we do this, so the next time we reduce, it
    // will only need to fetch new events and apply them.
    let reduction = log_manager.reduce(&payment_id).await?;
    assert!(reduction.aggregate().amount_outstanding() > 0);

    // Looks like we can do the capture, so let's record that event.
    let capture_event = PaymentEvent::Captured {
        amount: 4000,
        statement_descriptor: "Widgets, Inc".to_string(),
    };

    // You can pass a reduction instead of a log_state as the second argument
    // if you recently reduced the log. This again helps the library do optimistic
    // concurrency to detect race conditions. The reduction is consumed here since
    // it shouldn't be treated as a current reduction after this call, regardless 
    // of the Result returned.
    log_manager.append(
        &payment_id, 
        reduction, 
        &capture_event, 
        &AppendOptions::default()).await?;

    // Now if we reduce the log again, we should see the affect of the Capture.
    // This will use the cached reduction from before, and select/apply only 
    // the events that were appended after that reduction was created.
    let reduction = log_manager.reduce(&payment_id).await?;
    let payment = reduction.aggregate();
    assert_eq!(payment.amount_approved, 10000);
    assert_eq!(reduction.aggregate().amount_captured, 4000);
    assert_eq!(reduction.aggregate().amount_refunded, 0);
    assert_eq!(reduction.aggregate().amount_outstanding(), 6000);

    // Now let's assume the customer changed their mind and canceled the other
    // item in their order, so we need to refund that amount.
    let refund_event = PaymentEvent::Refunded {
        amount: 6000,
        reference_number: "abc789".to_string(),
    };
    log_manager.append(
        &payment_id, 
        reduction, 
        &refund_event, 
        &AppendOptions::default()).await?;

    // When we reduce, we should see that the amount outstanding is now zero.
    let reduction = log_manager.reduce(&payment_id).await?;
    let payment = reduction.aggregate();
    assert_eq!(payment.amount_approved, 10000);
    assert_eq!(reduction.aggregate().amount_captured, 4000);
    assert_eq!(reduction.aggregate().amount_refunded, 6000);
    assert_eq!(reduction.aggregate().amount_outstanding(), 0);

    // If you want to expose the raw events to your caller or
    // on show them on an admin page, you can get them a page 
    // at a time from the load() method. If you ask for one more 
    // than your page size, you'll know if there are more pages!
    let events = log_manager.load(&payment_id, 0, 101).await?;
    // In our case there should be only 4
    assert_eq!(events.len(), 4);
    let json = serde_json::to_string(&events)?;

    Ok(())
}

Cargo功能

此crate定义了以下Cargo/编译器功能

名称 描述 默认?
postgres-store 启用PostgresEventStore
redis-cache 启用RedisReductionCache

由于Postgres和Redis是非常常见的选择,因此这些功能默认开启。随着未来添加更多的EventStoreReductionCache实现,将定义相应的非默认功能。

依赖关系

~5–16MB
~190K SLoC