#事件存储 #序列化 #事件溯源 #disintegrate #业务 #应用 #ddd

disintegrate-serde

Disintegrate事件存储的序列化和反序列化库。不建议直接使用。请参考 disintegrate 包以获取详细信息。

8个版本 (4个重大更改)

0.8.0 2024年4月20日
0.7.3 2024年3月9日
0.7.1 2024年1月4日
0.6.0 2023年12月28日
0.1.0 2023年5月27日

#522编码

Download history 104/week @ 2024-04-22 90/week @ 2024-04-29 146/week @ 2024-05-06 64/week @ 2024-05-13 17/week @ 2024-05-20 3/week @ 2024-05-27 24/week @ 2024-06-03 22/week @ 2024-06-10 8/week @ 2024-06-17 14/week @ 2024-06-24 21/week @ 2024-07-01 9/week @ 2024-07-08 5/week @ 2024-07-15 5/week @ 2024-07-22 74/week @ 2024-07-29 6/week @ 2024-08-05

每月91 次下载
2 个 包中使用

MIT 许可证

33KB
276

Disintegrate

Crates.io docs

Disintegrate是一个Rust库,它提供了一种从事件流中构建领域对象的替代方法。虽然支持传统聚合,但Disintegrate引入了一种新的方法,允许在建模业务规则时具有更大的灵活性和适应性。

为什么选择Disintegrate?

Disintegrate通过从事件流中直接构建业务概念,为设计业务应用提供了新的视角。这种方法允许开发人员构建去中心化和动态的架构,这些架构可以随时间发展。

通过利用事件流作为基础,Disintegrate使开发人员能够构建模型,捕捉业务事件的本质,而无需在聚合中重复相同的事件的多个版本。这减少了重复和复杂性,从而产生了更干净、更易于维护的代码。

主要特性

  • 事件流建模:Disintegrate允许直接从事件流中构建业务概念,提供了一种更灵活和去中心化的方法。

  • 支持聚合:虽然推广了新的方法,但Disintegrate仍然支持传统聚合,允许开发人员逐步过渡或在应用程序中同时使用这两种方法。

  • 适应不断变化的业务规则:Disintegrate允许更轻松地随时间演变和适应不断变化的业务规则。通过将模型与聚合解耦,开发人员可以自由地调整和细化业务概念,而不需要沉重的依赖。

  • 测试驱动开发(TDD):该库提供了一个TestHarness实用工具,以鼓励使用测试驱动开发(TDD)。要开始实现您的新应用程序,您应该首先编写描述您业务逻辑的测试。这样,您可以确保您的实现满足您的业务不变量。TestHarness允许以给定的-当-那么风格编写测试。其中given代表过去事件,when是一个决策,而then是决策或错误产生的事件

        #[test]
        fn it_withdraws_an_amount() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 10))
            .then([DomainEvent::AmountWithdrawn {
                account_id: "some account".into(),
                amount: 10,
            }]);
        }
    
        #[test]
        fn it_should_not_withdraw_an_amount_when_the_balance_is_insufficient() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
                DomainEvent::AmountWithdrawn {
                    account_id: "some account".into(),
                    amount: 26,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 5))
            .then_err(Error::InsufficientBalance);
        }
    

使用方法

要将Disintegrate添加到您的项目中,请按照以下步骤操作

  1. disintegratedisintegrate-postgres作为依赖项添加到您的Cargo.toml文件中

    [dependencies]
    disintegrate = "0.8.0"
    disintegrate-postgres = "0.8.0"
    
    • Disintegrate提供了一些功能,您可以根据项目需求启用它们。您可以在Cargo.toml文件中如下所示包含它们
    [dependencies]
    disintegrate = { version = "0.8.0", features = ["macros", "serde-prost"] }
    disintegrate-postgres = { version = "0.8.0", features = ["listener"] }
    
    • 宏功能允许使用derive宏简化事件实现。

    • 对于事件序列化和反序列化,Disintegrate通过Serde生态系统支持不同的序列化格式。您可以通过包含相应的功能来启用所需的格式

      • 要启用JSON序列化,请使用serde-json功能:features = ["serde-json"]
      • 要启用Avro序列化,请使用serde-avro功能:features = ["serde-avro"]
      • 要启用Prost序列化,请使用serde-prost功能:features = ["serde-prost"]
      • 要启用Protocol Buffers序列化,请使用serde-protobuf功能:features = ["serde-protobuf"]
    • 如果您正在使用PostgreSQL事件存储后端并希望使用监听器机制,您可以通过启用listener功能来启用它:disintegrate-postgres = {version = "0.8.0", features = ["listener"]}

  2. 在您的应用程序中定义事件列表。您可以使用事件风暴技术来识别系统中发生的事件。以下是一个使用Disintegrate定义事件的示例

    use disintegrate::Event;
    use serde::{Deserialize, Serialize};
    
    #[derive(Debug, Clone, PartialEq, Eq, Event, Serialize, Deserialize)]
    #[stream(UserEvent, [UserCreated])]
    #[stream(CartEvent, [ItemAdded, ItemRemoved, ItemUpdated, CouponApplied])]
    #[stream(CouponEvent, [CouponEmitted, CouponApplied])]
    pub enum DomainEvent {
        UserCreated {
            #[id]
            user_id: String,
            name: String,
        },
        ItemAdded {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            quantity: u32,
        },
        ItemRemoved {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
        },
        ItemUpdated {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            new_quantity: u32,
        },
        CouponEmitted {
            #[id]
            coupon_id: String,
            quantity: u32,
        },
        CouponApplied {
            #[id]
            coupon_id: String,
            #[id]
            user_id: String,
        },
    }
    

    在这个示例中,我们使用属性#[derive(Event)]定义了一个枚举DomainEvent。这个枚举表示应用程序中可能发生的各种事件。属性#[stream]指定了事件流,例如UserEventCartEvent及其对应的变化。这允许您将事件组织成逻辑流。字段上的属性#[id]允许您指定每个事件的域标识符,这些标识符用于过滤状态查询的相关事件。

  3. 通过推导StateQuery特质来创建一个状态查询,以从事件构建视图。为此,使用属性#[state_query]定义事件流,并用#[id]注释包含标识符的字段。库使用注解的ID来过滤指定流中的事件,仅保留具有相应ID的事件。状态还必须实现StateMutate特质,该特质定义了事件中包含的数据如何聚合以构建状态。

    use crate::event::{CartEvent, CouponEvent, DomainEvent};
    use disintegrate::StateQuery;
    use disintegrate::{Decision, StateMutate};
    use serde::{Deserialize, Serialize};
    use std::collections::HashSet;
    use thiserror::Error;
    
    #[derive(Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
    pub struct Item {
        id: String,
        quantity: u32,
    }
    
    impl Item {
        fn new(id: String, quantity: u32) -> Self {
            Item { id, quantity }
        }
    }
    
    #[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
    #[state_query(CartEvent)]
    pub struct Cart {
        #[id]
        user_id: String,
        items: HashSet<Item>,
        applied_coupon: Option<String>,
    }
    
    impl Cart {
        pub fn new(user_id: &str) -> Self {
            Self {
                user_id: user_id.into(),
                ..Default::default()
            }
        }
    }
    
    impl StateMutate for Cart {
        fn mutate(&mut self, event: Self::Event) {
            match event {
                CartEvent::ItemAdded {
                    item_id, quantity, ..
                } => {
                    self.items.insert(Item::new(item_id, quantity));
                }
                CartEvent::ItemRemoved { item_id, .. } => {
                    self.items.retain(|item| item.id != *item_id);
                }
                CartEvent::ItemUpdated {
                    item_id,
                    new_quantity,
                    ..
                } => {
                    self.items.replace(Item::new(item_id, new_quantity));
                }
                CartEvent::CouponApplied { coupon_id, .. } => {
                    self.applied_coupon = Some(coupon_id);
                }
            }
        }
    }
    

    在这个示例中,结构体Cart表示购物车的状态,并跟踪用户添加的项目。

  4. 创建一个实现Decision特质的结构体。这个结构体代表一个业务决策,负责验证输入并生成一系列更改。这些更改将由DecisionMaker存储在事件存储中。

    #[derive(Debug, Error)]
    pub enum CartError {
        // cart errors
    }
    
    pub struct AddItem {
        user_id: String,
        item_id: String,
        quantity: u32,
    }
    
    impl AddItem {
        pub fn new(user_id: String, item_id: String, quantity: u32) -> Self {
            Self {
                user_id,
                item_id,
                quantity,
            }
        }
    }
    
    /// Implement your business logic
    impl Decision for AddItem {
        type Event = CartEvent;
        type StateQuery = Cart;
        type Error = CartError;
    
        fn state_query(&self) -> Self::StateQuery {
            Cart::new(&self.user_id)
        }
    
        fn process(&self, _state: &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
            // check your business constraints...
            Ok(vec![CartEvent::ItemAdded {
                user_id: self.user_id.clone(),
                item_id: self.item_id.to_string(),
                quantity: self.quantity,
            }])
        }
    }
    

    在提供的示例中,决策被用作命令,在从事件存储构建的状态上执行。一个Decision定义了StateQuery,它将使用事件存储中的事件进行更改。

    如果在事件存储中没有找到事件,则使用默认的StateQuery作为决策的起点。这种情况发生在第一次做出决策时,没有历史数据来构建一个StateQuery

  5. 实例化一个事件存储,创建AddItem决策,并在DecisionMaker上调用make方法。

    mod cart;
    mod event;
    
    use cart::AddItem;
    use event::DomainEvent;
    
    use anyhow::{Ok, Result};
    use disintegrate::serde::json::Json;
    use disintegrate_postgres::PgEventStore;
    use sqlx::{postgres::PgConnectOptions, PgPool};
    
    #[tokio::main]
    async fn main() -> Result<()> {
        dotenv::dotenv().unwrap();
    
        // Create a PostgreSQL poll
        let connect_options = PgConnectOptions::new();
        let pool = PgPool::connect_with(connect_options).await?;
    
        // Create a serde for serialize and deserialize events
        let serde = Json::<DomainEvent>::default();
    
        // Create a PostgreSQL event store
        let event_store = PgEventStore::new(pool, serde).await?;
    
        // Create a Postgres DecisionMaker
        let decision_maker = disintegrate_postgres::decision_maker(event_store);
    
        // Make the decision. This performs the business decision and persists the changes into the
        // event store
        decision_maker
            .make(AddItem::new("user-1".to_string(), "item-1".to_string(), 4))
            .await?;
        Ok(())
    }
    

提供的示例已经说明了功能齐全的事件源应用程序。但是,如果您的业务逻辑跨越多个聚合,您可以使用多状态查询来从各种状态收集所需的所有数据。通常,确保多个聚合之间的不变性需要它们之间的策略。使用由StateQuery元组表示的多状态,您可以在单个Decision中验证所有的不变性。

#[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
#[state_query(CouponEvent)]
pub struct Coupon {
    #[id]
    coupon_id: String,
    quantity: u32,
}

impl Coupon {
    pub fn new(coupon_id: &str) -> Self {
        Self {
            coupon_id: coupon_id.to_string(),
            ..Default::default()
        }
    }
}

impl StateMutate for Coupon {
    fn mutate(&mut self, event: Self::Event) {
        match event {
            CouponEvent::CouponEmitted { quantity, .. } => self.quantity += quantity,
            CouponEvent::CouponApplied { .. } => self.quantity -= 1,
        }
    }
}

pub struct ApplyCoupon {
    user_id: String,
    coupon_id: String,
}

impl ApplyCoupon {
    pub fn new(user_id: String, coupon_id: String) -> Self {
        Self { user_id, coupon_id }
    }
}

impl Decision for ApplyCoupon {
    type Event = DomainEvent;
    type StateQuery = (Cart, Coupon);
    type Error = CartError;

    fn state_query(&self) -> Self::StateQuery {
        (Cart::new(&self.user_id), Coupon::new(&self.coupon_id))
    }

    fn process(&self, (cart, coupon): &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
        // check your business constraints...
        if cart.applied_coupon.is_some() {
            return Err(CartError::CouponAlreadyApplied);
        }
        if coupon.quantity == 0 {
            return Err(CartError::CouponNotAvailable);
        }
        Ok(vec![DomainEvent::CouponApplied {
            coupon_id: self.coupon_id.clone(),
            user_id: self.user_id.clone(),
        }])
    }
}

查看examples文件夹以更好地了解如何在现实世界的应用程序中使用Disintegrate。

许可证

本项目采用 MIT 许可证 许可。

贡献

欢迎贡献!如果您发现任何问题或有改进建议,请随时提交问题或拉取请求。

在向本项目做出贡献时,请务必遵循 贡献指南

我们感谢您的帮助,让 Disintegrate 更好!

致谢

Disintegrate 受到 Sara Pellegrini 在 Kill Aggregate! 演讲中提出的思想的启发,探索从事件流中建模业务概念的新可能性。我们感谢演讲者分享她的见解,并在软件开发社区中激发创新思维。

在保留视频中的核心概念的同时,Disintegrate 引入了额外的功能,丰富了开发者体验,并将这些想法付诸实践。

  1. Postgres 实现:Disintegrate 提供了视频中所讨论概念的工作实现。

  2. 强大的查询系统:在视频中,查询是通过一系列领域标识符和事件类型构建的。Disintegrate 将这一能力提升到新的水平,使开发者能够创建更复杂的查询,以解决高级用例。

  3. 验证查询:虽然承认视频方法利用查询验证状态完整性的价值,但 Disintegrate 在此方面更进一步。在视频中,相同的查询用于构建状态和追加 API。然而,Disintegrate 引入了一个强大的特性,称为 Validation 查询,它允许开发者在将新事件存储在事件存储中时对决策无效化进行细粒度控制。这在诸如银行示例等场景中特别有用。例如,在做出提款决策时,需要计算账户余额,这需要将存款事件包含在状态中。但是,即使存款事件改变了状态,也不应该使提款决策无效。在这种情况下,必须在构建状态所需的事件子集上执行验证。

  4. 决策概念:Disintegrate 引入了 Decision 的概念,它作为开发应用程序业务逻辑的构建块,同时遵循 SOLID 原则。一个 Decision 可以被视为专注于特定用例的小型聚合体。因此,当出现新的用例时,可以通过添加一个新的 Decision 来扩展应用程序,而无需修改现有的 Decision

  5. Multi-StateQuery:通过允许单个 Decision 使用多个 StateQuery,我们获得了在不同决策之间重用这些查询以及组合已定义的 StateQuery 的灵活性。这种能力使我们能够实现快照机制,这可以在未来的版本中通过不同的和改进的策略进一步细化。

依赖关系

~0.4–2.3MB
~47K SLoC