#postgresql #event-sourcing #ddd #event-store #database

eventastic_postgres

An example PostgreSQL event store for eventastic

2 versions

0.3.1 February 26, 2024
0.3.0 February 25, 2024
0.2.1 February 25, 2024
0.2.0 July 9, 2023
0.1.0 July 6, 2023

#557 in Asynchronous

Download history 4/week @ 2024-03-07 19/week @ 2024-03-28 17/week @ 2024-04-04

99 downloads per month

MIT license

47KB
860 lines of code

Eventastic

This is an opinionated fork of Eventually-rs.

Eventastic enforces the use of transactions, handles idempotency, and removes the command handling abstraction.
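
To make the idempotency rule concrete, here is a minimal in-memory sketch of the behaviour the example below relies on: replaying an event with a known id and identical content is a no-op, while reusing an id with different content is an error. The types `Event`, `RecordOutcome`, `IdempotencyError` and `Account` here are hypothetical and use only the standard library; they are not Eventastic's API, and the real crate performs this check against the database inside a transaction.

```rust
use std::collections::HashMap;

/// Hypothetical event type, loosely modelled on the bank example (not Eventastic's API).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Open { event_id: u32, starting_balance: i64 },
    Add { event_id: u32, amount: i64 },
}

impl Event {
    fn id(&self) -> u32 {
        match self {
            Event::Open { event_id, .. } | Event::Add { event_id, .. } => *event_id,
        }
    }
}

#[derive(Debug, PartialEq)]
enum RecordOutcome {
    Applied,
    AlreadyApplied,
}

/// Returned when an event id is reused with different content.
#[derive(Debug, PartialEq)]
struct IdempotencyError {
    event_id: u32,
}

#[derive(Default)]
struct Account {
    balance: i64,
    /// Every event applied so far, keyed by event id.
    seen: HashMap<u32, Event>,
}

impl Account {
    fn record(&mut self, event: Event) -> Result<RecordOutcome, IdempotencyError> {
        if let Some(previous) = self.seen.get(&event.id()) {
            // Same id, same content: ignore. Same id, different content: reject.
            return if *previous == event {
                Ok(RecordOutcome::AlreadyApplied)
            } else {
                Err(IdempotencyError { event_id: event.id() })
            };
        }
        match &event {
            Event::Open { starting_balance, .. } => self.balance = *starting_balance,
            Event::Add { amount, .. } => self.balance += *amount,
        }
        self.seen.insert(event.id(), event);
        Ok(RecordOutcome::Applied)
    }
}

fn main() {
    let mut account = Account::default();
    account
        .record(Event::Open { event_id: 1, starting_balance: 21 })
        .unwrap();
    let add = Event::Add { event_id: 2, amount: 324 };
    account.record(add.clone()).unwrap();

    // Replaying the identical event leaves the balance untouched.
    let outcome = account.record(add).unwrap();
    println!("replay outcome: {outcome:?}, balance: {}", account.balance);

    // Reusing the id with a different amount is rejected.
    let err = account.record(Event::Add { event_id: 2, amount: 123 });
    println!("conflicting replay: {err:?}");
}
```

Comparing the full event content, rather than only the id, is what lets a retry of the same request succeed silently while a conflicting request is surfaced as an error.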

Example

See the full example in examples/bank

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Setup postgres repo
    let repository = get_repository().await;

    // Run our side effects handler in a background task
    tokio::spawn(async {
        let repository = get_repository().await;

        let _ = repository
            .start_outbox(SideEffectContext {}, std::time::Duration::from_secs(5))
            .await;
    });

    // Start transaction
    let mut transaction = repository.begin_transaction().await?;

    let account_id = Uuid::new_v4();

    let event_id = Uuid::new_v4();

    let add_event_id = Uuid::new_v4();

    // Open a bank account
    let event = AccountEvent::Open {
        event_id,
        account_id,
        starting_balance: 21,
        email: "[email protected]".into(),
    };

    let mut account = Account::record_new(event)?;

    // Add funds to newly created account
    let add_event = AccountEvent::Add {
        event_id: add_event_id,
        amount: 324,
    };

    // Record the add-funds event.
    // record_that takes the transaction because it runs idempotency checks against the db.
    account
        .record_that(&mut transaction, add_event.clone())
        .await?;

    // Save uncommitted events and side effects in the db.
    transaction.store(&mut account).await?;

    // Commit the transaction
    transaction.commit().await?;

    // Get the aggregate from the db
    let mut transaction = repository.begin_transaction().await?;

    let mut account: Context<Account> = transaction.get(&account_id).await?;

    // Check our balance is correct
    assert_eq!(account.state().balance, 345);

    // Trying to apply the same event id but with different content gives us an IdempotencyError
    let changed_add_event = AccountEvent::Add {
        event_id: add_event_id,
        amount: 123,
    };

    let err = account
        .record_that(&mut transaction, changed_add_event)
        .await
        .expect_err("failed to get error");

    assert!(matches!(err, RecordError::IdempotencyError(_, _)));

    // Re-applying an event that was already applied is ignored and returns Ok
    account.record_that(&mut transaction, add_event).await?;

    transaction.commit().await?;

    let mut transaction = repository.begin_transaction().await?;

    let account: Context<Account> = transaction.get(&account_id).await?;

    // Balance hasn't changed since the event wasn't actually applied
    assert_eq!(account.state().balance, 345);

    println!("Got account {account:?}");

    tokio::time::sleep(std::time::Duration::from_secs(30)).await;
    Ok(())
}
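
The example above stores side effects in the same transaction as the events and runs a background handler that polls every 5 seconds. One common way to structure this is the transactional outbox pattern. The sketch below is an in-memory illustration under that assumption: `Store`, `Transaction`, `SideEffect` and `process_outbox` are hypothetical names, not Eventastic's API, and a real implementation would persist both tables in PostgreSQL and call the polling pass in a loop with a delay.

```rust
/// Hypothetical side effect produced alongside an event (not Eventastic's API).
#[derive(Debug, Clone, PartialEq)]
enum SideEffect {
    SendEmail { to: String },
}

/// Stands in for the database: an event log plus an outbox of pending side effects.
#[derive(Default)]
struct Store {
    events: Vec<String>,
    outbox: Vec<SideEffect>,
}

/// Work buffered until commit.
struct Transaction {
    events: Vec<String>,
    side_effects: Vec<SideEffect>,
}

impl Store {
    /// Events and side effects are written together, so a side effect can
    /// never exist without the event that caused it, or the other way round.
    fn commit(&mut self, tx: Transaction) {
        self.events.extend(tx.events);
        self.outbox.extend(tx.side_effects);
    }

    /// One polling pass. The handler returns true when a side effect was handled;
    /// failed ones stay in the outbox and are retried on the next pass.
    /// Returns the number of side effects handled.
    fn process_outbox<F>(&mut self, mut handler: F) -> usize
    where
        F: FnMut(&SideEffect) -> bool,
    {
        let before = self.outbox.len();
        self.outbox.retain(|effect| !handler(effect));
        before - self.outbox.len()
    }
}

fn main() {
    let mut store = Store::default();

    let tx = Transaction {
        events: vec!["AccountOpened".to_string()],
        // Illustrative address only.
        side_effects: vec![SideEffect::SendEmail {
            to: "customer@example.com".to_string(),
        }],
    };
    store.commit(tx);

    // A background worker would run this pass repeatedly, sleeping between passes.
    let handled = store.process_outbox(|effect| {
        println!("handling {effect:?}");
        true
    });
    assert_eq!(handled, 1);
    assert!(store.outbox.is_empty());
}
```

Because a failed side effect stays in the outbox, handlers in this style generally need to tolerate being run more than once for the same item.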

Dependencies

~37–51MB
~880K SLoC