7 个版本

新增 0.2.3 2024年8月20日
0.2.2 2024年7月3日
0.2.1 2024年6月26日
0.1.0 2024年6月2日
0.0.2 2024年5月29日

#538数据库接口

Download history 371/week @ 2024-05-27 63/week @ 2024-06-03 131/week @ 2024-06-10 16/week @ 2024-06-17 136/week @ 2024-06-24 112/week @ 2024-07-01 7/week @ 2024-07-08 15/week @ 2024-07-15 23/week @ 2024-07-22 39/week @ 2024-07-29 5/week @ 2024-08-05 38/week @ 2024-08-12

每月105 次下载

自定义许可

58KB
1.5K SLoC

evented

Crates.io license

evented 是一个用于 事件源 的库,其中状态更改作为事件持久化。它起源于 eventsourced,但通过与 PostgreSQL 紧密耦合以及其他功能(如事件元数据)提供了额外的强一致性功能。

evented 的核心抽象是 EventSourcedEntity,可以通过 ID 进行识别:一个 Entity 实现定义其状态和事件处理,相关的 Command 实现定义其命令处理。

当事件源实体接收到一个命令时,相应的命令处理器会被调用,它要么返回要持久化的事件序列加上元数据,要么返回一个拒绝。如果返回了事件,这些事件将进行事务性持久化,从而也调用了一个可选的 EventListener。通过每个实体的版本进行乐观锁来处理并发。

创建事件源实体时,其事件会被加载,其状态通过应用它们到事件处理器来构建。然后,这个状态被命令处理器用来决定是否应该接受命令——结果是持久化和应用事件或拒绝。

也可以创建异步的 Projection。这些通过填充它们的视图并存储已处理事件的序列号来事务性处理事件。目前只支持按类型投影的事件。

以下是一个简单的测试示例,它使用事件监听器来构建一个同步和一致的观点。实际上,这个视图可能不需要一致性,但现实世界中对一致视图的实际用例包括唯一性检查,例如用户实体的电子邮件地址。

#[derive(Debug, Default, PartialEq, Eq)]
pub struct Counter(u64);

impl Entity for Counter {
    type Id = Uuid;
    type Event = Event;
    type Metadata = Metadata;

    const TYPE_NAME: &'static str = "counter";

    fn handle_event(&mut self, event: Self::Event) {
        match event {
            Event::Increased { inc, .. } => self.0 += inc,
            Event::Decreased { dec, .. } => self.0 -= dec,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum Event {
    Increased { id: Uuid, inc: u64 },
    Decreased { id: Uuid, dec: u64 },
}

#[derive(Debug)]
pub struct Increase(u64);

impl Command for Increase {
    type Entity = Counter;
    type Rejection = Overflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Increase(inc) = self;
        if entity.0 > u64::MAX - inc {
            Err(Overflow)
        } else {
            let increased = Event::Increased { id: *id, inc };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![increased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Overflow;

#[derive(Debug)]
pub struct Decrease(u64);

impl Command for Decrease {
    type Entity = Counter;
    type Rejection = Underflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Decrease(dec) = self;
        if entity.0 < dec {
            Err::<Vec<_>, Underflow>(Underflow)
        } else {
            let decreased = Event::Decreased { id: *id, dec };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![decreased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Underflow;

#[derive(Debug, Serialize, Deserialize)]
pub struct Metadata {
    timestamp: OffsetDateTime,
}

struct Listener;

impl EventListener<Event> for Listener {
    async fn listen(
        &mut self,
        event: &Event,
        tx: &mut Transaction<'_, sqlx::Postgres>,
    ) -> Result<(), BoxError> {
        match event {
            Event::Increased { id, inc } => {
                let value = sqlx::query("SELECT value FROM counters WHERE id = $1")
                    .bind(id)
                    .fetch_optional(&mut **tx)
                    .await
                    .map_err(Box::new)?
                    .map(|row| row.try_get::<i64, _>(0))
                    .transpose()?;
                match value {
                    Some(value) => {
                        sqlx::query("UPDATE counters SET value = $1 WHERE id = $2")
                            .bind(value + *inc as i64)
                            .bind(id)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }

                    None => {
                        sqlx::query("INSERT INTO counters VALUES ($1, $2)")
                            .bind(id)
                            .bind(*inc as i64)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }
                }
                Ok(())
            }

            _ => Ok(()),
        }
    }
}

更多示例可以在 示例目录 中找到。

许可

此代码是开源软件,根据 Apache 2.0 许可证 许可。

依赖关系

~14–30MB
~429K SLoC