#事件溯源 #状态 #事件 #CQRS #模式 #领域 #数据

rust-fmodel

通过有效地在Rust中实现事件溯源和CQRS模式,加速开发组合式、安全和易用的应用程序/信息系统

7个版本 (破坏性更新)

0.7.0 2023年12月29日
0.6.0 2023年12月29日
0.5.0 2023年12月15日
0.4.0 2023年12月3日
0.1.0 2023年9月16日

#7 in #CQRS

每月下载 41

Apache-2.0

82KB
657

f(model) - 使用Rust进行函数式领域建模

crates.iodocs.rs上公开可用

从版本0.7.0+开始,该库使用async fn in Traits特性,该特性目前仅在稳定版Rust 1.75.0+中可用。

如果您使用的是较旧的Rust版本,请使用库的0.6.0版本。它依赖于async-traitcrate。0.6.0版本不再维护,仅针对安全问题和错误进行修补。

当您开发信息系统来自动化业务活动时,您正在对业务进行建模。您设计的抽象、实现的行为和构建的用户界面交互都反映了业务——它们共同构成了领域模型。

event-modeling

IOR<库,灵感>

此项目可以用作库或灵感,或两者兼具。它提供足够的战术领域驱动设计模式,针对事件溯源和CQRS进行优化。

抽象和泛化

抽象可以隐藏不相关的细节,并使用名称来引用对象。它强调对象是什么或做什么,而不是它如何表示或如何工作。

泛化通过用单个结构替换执行类似功能的多个实体来减少复杂性。

抽象和泛化通常一起使用。通过参数化抽象来泛化,以提供更优秀的效用。

Box<dynFn(&C,&S) -> Vec<E>>

type DecideFunction<'a,C,S,E> = Box<dynFn(&C,&S) -> Vec<E> + 'a + Send + Sync>

在更高的抽象层次上,任何信息系统都负责处理意图(Command)并根据当前的State产生新的事实(Events

  • 给定当前 State/S 的输入
  • Command/C 处理 的输入
  • 期望 Vec 新的 Events/E输出 上发布/发射

Box<dynFn(&S,&E) ->S>

type EvolveFunction<'a,S,E> = Box<dynFn(&S,&E) ->S+ 'a + Send + Sync>

新状态总是从当前状态 S 和当前事件 E 中演变而来。

  • 给定当前 State/S 的输入
  • 当处理 Event/E 时(输入上)
  • 期望在输出上发布新的 State/S

有两个函数被封装在一个数据类型类(代数数据结构)中,该类有三个泛型参数进行泛化。

pub struct Decider<'a, C: 'a, S: 'a, E: 'a> {
    pub decide: DecideFunction<'a, C, S, E>,
    pub evolve: EvolveFunction<'a, S, E>,
    pub initial_state: InitialStateFunction<'a, S>,
}

Decider 是最重要的数据类型,但并非唯一。还有其他一些。

onion architecture image

Decider

Decider 是一个表示主要决策算法的代数类型/结构体。它属于域层。它有三个泛型参数 CSE,表示 Decider 可能包含或使用的值的类型。由于这些类型不影响其行为,Decider 可以针对任何类型 CSE 进行特殊化。例如,当 C=IntC=YourCustomType 时,Decider 的行为相同。

Decider 是一个纯域组件。

  • C - 命令
  • S - 状态
  • E - 事件
pub type DecideFunction<'a, C, S, E> = Box<dyn Fn(&C, &S) -> Vec<E> + 'a + Send + Sync>;
pub type EvolveFunction<'a, S, E> = Box<dyn Fn(&S, &E) -> S + 'a + Send + Sync>;
pub type InitialStateFunction<'a, S> = Box<dyn Fn() -> S + 'a + Send + Sync>;

pub struct Decider<'a, C: 'a, S: 'a, E: 'a> {
    pub decide: DecideFunction<'a, C, S, E>,
    pub evolve: EvolveFunction<'a, S, E>,
    pub initial_state: InitialStateFunction<'a, S>,
}

此外,还引入了 initialState 以获得对 Decider 初始状态的更多控制。

事件源聚合

事件源聚合 使用/委托一个 Decider 来处理命令并生成新事件。它属于应用层。为了处理命令,聚合需要通过 EventRepository.fetchEvents 异步函数获取当前状态(表示为事件的列表/向量),然后将命令委托给可以生成新事件的决策者。然后通过 EventRepository.save 异步函数存储生成的事件。

它是事件源信息系统的一个形式化。

状态存储聚合

状态存储聚合 使用/委托一个 Decider 来处理命令并生成新状态。它属于应用层。为了处理命令,聚合首先需要通过 StateRepository.fetchState 异步函数获取当前状态,然后将命令委托给可以生成新状态的决策者。然后通过 StateRepository.save 异步函数存储新状态。

它是状态存储信息系统的一个形式化。

视图

View 是一个表示事件处理算法的代数类型,负责将事件转换为非规范化状态,这对于查询更合适。它属于域层。通常用于创建 CQRS 模式的视图/查询端。显然,CQRS 的命令端通常是一个事件源聚合。

它有两个泛型参数 SE,表示 View 可能包含或使用的值的类型。由于这些类型不影响其行为,View 可以针对任何类型的 SE 进行特殊化。例如,对于 E=IntE=YourCustomTypeView 的行为相同。

View 是一个纯域组件。

  • S - 状态
  • E - 事件
pub struct View<'a, S: 'a, E: 'a> {
    pub evolve: EvolveFunction<'a, S, E>,
    pub initial_state: InitialStateFunction<'a, S>,
}

物化视图

物化视图(Materialized view)使用/委托了一个View来处理类型为E的事件,并作为结果维护非规范化投影的状态。本质上,它代表了CQRS模式的查询/视图方面。它属于应用层。

为了处理事件,物化视图首先需要通过ViewStateRepository.fetchState延迟函数获取当前状态,然后将事件委托给视图,视图可以产生新的状态。新的状态随后通过ViewStateRepository.save延迟函数存储。

代数数据类型

在Rust中,我们可以使用ADTs以函数式方式来建模应用域实体和关系,清晰地定义可能的值和状态集合。Rust主要有两种ADT类型:enumstruct

  • enum用于定义一个可以取多个可能变体的类型 - 模拟一个sum/OR类型。
  • struct用于表达一个具有命名字段的类型 - 模拟一个product/AND类型。

ADTs将有助于

  • 在代码中准确地表示业务域
  • 强制执行正确性
  • 减少错误的可能性。

在FModel中,我们广泛使用ADTs来建模数据。

C / 命令 / 改变系统状态的意图

// models Sum/Or type / multiple possible variants
pub enum OrderCommand {
    Create(CreateOrderCommand),
    Update(UpdateOrderCommand),
    Cancel(CancelOrderCommand),
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct CreateOrderCommand {
    pub order_id: u32,
    pub customer_name: String,
    pub items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct UpdateOrderCommand {
    pub order_id: u32,
    pub new_items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
#[derive(Debug)]
pub struct CancelOrderCommand {
    pub order_id: u32,
}

E / 事件 / 事实

// models Sum/Or type / multiple possible variants
pub enum OrderEvent {
    Created(OrderCreatedEvent),
    Updated(OrderUpdatedEvent),
    Cancelled(OrderCancelledEvent),
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderCreatedEvent {
    pub order_id: u32,
    pub customer_name: String,
    pub items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderUpdatedEvent {
    pub order_id: u32,
    pub updated_items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderCancelledEvent {
    pub order_id: u32,
}

S / 状态 / 系统聚合/实体的当前状态

struct OrderState {
    order_id: u32,
    customer_name: String,
    items: Vec<String>,
    is_cancelled: bool,
}

建模我们域的行为

  • 代数数据类型构成了我们的实体(命令、状态和事件)的结构。
  • 函数/lambda提供了以组合方式操作实体的代数,有效地建模行为。

这导致设计模块化,并清晰地分离了实体的结构和实体的函数/行为。

Fmodel库提供了通用的抽象组件,可以针对您特定的案例/预期行为进行专业化。

  • 决策者 - 表示主要决策算法的数据类型。
fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> {
    Decider {
        // Your decision logic goes here.
        decide: Box::new(|command, state| match command {
            // Exhaustive pattern matching on the command
            OrderCommand::Create(create_cmd) => {
                vec![OrderEvent::Created(OrderCreatedEvent {
                    order_id: create_cmd.order_id,
                    customer_name: create_cmd.customer_name.to_owned(),
                    items: create_cmd.items.to_owned(),
                })]
            }
            OrderCommand::Update(update_cmd) => {
                // Your validation logic goes here
                if state.order_id == update_cmd.order_id {
                    vec![OrderEvent::Updated(OrderUpdatedEvent {
                        order_id: update_cmd.order_id,
                        updated_items: update_cmd.new_items.to_owned(),
                    })]
                } else {
                    // In case of validation failure, return empty list of events or error event
                    vec![]
                }
            }
            OrderCommand::Cancel(cancel_cmd) => {
                // Your validation logic goes here
                if state.order_id == cancel_cmd.order_id {
                    vec![OrderEvent::Cancelled(OrderCancelledEvent {
                        order_id: cancel_cmd.order_id,
                    })]
                } else {
                    // In case of validation failure, return empty list of events or error event
                    vec![]
                }
            }
        }),
        // Evolve the state based on the event(s)
        evolve: Box::new(|state, event| {
            let mut new_state = state.clone();
            // Exhaustive pattern matching on the event
            match event {
                OrderEvent::Created(created_event) => {
                    new_state.order_id = created_event.order_id;
                    new_state.customer_name = created_event.customer_name.to_owned();
                    new_state.items = created_event.items.to_owned();
                }
                OrderEvent::Updated(updated_event) => {
                    new_state.items = updated_event.updated_items.to_owned();
                }
                OrderEvent::Cancelled(_) => {
                    new_state.is_cancelled = true;
                }
            }
            new_state
        }),
        // Initial state
        initial_state: Box::new(|| OrderState {
            order_id: 0,
            customer_name: "".to_string(),
            items: Vec::new(),
            is_cancelled: false,
        }),
    }
}
  • 视图 - 代表处理事件并将事件转换为适合查询的规范化状态的算法。
// The state of the view component
struct OrderViewState {
    order_id: u32,
    customer_name: String,
    items: Vec<String>,
    is_cancelled: bool,
}

fn view<'a>() -> View<'a, OrderViewState, OrderEvent> {
    View {
        // Evolve the state of the `view` based on the event(s)
        evolve: Box::new(|state, event| {
            let mut new_state = state.clone();
            // Exhaustive pattern matching on the event
            match event {
                OrderEvent::Created(created_event) => {
                    new_state.order_id = created_event.order_id;
                    new_state.customer_name = created_event.customer_name.to_owned();
                    new_state.items = created_event.items.to_owned();
                }
                OrderEvent::Updated(updated_event) => {
                    new_state.items = updated_event.updated_items.to_owned();
                }
                OrderEvent::Cancelled(_) => {
                    new_state.is_cancelled = true;
                }
            }
            new_state
        }),
        // Initial state
        initial_state: Box::new(|| OrderViewState {
            order_id: 0,
            customer_name: "".to_string(),
            items: Vec::new(),
            is_cancelled: false,
        }),
    }
}

应用层

逻辑执行将由外部组件执行,这些组件使用域组件(决策者、视图)来执行计算。这些组件将负责获取和保存数据(存储库)。

图中的箭头(适配器->应用->域)显示了依赖关系的方向。请注意,所有依赖关系都指向内部,且域不依赖于任何人或任何事物。

将这些决策从核心域模型中推出来非常有价值。能够推迟它们是良好架构的标志。

事件源聚合

    let repository = InMemoryOrderEventRepository::new();
    let aggregate = EventSourcedAggregate::new(repository, decider());

    let command = OrderCommand::Create(CreateOrderCommand {
        order_id: 1,
        customer_name: "John Doe".to_string(),
        items: vec!["Item 1".to_string(), "Item 2".to_string()],
    });

    let result = aggregate.handle(&command).await;
    assert!(result.is_ok());
    assert_eq!(
        result.unwrap(),
        [(
            OrderEvent::Created(OrderCreatedEvent {
                order_id: 1,
                customer_name: "John Doe".to_string(),
                items: vec!["Item 1".to_string(), "Item 2".to_string()],
            }),
            0
        )]
    );

状态存储聚合

    let repository = InMemoryOrderStateRepository::new();
    let aggregate = StateStoredAggregate::new(repository, decider());

    let command = OrderCommand::Create(CreateOrderCommand {
        order_id: 1,
        customer_name: "John Doe".to_string(),
        items: vec!["Item 1".to_string(), "Item 2".to_string()],
    });
    let result = aggregate.handle(&command).await;
    assert!(result.is_ok());
    assert_eq!(
        result.unwrap(),
        (
            OrderState {
                order_id: 1,
                customer_name: "John Doe".to_string(),
                items: vec!["Item 1".to_string(), "Item 2".to_string()],
                is_cancelled: false,
            },
            0
        )
    );

无畏并发

将程序中的计算拆分为多个线程以同时运行多个任务可以提高性能。然而,使用线程编程有困难的名声。Rust的类型系统和所有权模型保证了线程安全。

聚合并发执行的示例

async fn es_test() {
    let repository = InMemoryOrderEventRepository::new();
    let aggregate = Arc::new(EventSourcedAggregate::new(repository, decider()));
    // Makes a clone of the Arc pointer. This creates another pointer to the same allocation, increasing the strong reference count.
    let aggregate2 = Arc::clone(&aggregate);

    // Lets spawn two threads to simulate two concurrent requests
    let handle1 = thread::spawn(|| async move {
        let command = OrderCommand::Create(CreateOrderCommand {
            order_id: 1,
            customer_name: "John Doe".to_string(),
            items: vec!["Item 1".to_string(), "Item 2".to_string()],
        });

        let result = aggregate.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Created(OrderCreatedEvent {
                    order_id: 1,
                    customer_name: "John Doe".to_string(),
                    items: vec!["Item 1".to_string(), "Item 2".to_string()],
                }),
                0
            )]
        );
        let command = OrderCommand::Update(UpdateOrderCommand {
            order_id: 1,
            new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
        });
        let result = aggregate.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Updated(OrderUpdatedEvent {
                    order_id: 1,
                    updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
                }),
                1
            )]
        );
        let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 });
        let result = aggregate.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }),
                2
            )]
        );
    });

    let handle2 = thread::spawn(|| async move {
        let command = OrderCommand::Create(CreateOrderCommand {
            order_id: 2,
            customer_name: "John Doe".to_string(),
            items: vec!["Item 1".to_string(), "Item 2".to_string()],
        });
        let result = aggregate2.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Created(OrderCreatedEvent {
                    order_id: 2,
                    customer_name: "John Doe".to_string(),
                    items: vec!["Item 1".to_string(), "Item 2".to_string()],
                }),
                0
            )]
        );
        let command = OrderCommand::Update(UpdateOrderCommand {
            order_id: 2,
            new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
        });
        let result = aggregate2.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Updated(OrderUpdatedEvent {
                    order_id: 2,
                    updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
                }),
                1
            )]
        );
        let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 });
        let result = aggregate2.handle(&command).await;
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            [(
                OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }),
                2
            )]
        );
    });

    handle1.join().unwrap().await;
    handle2.join().unwrap().await;
}

您可能会想知道为什么Rust中所有原始类型都不是原子的,以及为什么标准库类型没有默认实现为使用Arc<T>。原因在于线程安全性会带来性能损失,您只有在真正需要的时候才愿意付出这个代价。

您选择如何运行它! 您可以在单线程、多线程或分布式环境中运行它。

将crate作为您项目的依赖项安装

在您的项目目录中运行以下Cargo命令

cargo add fmodel-rust

或者将以下行添加到您的Cargo.toml文件中

fmodel-rust = "0.7.0"

示例

FModel在其他语言中

进一步阅读

致谢

特别感谢Jérémie Chassaing分享他的研究,以及Adam Dymitruk主持聚会。


Fraktalio用❤️创建

依赖关系

~0.4–1MB
~23K SLoC