#event-stream #events #disintegrate #aggregate #macro #domain #refer

disintegrate-macros

Disintegrate宏。不建议直接使用。有关详细信息,请参阅disintegrate软件包。

11个版本 (6个重大更新)

0.8.0 2024年4月20日
0.7.3 2024年3月9日
0.7.2 2024年1月28日
0.6.0 2023年12月28日
0.4.0 2023年7月15日

790进程宏

Download history 153/week @ 2024-04-14 123/week @ 2024-04-21 85/week @ 2024-04-28 130/week @ 2024-05-05 92/week @ 2024-05-12 15/week @ 2024-05-19 5/week @ 2024-05-26 22/week @ 2024-06-02 18/week @ 2024-06-09 11/week @ 2024-06-16 1/week @ 2024-06-23 81/week @ 2024-06-30 9/week @ 2024-07-07 5/week @ 2024-07-14 2/week @ 2024-07-21 79/week @ 2024-07-28

96 每月下载量
2 软件包 中使用

MIT 许可证

44KB
544

Disintegrate

Crates.io docs

Disintegrate是一个Rust库,它提供了一种从事件流构建领域对象的不同方法。虽然支持传统的聚合,但Disintegrate引入了一种新颖的方法,允许在建模业务规则时具有更大的灵活性和适应性。

为什么选择Disintegrate?

Disintegrate通过摆脱仅依赖聚合的方法,为设计业务应用程序提供了一种新的视角。它允许开发人员直接从事件流构建业务概念。这种方法允许分散化和动态架构,这些架构可以随着时间的推移而演变。

通过利用事件流作为基础,Disintegrate使开发人员能够构建无需在聚合中重复同一事件的多个版本即可捕获业务事件本质的模型。这减少了重复和复杂性,从而使得代码更加清洁且易于维护。

关键特性

  • 事件流建模:Disintegrate允许直接从事件流构建业务概念,提供了一种更加灵活和分散的方法。

  • 支持聚合:尽管推广了一种新的方法,但Disintegrate仍然支持传统的聚合,允许开发人员逐步过渡或在其应用程序中同时使用这两种方法。

  • 适应不断变化的业务规则:Disintegrate允许更容易地随着时间的推移演变和适应不断变化的业务规则。通过将模型从聚合中解耦,开发人员可以自由地调整和改进业务概念,而不需要沉重的依赖。

  • 测试驱动开发(TDD):该库提供了一个 TestHarness 工具,以鼓励使用测试驱动开发(TDD)。在开始新的应用程序实现之前,您应该首先编写描述您业务逻辑的测试。这样,您可以确保您的实现满足您的业务不变性。 TestHarness 允许以 给定-当-然后 风格编写测试。 given 表示过去的事件,when 是一个决策,而 then 是决策或错误引发的事件

        #[test]
        fn it_withdraws_an_amount() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 10))
            .then([DomainEvent::AmountWithdrawn {
                account_id: "some account".into(),
                amount: 10,
            }]);
        }
    
        #[test]
        fn it_should_not_withdraw_an_amount_when_the_balance_is_insufficient() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
                DomainEvent::AmountWithdrawn {
                    account_id: "some account".into(),
                    amount: 26,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 5))
            .then_err(Error::InsufficientBalance);
        }
    

使用方法

要将 Disintegrate 添加到您的项目中,请按照以下步骤操作

  1. disintegratedisintegrate-postgres 添加为 Cargo.toml 文件中的依赖项

    [dependencies]
    disintegrate = "0.8.0"
    disintegrate-postgres = "0.8.0"
    
    • Disintegrate 提供了多种功能,您可以根据项目需求启用它们。您可以在 Cargo.toml 文件中按如下方式包含它们
    [dependencies]
    disintegrate = { version = "0.8.0", features = ["macros", "serde-prost"] }
    disintegrate-postgres = { version = "0.8.0", features = ["listener"] }
    
    • 宏功能允许使用 derive 宏简化事件实现。

    • 对于事件的序列化和反序列化,Disintegrate 通过 Serde 生态系统支持不同的序列化格式。您可以通过包含相应的功能来启用所需的格式

      • 要启用 JSON 序列化,使用 serde-json 功能:features = ["serde-json"]
      • 要启用 Avro 序列化,使用 serde-avro 功能:features = ["serde-avro"]
      • 要启用 Prost 序列化,使用 serde-prost 功能:features = ["serde-prost"]
      • 要启用 Protocol Buffers 序列化,使用 serde-protobuf 功能:features = ["serde-protobuf"]
    • 如果您正在使用 PostgreSQL 事件存储后端并想使用监听器机制,可以启用 listener 功能:disintegrate-postgres = {version = "0.8.0", features = ["listener"]}

  2. 在您的应用程序中定义事件列表。您可以使用事件风暴技术来识别系统中发生的事件。以下是一个使用 Disintegrate 定义事件的示例

    use disintegrate::Event;
    use serde::{Deserialize, Serialize};
    
    #[derive(Debug, Clone, PartialEq, Eq, Event, Serialize, Deserialize)]
    #[stream(UserEvent, [UserCreated])]
    #[stream(CartEvent, [ItemAdded, ItemRemoved, ItemUpdated, CouponApplied])]
    #[stream(CouponEvent, [CouponEmitted, CouponApplied])]
    pub enum DomainEvent {
        UserCreated {
            #[id]
            user_id: String,
            name: String,
        },
        ItemAdded {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            quantity: u32,
        },
        ItemRemoved {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
        },
        ItemUpdated {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            new_quantity: u32,
        },
        CouponEmitted {
            #[id]
            coupon_id: String,
            quantity: u32,
        },
        CouponApplied {
            #[id]
            coupon_id: String,
            #[id]
            user_id: String,
        },
    }
    

    在这个示例中,我们使用#[derive(Event)]属性定义了一个枚举DomainEvent。该枚举代表应用程序可能发生的各种事件。#[stream]属性指定了事件流,例如UserEventCartEvent及其相应的变体。这允许您将事件组织成逻辑流。字段上的#[id]属性允许您指定每个事件的领域标识符,这些标识符用于在状态查询中过滤相关事件。

  3. 通过推导StateQuery特质来创建一个状态查询,以从事件构建视图。为此,使用#[state_query]属性定义事件流,并使用#[id]注解包含标识符的字段。库使用注解的ID在指定的流中过滤事件,仅保留具有相应ID的事件。状态还必须实现StateMutate特质,该特质定义了如何聚合事件中包含的数据以构建状态。

    use crate::event::{CartEvent, CouponEvent, DomainEvent};
    use disintegrate::StateQuery;
    use disintegrate::{Decision, StateMutate};
    use serde::{Deserialize, Serialize};
    use std::collections::HashSet;
    use thiserror::Error;
    
    #[derive(Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
    pub struct Item {
        id: String,
        quantity: u32,
    }
    
    impl Item {
        fn new(id: String, quantity: u32) -> Self {
            Item { id, quantity }
        }
    }
    
    #[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
    #[state_query(CartEvent)]
    pub struct Cart {
        #[id]
        user_id: String,
        items: HashSet<Item>,
        applied_coupon: Option<String>,
    }
    
    impl Cart {
        pub fn new(user_id: &str) -> Self {
            Self {
                user_id: user_id.into(),
                ..Default::default()
            }
        }
    }
    
    impl StateMutate for Cart {
        fn mutate(&mut self, event: Self::Event) {
            match event {
                CartEvent::ItemAdded {
                    item_id, quantity, ..
                } => {
                    self.items.insert(Item::new(item_id, quantity));
                }
                CartEvent::ItemRemoved { item_id, .. } => {
                    self.items.retain(|item| item.id != *item_id);
                }
                CartEvent::ItemUpdated {
                    item_id,
                    new_quantity,
                    ..
                } => {
                    self.items.replace(Item::new(item_id, new_quantity));
                }
                CartEvent::CouponApplied { coupon_id, .. } => {
                    self.applied_coupon = Some(coupon_id);
                }
            }
        }
    }
    

    在这个示例中,Cart结构体表示购物车的状态,并跟踪用户添加的项目。

  4. 创建一个实现Decision特质的结构体。这个结构体代表业务决策,负责验证输入并生成一个更改列表。这些更改将由DecisionMaker存储在事件存储中。

    #[derive(Debug, Error)]
    pub enum CartError {
        // cart errors
    }
    
    pub struct AddItem {
        user_id: String,
        item_id: String,
        quantity: u32,
    }
    
    impl AddItem {
        pub fn new(user_id: String, item_id: String, quantity: u32) -> Self {
            Self {
                user_id,
                item_id,
                quantity,
            }
        }
    }
    
    /// Implement your business logic
    impl Decision for AddItem {
        type Event = CartEvent;
        type StateQuery = Cart;
        type Error = CartError;
    
        fn state_query(&self) -> Self::StateQuery {
            Cart::new(&self.user_id)
        }
    
        fn process(&self, _state: &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
            // check your business constraints...
            Ok(vec![CartEvent::ItemAdded {
                user_id: self.user_id.clone(),
                item_id: self.item_id.to_string(),
                quantity: self.quantity,
            }])
        }
    }
    

    在提供的示例中,决策被用作对从事件存储构建的状态执行的命令。一个Decision定义了StateQuery,它将使用事件存储中的事件进行修改。

    如果在事件存储中没有找到事件,则默认使用StateQuery作为决策的起点。这种情况下,当决策首次执行且没有可用历史数据来构建StateQuery时,会出现这种情况。

  5. 实例化一个事件存储,创建AddItem决策,并在DecisionMaker上调用make方法。

    mod cart;
    mod event;
    
    use cart::AddItem;
    use event::DomainEvent;
    
    use anyhow::{Ok, Result};
    use disintegrate::serde::json::Json;
    use disintegrate_postgres::PgEventStore;
    use sqlx::{postgres::PgConnectOptions, PgPool};
    
    #[tokio::main]
    async fn main() -> Result<()> {
        dotenv::dotenv().unwrap();
    
        // Create a PostgreSQL poll
        let connect_options = PgConnectOptions::new();
        let pool = PgPool::connect_with(connect_options).await?;
    
        // Create a serde for serialize and deserialize events
        let serde = Json::<DomainEvent>::default();
    
        // Create a PostgreSQL event store
        let event_store = PgEventStore::new(pool, serde).await?;
    
        // Create a Postgres DecisionMaker
        let decision_maker = disintegrate_postgres::decision_maker(event_store);
    
        // Make the decision. This performs the business decision and persists the changes into the
        // event store
        decision_maker
            .make(AddItem::new("user-1".to_string(), "item-1".to_string(), 4))
            .await?;
        Ok(())
    }
    

提供的示例已经说明了功能齐全的事件源应用程序。但是,如果您的业务逻辑跨越多个聚合,您可以使用多状态查询从各种状态收集所有所需数据。通常,确保多个聚合之间的不变性需要它们之间的策略。通过多状态,表示为StateQuery元组的集合,您可以在单个Decision中验证所有不变性。

#[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
#[state_query(CouponEvent)]
pub struct Coupon {
    #[id]
    coupon_id: String,
    quantity: u32,
}

impl Coupon {
    pub fn new(coupon_id: &str) -> Self {
        Self {
            coupon_id: coupon_id.to_string(),
            ..Default::default()
        }
    }
}

impl StateMutate for Coupon {
    fn mutate(&mut self, event: Self::Event) {
        match event {
            CouponEvent::CouponEmitted { quantity, .. } => self.quantity += quantity,
            CouponEvent::CouponApplied { .. } => self.quantity -= 1,
        }
    }
}

pub struct ApplyCoupon {
    user_id: String,
    coupon_id: String,
}

impl ApplyCoupon {
    pub fn new(user_id: String, coupon_id: String) -> Self {
        Self { user_id, coupon_id }
    }
}

impl Decision for ApplyCoupon {
    type Event = DomainEvent;
    type StateQuery = (Cart, Coupon);
    type Error = CartError;

    fn state_query(&self) -> Self::StateQuery {
        (Cart::new(&self.user_id), Coupon::new(&self.coupon_id))
    }

    fn process(&self, (cart, coupon): &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
        // check your business constraints...
        if cart.applied_coupon.is_some() {
            return Err(CartError::CouponAlreadyApplied);
        }
        if coupon.quantity == 0 {
            return Err(CartError::CouponNotAvailable);
        }
        Ok(vec![DomainEvent::CouponApplied {
            coupon_id: self.coupon_id.clone(),
            user_id: self.user_id.clone(),
        }])
    }
}

查看examples文件夹以更好地了解如何在实际应用程序中使用Disintegrate。

许可证

本项目受MIT许可证许可。

贡献

欢迎贡献!如果您发现任何问题或对改进有建议,请随时打开一个问题或提交一个pull请求。

请在向本项目做出贡献时务必遵循贡献指南

感谢您帮助改进Disintegrate!

致谢

Disintegrate受到Sara Pellegrini的演讲Kill Aggregate!的启发,探讨了从事件流中建模业务概念的新可能性。我们想对演讲者分享她的见解并在软件开发社区激发创新思维表示衷心的感谢。

在保留视频中的核心概念的同时,Disintegrate引入了额外的功能,丰富了开发者的体验,并将这些想法付诸实践。

  1. Postgres实现:Disintegrate为视频中所讨论的概念提供了一种工作实现。

  2. 强大的查询系统:在视频中,查询是通过一系列域标识符和事件类型构建的。Disintegrate将这一能力提升到新的水平,使开发者能够创建更复杂的查询,以应对更高级的使用场景。

  3. 验证查询:虽然认可了视频中利用查询验证状态完整性的价值,但Disintegrate在这一方面更进一步。在视频中,相同的查询既用于构建状态,也用于append API。然而,Disintegrate引入了一个名为Validation的强大功能,它使开发者能够在事件存储中存储新事件时对决策的无效化进行精细控制。这在银行示例等场景中特别有用。例如,在做出取款决策时,需要计算账户余额,这要求在状态中包含存款事件。然而,即使存款事件改变了状态,也不应该使取款决策无效。在这种情况下,必须在构建状态所需的事件子集上执行验证。

  4. 决策概念:Disintegrate引入了Decision的概念,它作为开发应用业务逻辑的构建块,同时遵守SOLID原则。一个Decision可以被视为专注于特定用例的小型聚合。因此,当出现新的用例时,可以通过添加一个新的Decision来扩展应用程序,而无需修改现有的一个。

  5. Multi-StateQuery:通过允许单个Decision使用多个StateQuery,我们获得了在不同决策之间重用这些查询以及组合已定义的StateQuery的灵活性。这一功能使我们能够实现快照机制,在未来版本中可以使用不同的和改进的策略进一步细化。

依赖关系

~300–750KB
~18K SLoC