#event-stream #event-sourcing #disintegrate #events #business #aggregate #domain

disintegrate-postgres

Disintegrate PostgresDB实现。不建议直接使用。有关详细信息,请参阅disintegrate存储库。

12个版本 (7个重大变更)

0.8.0 2024年4月20日
0.7.3 2024年3月9日
0.7.2 2024年1月28日
0.6.0 2023年12月28日
0.5.0 2023年7月15日

#534 in 数据库接口

Download history 91/week @ 2024-04-26 106/week @ 2024-05-03 111/week @ 2024-05-10 48/week @ 2024-05-17 3/week @ 2024-05-24 12/week @ 2024-05-31 16/week @ 2024-06-07 10/week @ 2024-06-14 7/week @ 2024-06-21 18/week @ 2024-06-28 72/week @ 2024-07-05 6/week @ 2024-07-12 5/week @ 2024-07-19 103/week @ 2024-07-26 15/week @ 2024-08-02 1/week @ 2024-08-09

每月124次下载

MIT许可证

230KB
3.5K SLoC

Disintegrate

Crates.io docs

Disintegrate是一个Rust库,提供了一种从事件流构建领域对象的不同方法。虽然支持传统的聚合,但Disintegrate引入了一种新的方法,允许在建模业务规则时具有更高的灵活性和适应性。

为什么选择Disintegrate?

Disintegrate通过转向不依赖聚合来设计业务应用,提供了一个全新的视角。它使开发者能够直接从事件流构建业务概念,这允许构建去中心化和动态架构,这些架构可以随着时间的推移而演变。

通过利用事件流作为基础,Disintegrate使开发者能够构建模型,以捕捉业务事件的本质,而无需在聚合中多次版本化同一事件。这减少了重复和复杂性,导致代码更清洁、更易于维护。

关键特性

  • 事件流建模:Disintegrate允许直接从事件流构建业务概念,提供了一种更灵活和去中心化的方法。

  • 支持聚合:虽然推广了一种新的方法,但Disintegrate仍然支持传统的聚合,允许开发者逐步过渡或在应用程序中同时使用这两种方法。

  • 适应不断变化的业务规则:Disintegrate允许更容易地随时间演变和适应不断变化的业务规则。通过将模型与聚合解耦,开发者可以自由地调整和优化业务概念,而无需沉重的依赖。

  • 测试驱动开发(TDD):该库提供了一个用于鼓励使用测试驱动开发(TDD)的实用工具 TestHarness。开始您的新应用程序实现时,您应该首先编写描述您业务逻辑的测试。这样,您可以确保您的实现满足您的业务不变性。TestHarness 允许以 给定-当-然后 风格编写测试。其中,given 代表过去的事件,when 是一个决策,而 then 是由决策或错误引发的事件

        #[test]
        fn it_withdraws_an_amount() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 10))
            .then([DomainEvent::AmountWithdrawn {
                account_id: "some account".into(),
                amount: 10,
            }]);
        }
    
        #[test]
        fn it_should_not_withdraw_an_amount_when_the_balance_is_insufficient() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
                DomainEvent::AmountWithdrawn {
                    account_id: "some account".into(),
                    amount: 26,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 5))
            .then_err(Error::InsufficientBalance);
        }
    

使用方法

要将 Disintegrate 添加到您的项目中,请按照以下步骤操作

  1. 在您的 Cargo.toml 文件中将 disintegratedisintegrate-postgres 添加为依赖项

    [dependencies]
    disintegrate = "0.8.0"
    disintegrate-postgres = "0.8.0"
    
    • Disintegrate 提供了您可以根据项目需求启用的几个功能。您可以在 Cargo.toml 文件中按如下方式包含它们
    [dependencies]
    disintegrate = { version = "0.8.0", features = ["macros", "serde-prost"] }
    disintegrate-postgres = { version = "0.8.0", features = ["listener"] }
    
    • 宏功能允许使用 derive 宏简化事件实现。

    • 对于事件序列化和反序列化,Disintegrate 通过 Serde 生态系统支持不同的序列化格式。您可以通过包含相应的功能来启用所需的格式

      • 要启用 JSON 序列化,使用 serde-json 功能:features = ["serde-json"]
      • 要启用 Avro 序列化,使用 serde-avro 功能:features = ["serde-avro"]
      • 要启用 Prost 序列化,使用 serde-prost 功能:features = ["serde-prost"]
      • 要启用 Protocol Buffers 序列化,使用 serde-protobuf 功能:features = ["serde-protobuf"]
    • 如果您正在使用 PostgreSQL 事件存储后端并且想使用监听器机制,您可以启用 listener 功能:disintegrate-postgres = {version = "0.8.0", features = ["listener"]}

  2. 在您的应用程序中定义事件列表。您可以使用事件风暴技术来识别系统中发生的事件。以下是用 Disintegrate 定义事件的示例

    use disintegrate::Event;
    use serde::{Deserialize, Serialize};
    
    #[derive(Debug, Clone, PartialEq, Eq, Event, Serialize, Deserialize)]
    #[stream(UserEvent, [UserCreated])]
    #[stream(CartEvent, [ItemAdded, ItemRemoved, ItemUpdated, CouponApplied])]
    #[stream(CouponEvent, [CouponEmitted, CouponApplied])]
    pub enum DomainEvent {
        UserCreated {
            #[id]
            user_id: String,
            name: String,
        },
        ItemAdded {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            quantity: u32,
        },
        ItemRemoved {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
        },
        ItemUpdated {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            new_quantity: u32,
        },
        CouponEmitted {
            #[id]
            coupon_id: String,
            quantity: u32,
        },
        CouponApplied {
            #[id]
            coupon_id: String,
            #[id]
            user_id: String,
        },
    }
    

    在这个例子中,我们使用#[derive(Event)]属性定义了一个枚举DomainEvent。这个枚举代表了应用程序中可能发生的各种事件。属性#[stream]指定了事件流,如UserEventCartEvent及其对应的变体。这允许您将事件组织到逻辑流中。字段上的属性#[id]允许您指定每个事件的域标识符,这些标识符用于过滤状态查询中的相关事件。

  3. 通过推导StateQuery特质创建一个状态查询,以从事件构建视图。为此,使用属性#[state_query]定义事件流,并使用属性#[id]注释包含标识符的字段。库使用注释的ID来过滤指定流中的事件,仅保留具有对应ID的事件。状态还必须实现StateMutate特质,它定义了如何聚合事件中的数据以构建状态。

    use crate::event::{CartEvent, CouponEvent, DomainEvent};
    use disintegrate::StateQuery;
    use disintegrate::{Decision, StateMutate};
    use serde::{Deserialize, Serialize};
    use std::collections::HashSet;
    use thiserror::Error;
    
    #[derive(Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
    pub struct Item {
        id: String,
        quantity: u32,
    }
    
    impl Item {
        fn new(id: String, quantity: u32) -> Self {
            Item { id, quantity }
        }
    }
    
    #[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
    #[state_query(CartEvent)]
    pub struct Cart {
        #[id]
        user_id: String,
        items: HashSet<Item>,
        applied_coupon: Option<String>,
    }
    
    impl Cart {
        pub fn new(user_id: &str) -> Self {
            Self {
                user_id: user_id.into(),
                ..Default::default()
            }
        }
    }
    
    impl StateMutate for Cart {
        fn mutate(&mut self, event: Self::Event) {
            match event {
                CartEvent::ItemAdded {
                    item_id, quantity, ..
                } => {
                    self.items.insert(Item::new(item_id, quantity));
                }
                CartEvent::ItemRemoved { item_id, .. } => {
                    self.items.retain(|item| item.id != *item_id);
                }
                CartEvent::ItemUpdated {
                    item_id,
                    new_quantity,
                    ..
                } => {
                    self.items.replace(Item::new(item_id, new_quantity));
                }
                CartEvent::CouponApplied { coupon_id, .. } => {
                    self.applied_coupon = Some(coupon_id);
                }
            }
        }
    }
    

    在这个例子中,结构体Cart表示购物车的状态,并跟踪用户添加的项目。

  4. 创建一个实现Decision特质的结构体。这个结构体代表一个业务决策,负责验证输入并生成更改列表。生成的更改将由DecisionMaker存储在事件存储中。

    #[derive(Debug, Error)]
    pub enum CartError {
        // cart errors
    }
    
    pub struct AddItem {
        user_id: String,
        item_id: String,
        quantity: u32,
    }
    
    impl AddItem {
        pub fn new(user_id: String, item_id: String, quantity: u32) -> Self {
            Self {
                user_id,
                item_id,
                quantity,
            }
        }
    }
    
    /// Implement your business logic
    impl Decision for AddItem {
        type Event = CartEvent;
        type StateQuery = Cart;
        type Error = CartError;
    
        fn state_query(&self) -> Self::StateQuery {
            Cart::new(&self.user_id)
        }
    
        fn process(&self, _state: &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
            // check your business constraints...
            Ok(vec![CartEvent::ItemAdded {
                user_id: self.user_id.clone(),
                item_id: self.item_id.to_string(),
                quantity: self.quantity,
            }])
        }
    }
    

    在提供的示例中,决策用作对从事件存储构建的状态执行的命令。一个Decision定义了StateQuery,该状态将使用事件存储中的事件进行修改。

    如果事件存储中没有找到事件,则使用默认的StateQuery作为决策的起点。这种情况发生在第一次做出决策时,没有历史数据可用于构建StateQuery

  5. 实例化一个事件存储,创建AddItem决策,并在DecisionMaker上调用make方法。

    mod cart;
    mod event;
    
    use cart::AddItem;
    use event::DomainEvent;
    
    use anyhow::{Ok, Result};
    use disintegrate::serde::json::Json;
    use disintegrate_postgres::PgEventStore;
    use sqlx::{postgres::PgConnectOptions, PgPool};
    
    #[tokio::main]
    async fn main() -> Result<()> {
        dotenv::dotenv().unwrap();
    
        // Create a PostgreSQL poll
        let connect_options = PgConnectOptions::new();
        let pool = PgPool::connect_with(connect_options).await?;
    
        // Create a serde for serialize and deserialize events
        let serde = Json::<DomainEvent>::default();
    
        // Create a PostgreSQL event store
        let event_store = PgEventStore::new(pool, serde).await?;
    
        // Create a Postgres DecisionMaker
        let decision_maker = disintegrate_postgres::decision_maker(event_store);
    
        // Make the decision. This performs the business decision and persists the changes into the
        // event store
        decision_maker
            .make(AddItem::new("user-1".to_string(), "item-1".to_string(), 4))
            .await?;
        Ok(())
    }
    

提供的示例已经演示了一个完整功能的事件源应用程序。但是,如果您的业务逻辑跨越多个聚合,您可以使用多状态查询从多个状态收集所有必需的数据。通常,确保多个聚合之间的不变性需要它们之间的策略。通过一个由StateQuery元组表示的多状态,您可以在单个Decision中验证所有的不变性。

#[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
#[state_query(CouponEvent)]
pub struct Coupon {
    #[id]
    coupon_id: String,
    quantity: u32,
}

impl Coupon {
    pub fn new(coupon_id: &str) -> Self {
        Self {
            coupon_id: coupon_id.to_string(),
            ..Default::default()
        }
    }
}

impl StateMutate for Coupon {
    fn mutate(&mut self, event: Self::Event) {
        match event {
            CouponEvent::CouponEmitted { quantity, .. } => self.quantity += quantity,
            CouponEvent::CouponApplied { .. } => self.quantity -= 1,
        }
    }
}

pub struct ApplyCoupon {
    user_id: String,
    coupon_id: String,
}

impl ApplyCoupon {
    pub fn new(user_id: String, coupon_id: String) -> Self {
        Self { user_id, coupon_id }
    }
}

impl Decision for ApplyCoupon {
    type Event = DomainEvent;
    type StateQuery = (Cart, Coupon);
    type Error = CartError;

    fn state_query(&self) -> Self::StateQuery {
        (Cart::new(&self.user_id), Coupon::new(&self.coupon_id))
    }

    fn process(&self, (cart, coupon): &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
        // check your business constraints...
        if cart.applied_coupon.is_some() {
            return Err(CartError::CouponAlreadyApplied);
        }
        if coupon.quantity == 0 {
            return Err(CartError::CouponNotAvailable);
        }
        Ok(vec![DomainEvent::CouponApplied {
            coupon_id: self.coupon_id.clone(),
            user_id: self.user_id.clone(),
        }])
    }
}

查看examples文件夹以更好地了解如何在实际应用程序中使用Disintegrate。

许可证

本项目根据MIT许可证授权。

贡献

欢迎贡献!如果您发现任何问题或对改进有建议,请随时打开一个问题或提交一个拉取请求。

请确保在向本项目贡献时遵循贡献指南

感谢您帮助我们让Disintegrate变得更好!

致谢

Disintegrate受到了Sara Pellegrini在《杀死聚合!》演讲中提出的思想的启发,探索了从事件流建模业务概念的新可能性。我们想对演讲者分享她的见解并激发软件开发社区的创新思维表示感谢。

在保留视频中的核心概念的同时,Disintegrate引入了额外的功能,丰富了开发者体验,并将这些想法付诸实践。

  1. Postgres实现:Disintegrate提供了视频中所讨论概念的工作实现。

  2. 强大的查询系统:在视频中,查询是通过域标识符和事件类型的列表构建的。Disintegrate将这一能力提升到了新的水平,使开发者能够创建更复杂的查询,以解决更高级的用例。

  3. 验证查询:虽然承认了视频中使用查询验证状态完整性的价值,但Disintegrate在此方面更进一步,增强了这一功能。在视频中,相同的查询用于构建状态和append API。然而,Disintegrate引入了一个名为Validation的强大功能,它允许开发者在将新事件存储在事件存储时对决策无效化进行更细粒度的控制。这在如银行示例等场景中特别有用。例如,在做出取款决策时,需要计算账户余额,需要将存款事件包含在状态中。然而,即使存款事件改变了状态,也不应使取款决策无效。在这种情况下,必须在构建状态所需的事件子集上执行验证。

  4. 决策概念:Disintegrate引入了Decision的概念,它是开发应用程序业务逻辑的构建块,同时遵循SOLID原则。一个Decision可以看作是一个专注于特定用例的小聚合。因此,当出现新的用例时,可以通过添加一个新的Decision来扩展应用程序,而不需要修改现有的决策。

  5. Multi-StateQuery:通过允许单个Decision使用多个StateQuery,我们获得了在不同决策之间重用这些查询以及组合已定义的StateQuery的灵活性。这种能力使我们能够实现快照机制,这些机制可以在未来的版本中使用不同的和改进的策略进一步优化。

依赖关系

~36–53MB
~885K SLoC