12个版本 (7个破坏性更新)

0.8.0 2024年4月20日
0.7.3 2024年3月9日
0.7.2 2024年1月28日
0.6.0 2023年12月28日
0.5.0 2023年7月15日

#451 in 异步

Download history 194/week @ 2024-04-15 105/week @ 2024-04-22 90/week @ 2024-04-29 146/week @ 2024-05-06 64/week @ 2024-05-13 11/week @ 2024-05-20 23/week @ 2024-06-03 19/week @ 2024-06-10 8/week @ 2024-06-17 10/week @ 2024-06-24 74/week @ 2024-07-01 9/week @ 2024-07-08 4/week @ 2024-07-15 2/week @ 2024-07-22 91/week @ 2024-07-29

每月107次下载
分解-后端中使用

MIT授权

120KB
2K SLoC

分解

Crates.io docs

分解是一个Rust库,它提供了一种从事件流中构建领域对象的替代方法。虽然支持传统的聚合,但分解引入了一种新颖的方法,可以在建模业务规则方面提供更多的灵活性和适应性。

为什么选择分解?

分解通过摆脱对聚合的依赖,为设计业务应用提供了一种全新的视角。相反,它使开发者能够直接从事件流中构建业务概念。这种方法允许去中心化和动态的架构,这些架构可以随着时间的推移而发展。

通过利用事件流作为基础,分解使开发者能够构建模型,以捕获业务事件的核心,而无需在聚合中重复同一事件的不同版本。这减少了重复和复杂性,导致代码更加清洁和易于维护。

关键特性

  • 事件流建模:分解使开发者能够直接从事件流中构建业务概念,提供了一种更灵活和去中心化的方法。

  • 支持聚合:虽然推广了新的方法,但分解仍然支持传统的聚合,允许开发者逐步过渡或在他们的应用程序中同时使用这两种方法。

  • 适应不断变化的业务规则:分解允许更容易地随时间演变和适应不断发展的业务规则。通过将模型与聚合解耦,开发者可以在没有沉重依赖的情况下调整和细化业务概念。

  • 测试驱动开发(TDD):该库提供了一个用于鼓励使用测试驱动开发(TDD)的 TestHarness 工具。在开始新的应用程序实现之前,您应该首先编写描述您业务逻辑的测试。这样,您可以确保您的实现满足您的业务不变性。 TestHarness 允许使用 给定-当-然后 风格编写测试。given 表示过去事件,when 是一个决策,而 then 是决策或错误产生的事件。

        #[test]
        fn it_withdraws_an_amount() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 10))
            .then([DomainEvent::AmountWithdrawn {
                account_id: "some account".into(),
                amount: 10,
            }]);
        }
    
        #[test]
        fn it_should_not_withdraw_an_amount_when_the_balance_is_insufficient() {
            disintegrate::TestHarness::given([
                DomainEvent::AccountOpened {
                    account_id: "some account".into(),
                },
                DomainEvent::AmountDeposited {
                    account_id: "some account".into(),
                    amount: 10,
                },
                DomainEvent::AmountWithdrawn {
                    account_id: "some account".into(),
                    amount: 26,
                },
            ])
            .when(WithdrawAmount::new("some account".into(), 5))
            .then_err(Error::InsufficientBalance);
        }
    

使用方法

要将 Disintegrate 添加到您的项目中,请按照以下步骤操作

  1. disintegratedisintegrate-postgres 添加到您的 Cargo.toml 文件中的依赖项

    [dependencies]
    disintegrate = "0.8.0"
    disintegrate-postgres = "0.8.0"
    
    • Disintegrate 根据项目需求提供了一些您可以选择启用的功能。您可以在 Cargo.toml 文件中按如下方式包括它们
    [dependencies]
    disintegrate = { version = "0.8.0", features = ["macros", "serde-prost"] }
    disintegrate-postgres = { version = "0.8.0", features = ["listener"] }
    
    • 宏功能允许使用 derive 宏简化事件实现。

    • 对于事件的序列化和反序列化,Disintegrate 通过 Serde 生态系统支持不同的序列化格式。您可以通过包含相应的功能来启用所需的格式

      • 要启用 JSON 序列化,使用 serde-json 功能:features = ["serde-json"]
      • 要启用 Avro 序列化,使用 serde-avro 功能:features = ["serde-avro"]
      • 要启用 Prost 序列化,使用 serde-prost 功能:features = ["serde-prost"]
      • 要启用 Protocol Buffers 序列化,使用 serde-protobuf 功能:features = ["serde-protobuf"]
    • 如果您正在使用 PostgreSQL 事件存储后端并且想使用监听器机制,可以启用 listener 功能:disintegrate-postgres = {version = "0.8.0", features = ["listener"]}

  2. 在您的应用程序中定义事件列表。您可以使用事件风暴技术来识别系统中发生的事件。以下是一个使用 Disintegrate 定义事件的示例

    use disintegrate::Event;
    use serde::{Deserialize, Serialize};
    
    #[derive(Debug, Clone, PartialEq, Eq, Event, Serialize, Deserialize)]
    #[stream(UserEvent, [UserCreated])]
    #[stream(CartEvent, [ItemAdded, ItemRemoved, ItemUpdated, CouponApplied])]
    #[stream(CouponEvent, [CouponEmitted, CouponApplied])]
    pub enum DomainEvent {
        UserCreated {
            #[id]
            user_id: String,
            name: String,
        },
        ItemAdded {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            quantity: u32,
        },
        ItemRemoved {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
        },
        ItemUpdated {
            #[id]
            user_id: String,
            #[id]
            item_id: String,
            new_quantity: u32,
        },
        CouponEmitted {
            #[id]
            coupon_id: String,
            quantity: u32,
        },
        CouponApplied {
            #[id]
            coupon_id: String,
            #[id]
            user_id: String,
        },
    }
    

    在这个例子中,我们使用属性#[derive(Event)]定义了一个枚举DomainEvent。该枚举代表应用程序可能发生的各种事件。属性#[stream]指定事件流,例如UserEventCartEvent及其对应的变体。这允许您将事件组织成逻辑流。字段上的属性#[id]允许您指定每个事件的领域标识符,这些标识符用于过滤状态查询的相关事件。

  3. 通过推导属性#[state_query]定义事件流,并使用属性#[id]标注包含标识符的字段,创建一个用于从事件构建视图的状态查询。库使用标注的ID来过滤指定流中的事件,仅保留具有对应ID的事件。状态还必须实现StateMutate属性,该属性定义如何聚合事件中包含的数据以构建状态。

    use crate::event::{CartEvent, CouponEvent, DomainEvent};
    use disintegrate::StateQuery;
    use disintegrate::{Decision, StateMutate};
    use serde::{Deserialize, Serialize};
    use std::collections::HashSet;
    use thiserror::Error;
    
    #[derive(Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
    pub struct Item {
        id: String,
        quantity: u32,
    }
    
    impl Item {
        fn new(id: String, quantity: u32) -> Self {
            Item { id, quantity }
        }
    }
    
    #[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
    #[state_query(CartEvent)]
    pub struct Cart {
        #[id]
        user_id: String,
        items: HashSet<Item>,
        applied_coupon: Option<String>,
    }
    
    impl Cart {
        pub fn new(user_id: &str) -> Self {
            Self {
                user_id: user_id.into(),
                ..Default::default()
            }
        }
    }
    
    impl StateMutate for Cart {
        fn mutate(&mut self, event: Self::Event) {
            match event {
                CartEvent::ItemAdded {
                    item_id, quantity, ..
                } => {
                    self.items.insert(Item::new(item_id, quantity));
                }
                CartEvent::ItemRemoved { item_id, .. } => {
                    self.items.retain(|item| item.id != *item_id);
                }
                CartEvent::ItemUpdated {
                    item_id,
                    new_quantity,
                    ..
                } => {
                    self.items.replace(Item::new(item_id, new_quantity));
                }
                CartEvent::CouponApplied { coupon_id, .. } => {
                    self.applied_coupon = Some(coupon_id);
                }
            }
        }
    }
    

    在这个例子中,结构体Cart表示购物车的状态,并跟踪用户添加的项目。

  4. 创建一个实现Decision属性的结构体。这个结构体代表一个业务决策,负责验证输入并生成更改列表。生成的更改将由DecisionMaker存储在事件存储中。

    #[derive(Debug, Error)]
    pub enum CartError {
        // cart errors
    }
    
    pub struct AddItem {
        user_id: String,
        item_id: String,
        quantity: u32,
    }
    
    impl AddItem {
        pub fn new(user_id: String, item_id: String, quantity: u32) -> Self {
            Self {
                user_id,
                item_id,
                quantity,
            }
        }
    }
    
    /// Implement your business logic
    impl Decision for AddItem {
        type Event = CartEvent;
        type StateQuery = Cart;
        type Error = CartError;
    
        fn state_query(&self) -> Self::StateQuery {
            Cart::new(&self.user_id)
        }
    
        fn process(&self, _state: &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
            // check your business constraints...
            Ok(vec![CartEvent::ItemAdded {
                user_id: self.user_id.clone(),
                item_id: self.item_id.to_string(),
                quantity: self.quantity,
            }])
        }
    }
    

    在提供的例子中,决策作为命令被执行,针对从事件存储构建的状态。一个Decision定义了StateQuery,该查询将使用事件存储中的事件进行修改。

    如果在事件存储中没有找到事件,则使用默认的StateQuery作为决策的起点。这种情况发生在决策是第一次被采取,且没有可用的历史数据来构建StateQuery时。

  5. 实例化一个事件存储,创建AddItem决策,并在DecisionMaker上调用make方法。

    mod cart;
    mod event;
    
    use cart::AddItem;
    use event::DomainEvent;
    
    use anyhow::{Ok, Result};
    use disintegrate::serde::json::Json;
    use disintegrate_postgres::PgEventStore;
    use sqlx::{postgres::PgConnectOptions, PgPool};
    
    #[tokio::main]
    async fn main() -> Result<()> {
        dotenv::dotenv().unwrap();
    
        // Create a PostgreSQL poll
        let connect_options = PgConnectOptions::new();
        let pool = PgPool::connect_with(connect_options).await?;
    
        // Create a serde for serialize and deserialize events
        let serde = Json::<DomainEvent>::default();
    
        // Create a PostgreSQL event store
        let event_store = PgEventStore::new(pool, serde).await?;
    
        // Create a Postgres DecisionMaker
        let decision_maker = disintegrate_postgres::decision_maker(event_store);
    
        // Make the decision. This performs the business decision and persists the changes into the
        // event store
        decision_maker
            .make(AddItem::new("user-1".to_string(), "item-1".to_string(), 4))
            .await?;
        Ok(())
    }
    

提供的例子已经展示了功能齐全的事件源应用程序。但是,如果您的业务逻辑跨越多个聚合,您可以使用多状态查询从各种状态中收集所有必要的数据。通常,确保多个聚合之间的不变性需要它们之间的策略。通过表示为StateQuery元组的多个状态,您可以在单个Decision中验证所有的不变性。

#[derive(Default, StateQuery, Clone, Serialize, Deserialize)]
#[state_query(CouponEvent)]
pub struct Coupon {
    #[id]
    coupon_id: String,
    quantity: u32,
}

impl Coupon {
    pub fn new(coupon_id: &str) -> Self {
        Self {
            coupon_id: coupon_id.to_string(),
            ..Default::default()
        }
    }
}

impl StateMutate for Coupon {
    fn mutate(&mut self, event: Self::Event) {
        match event {
            CouponEvent::CouponEmitted { quantity, .. } => self.quantity += quantity,
            CouponEvent::CouponApplied { .. } => self.quantity -= 1,
        }
    }
}

pub struct ApplyCoupon {
    user_id: String,
    coupon_id: String,
}

impl ApplyCoupon {
    pub fn new(user_id: String, coupon_id: String) -> Self {
        Self { user_id, coupon_id }
    }
}

impl Decision for ApplyCoupon {
    type Event = DomainEvent;
    type StateQuery = (Cart, Coupon);
    type Error = CartError;

    fn state_query(&self) -> Self::StateQuery {
        (Cart::new(&self.user_id), Coupon::new(&self.coupon_id))
    }

    fn process(&self, (cart, coupon): &Self::StateQuery) -> Result<Vec<Self::Event>, Self::Error> {
        // check your business constraints...
        if cart.applied_coupon.is_some() {
            return Err(CartError::CouponAlreadyApplied);
        }
        if coupon.quantity == 0 {
            return Err(CartError::CouponNotAvailable);
        }
        Ok(vec![DomainEvent::CouponApplied {
            coupon_id: self.coupon_id.clone(),
            user_id: self.user_id.clone(),
        }])
    }
}

查看examples文件夹,以更好地了解如何在现实世界应用程序中使用Disintegrate。

许可

本项目使用MIT许可证授权。

贡献

欢迎贡献!如果您发现任何问题或有改进建议,请随时开启一个问题或提交一个拉取请求。

请确保在向本项目贡献时遵循贡献指南

我们感谢您的帮助,让Disintegrate变得更好!

致谢

Disintegrate受到Sara Pellegrini在《杀掉聚合!》演讲中提出的思想的启发,探索从事件流中建模业务概念的新可能性。我们想对演讲者分享她的见解并激发软件开发社区的创新思维表示衷心的感谢。

在保留视频中的核心概念的同时,Disintegrate引入了额外的功能,丰富了开发者的体验,并将这些想法转化为实际的应用。

  1. Postgres实现:Disintegrate提供了视频中讨论的概念的可行实现。

  2. 强大的查询系统:在视频中,查询是通过域名标识符和事件类型列表构建的。Disintegrate将这一能力提升到新的高度,使开发者能够创建更复杂的查询,以解决高级用例。

  3. 验证查询:虽然认可视频中的方法在利用查询验证状态完整性方面的价值,但Disintegrate将这一方面提升到了更高的层次。在视频中,相同的查询用于构建状态和追加API。然而,Disintegrate引入了一种名为Validation的强大查询功能,使开发者能够在事件存储中存储新事件时对决策的无效化进行细粒度控制。这在如银行示例等场景中特别有用。例如,在做出提款决策时,需要计算账户余额,这要求将存款事件包含在状态中。但是,即使存款事件改变了状态,也不应使提款决策无效。在这种情况下,必须在构建状态所需的事件子集上进行验证。

  4. 决策概念:Disintegrate引入了Decision的概念,作为开发应用业务逻辑的构建块,同时遵循SOLID原则。一个Decision可以看作是一个关注特定用例的小聚合体。因此,当出现新的用例时,可以通过添加新的Decision来扩展应用程序,而无需修改现有的一个。

  5. Multi-StateQuery:通过允许单个Decision使用多个StateQuery,我们获得了在不同的决策和组合已定义的StateQuery之间的查询复用的灵活性。这一能力使我们能够实现快照机制,这可以在未来的版本中使用不同的和改进的策略进一步优化。

依赖关系

~4–6.5MB
~121K SLoC