7个版本 (破坏性更新)
0.7.0 | 2023年12月29日 |
---|---|
0.6.0 | 2023年12月29日 |
0.5.0 | 2023年12月15日 |
0.4.0 | 2023年12月3日 |
0.1.0 | 2023年9月16日 |
#7 in #CQRS
每月下载 41 次
82KB
657 行
f(
model)
- 使用Rust进行函数式领域建模
从版本0.7.0+开始,该库使用
async fn
in Traits特性,该特性目前仅在稳定版Rust 1.75.0+中可用。
如果您使用的是较旧的Rust版本,请使用库的0.6.0版本。它依赖于
async-trait
crate。0.6.0版本不再维护,仅针对安全问题和错误进行修补。
当您开发信息系统来自动化业务活动时,您正在对业务进行建模。您设计的抽象、实现的行为和构建的用户界面交互都反映了业务——它们共同构成了领域模型。
IOR<库,灵感>
此项目可以用作库或灵感,或两者兼具。它提供足够的战术领域驱动设计模式,针对事件溯源和CQRS进行优化。
抽象和泛化
抽象可以隐藏不相关的细节,并使用名称来引用对象。它强调对象是什么或做什么,而不是它如何表示或如何工作。
泛化通过用单个结构替换执行类似功能的多个实体来减少复杂性。
抽象和泛化通常一起使用。通过参数化抽象来泛化,以提供更优秀的效用。
Box<dynFn(&C,&S) -> Vec<E>>
type DecideFunction<'a,C,S,E> = Box<dynFn(&C,&S) -> Vec<E> + 'a + Send + Sync>
在更高的抽象层次上,任何信息系统都负责处理意图(Command
)并根据当前的State
产生新的事实(Events
)
- 给定当前
State/S
的输入, - 当
Command/C
处理 的输入, - 期望
Vec
新的Events/E
在 输出 上发布/发射
Box<dynFn(&S,&E) ->S>
type EvolveFunction<'a,S,E> = Box<dynFn(&S,&E) ->S+ 'a + Send + Sync>
新状态总是从当前状态 S
和当前事件 E
中演变而来。
- 给定当前
State/S
的输入, - 当处理
Event/E
时(输入上) - 期望在输出上发布新的
State/S
有两个函数被封装在一个数据类型类(代数数据结构)中,该类有三个泛型参数进行泛化。
pub struct Decider<'a, C: 'a, S: 'a, E: 'a> {
pub decide: DecideFunction<'a, C, S, E>,
pub evolve: EvolveFunction<'a, S, E>,
pub initial_state: InitialStateFunction<'a, S>,
}
Decider
是最重要的数据类型,但并非唯一。还有其他一些。
Decider
Decider
是一个表示主要决策算法的代数类型/结构体。它属于域层。它有三个泛型参数 C
、S
、E
,表示 Decider
可能包含或使用的值的类型。由于这些类型不影响其行为,Decider
可以针对任何类型 C
、S
或 E
进行特殊化。例如,当 C
=Int 或 C
=YourCustomType 时,Decider
的行为相同。
Decider
是一个纯域组件。
C
- 命令S
- 状态E
- 事件
pub type DecideFunction<'a, C, S, E> = Box<dyn Fn(&C, &S) -> Vec<E> + 'a + Send + Sync>;
pub type EvolveFunction<'a, S, E> = Box<dyn Fn(&S, &E) -> S + 'a + Send + Sync>;
pub type InitialStateFunction<'a, S> = Box<dyn Fn() -> S + 'a + Send + Sync>;
pub struct Decider<'a, C: 'a, S: 'a, E: 'a> {
pub decide: DecideFunction<'a, C, S, E>,
pub evolve: EvolveFunction<'a, S, E>,
pub initial_state: InitialStateFunction<'a, S>,
}
此外,还引入了 initialState
以获得对 Decider 初始状态的更多控制。
事件源聚合
事件源聚合 使用/委托一个 Decider
来处理命令并生成新事件。它属于应用层。为了处理命令,聚合需要通过 EventRepository.fetchEvents
异步函数获取当前状态(表示为事件的列表/向量),然后将命令委托给可以生成新事件的决策者。然后通过 EventRepository.save
异步函数存储生成的事件。
它是事件源信息系统的一个形式化。
状态存储聚合
状态存储聚合 使用/委托一个 Decider
来处理命令并生成新状态。它属于应用层。为了处理命令,聚合首先需要通过 StateRepository.fetchState
异步函数获取当前状态,然后将命令委托给可以生成新状态的决策者。然后通过 StateRepository.save
异步函数存储新状态。
它是状态存储信息系统的一个形式化。
视图
View
是一个表示事件处理算法的代数类型,负责将事件转换为非规范化状态,这对于查询更合适。它属于域层。通常用于创建 CQRS 模式的视图/查询端。显然,CQRS 的命令端通常是一个事件源聚合。
它有两个泛型参数 S
、E
,表示 View
可能包含或使用的值的类型。由于这些类型不影响其行为,View
可以针对任何类型的 S
、E
进行特殊化。例如,对于 E
=Int 或 E
=YourCustomType,View
的行为相同。
View
是一个纯域组件。
S
- 状态E
- 事件
pub struct View<'a, S: 'a, E: 'a> {
pub evolve: EvolveFunction<'a, S, E>,
pub initial_state: InitialStateFunction<'a, S>,
}
物化视图
物化视图(Materialized view)使用/委托了一个View
来处理类型为E
的事件,并作为结果维护非规范化投影的状态。本质上,它代表了CQRS模式的查询/视图方面。它属于应用层。
为了处理事件,物化视图首先需要通过ViewStateRepository.fetchState
延迟函数获取当前状态,然后将事件委托给视图,视图可以产生新的状态。新的状态随后通过ViewStateRepository.save
延迟函数存储。
代数数据类型
在Rust中,我们可以使用ADTs以函数式方式来建模应用域实体和关系,清晰地定义可能的值和状态集合。Rust主要有两种ADT类型:enum
和struct
。
enum
用于定义一个可以取多个可能变体的类型 - 模拟一个sum/OR
类型。struct
用于表达一个具有命名字段的类型 - 模拟一个product/AND
类型。
ADTs将有助于
- 在代码中准确地表示业务域
- 强制执行正确性
- 减少错误的可能性。
在FModel中,我们广泛使用ADTs来建模数据。
C
/ 命令 / 改变系统状态的意图
// models Sum/Or type / multiple possible variants
pub enum OrderCommand {
Create(CreateOrderCommand),
Update(UpdateOrderCommand),
Cancel(CancelOrderCommand),
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct CreateOrderCommand {
pub order_id: u32,
pub customer_name: String,
pub items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct UpdateOrderCommand {
pub order_id: u32,
pub new_items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
#[derive(Debug)]
pub struct CancelOrderCommand {
pub order_id: u32,
}
E
/ 事件 / 事实
// models Sum/Or type / multiple possible variants
pub enum OrderEvent {
Created(OrderCreatedEvent),
Updated(OrderUpdatedEvent),
Cancelled(OrderCancelledEvent),
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderCreatedEvent {
pub order_id: u32,
pub customer_name: String,
pub items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderUpdatedEvent {
pub order_id: u32,
pub updated_items: Vec<String>,
}
// models Product/And type / a concrete variant, consisting of named fields
pub struct OrderCancelledEvent {
pub order_id: u32,
}
S
/ 状态 / 系统聚合/实体的当前状态
struct OrderState {
order_id: u32,
customer_name: String,
items: Vec<String>,
is_cancelled: bool,
}
建模我们域的行为
- 代数数据类型构成了我们的实体(命令、状态和事件)的结构。
- 函数/lambda提供了以组合方式操作实体的代数,有效地建模行为。
这导致设计模块化,并清晰地分离了实体的结构和实体的函数/行为。
Fmodel库提供了通用的抽象组件,可以针对您特定的案例/预期行为进行专业化。
- 决策者 - 表示主要决策算法的数据类型。
fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> {
Decider {
// Your decision logic goes here.
decide: Box::new(|command, state| match command {
// Exhaustive pattern matching on the command
OrderCommand::Create(create_cmd) => {
vec![OrderEvent::Created(OrderCreatedEvent {
order_id: create_cmd.order_id,
customer_name: create_cmd.customer_name.to_owned(),
items: create_cmd.items.to_owned(),
})]
}
OrderCommand::Update(update_cmd) => {
// Your validation logic goes here
if state.order_id == update_cmd.order_id {
vec![OrderEvent::Updated(OrderUpdatedEvent {
order_id: update_cmd.order_id,
updated_items: update_cmd.new_items.to_owned(),
})]
} else {
// In case of validation failure, return empty list of events or error event
vec![]
}
}
OrderCommand::Cancel(cancel_cmd) => {
// Your validation logic goes here
if state.order_id == cancel_cmd.order_id {
vec![OrderEvent::Cancelled(OrderCancelledEvent {
order_id: cancel_cmd.order_id,
})]
} else {
// In case of validation failure, return empty list of events or error event
vec![]
}
}
}),
// Evolve the state based on the event(s)
evolve: Box::new(|state, event| {
let mut new_state = state.clone();
// Exhaustive pattern matching on the event
match event {
OrderEvent::Created(created_event) => {
new_state.order_id = created_event.order_id;
new_state.customer_name = created_event.customer_name.to_owned();
new_state.items = created_event.items.to_owned();
}
OrderEvent::Updated(updated_event) => {
new_state.items = updated_event.updated_items.to_owned();
}
OrderEvent::Cancelled(_) => {
new_state.is_cancelled = true;
}
}
new_state
}),
// Initial state
initial_state: Box::new(|| OrderState {
order_id: 0,
customer_name: "".to_string(),
items: Vec::new(),
is_cancelled: false,
}),
}
}
- 视图 - 代表处理事件并将事件转换为适合查询的规范化状态的算法。
// The state of the view component
struct OrderViewState {
order_id: u32,
customer_name: String,
items: Vec<String>,
is_cancelled: bool,
}
fn view<'a>() -> View<'a, OrderViewState, OrderEvent> {
View {
// Evolve the state of the `view` based on the event(s)
evolve: Box::new(|state, event| {
let mut new_state = state.clone();
// Exhaustive pattern matching on the event
match event {
OrderEvent::Created(created_event) => {
new_state.order_id = created_event.order_id;
new_state.customer_name = created_event.customer_name.to_owned();
new_state.items = created_event.items.to_owned();
}
OrderEvent::Updated(updated_event) => {
new_state.items = updated_event.updated_items.to_owned();
}
OrderEvent::Cancelled(_) => {
new_state.is_cancelled = true;
}
}
new_state
}),
// Initial state
initial_state: Box::new(|| OrderViewState {
order_id: 0,
customer_name: "".to_string(),
items: Vec::new(),
is_cancelled: false,
}),
}
}
应用层
逻辑执行将由外部组件执行,这些组件使用域组件(决策者、视图)来执行计算。这些组件将负责获取和保存数据(存储库)。
图中的箭头(适配器->应用->域)显示了依赖关系的方向。请注意,所有依赖关系都指向内部,且域不依赖于任何人或任何事物。
将这些决策从核心域模型中推出来非常有价值。能够推迟它们是良好架构的标志。
事件源聚合
let repository = InMemoryOrderEventRepository::new();
let aggregate = EventSourcedAggregate::new(repository, decider());
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Created(OrderCreatedEvent {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}),
0
)]
);
状态存储聚合
let repository = InMemoryOrderStateRepository::new();
let aggregate = StateStoredAggregate::new(repository, decider());
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(
OrderState {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
is_cancelled: false,
},
0
)
);
无畏并发
将程序中的计算拆分为多个线程以同时运行多个任务可以提高性能。然而,使用线程编程有困难的名声。Rust的类型系统和所有权模型保证了线程安全。
聚合并发执行的示例
async fn es_test() {
let repository = InMemoryOrderEventRepository::new();
let aggregate = Arc::new(EventSourcedAggregate::new(repository, decider()));
// Makes a clone of the Arc pointer. This creates another pointer to the same allocation, increasing the strong reference count.
let aggregate2 = Arc::clone(&aggregate);
// Lets spawn two threads to simulate two concurrent requests
let handle1 = thread::spawn(|| async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Created(OrderCreatedEvent {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}),
0
)]
);
let command = OrderCommand::Update(UpdateOrderCommand {
order_id: 1,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Updated(OrderUpdatedEvent {
order_id: 1,
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}),
1
)]
);
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 1 });
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 1 }),
2
)]
);
});
let handle2 = thread::spawn(|| async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Created(OrderCreatedEvent {
order_id: 2,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
}),
0
)]
);
let command = OrderCommand::Update(UpdateOrderCommand {
order_id: 2,
new_items: vec!["Item 3".to_string(), "Item 4".to_string()],
});
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Updated(OrderUpdatedEvent {
order_id: 2,
updated_items: vec!["Item 3".to_string(), "Item 4".to_string()],
}),
1
)]
);
let command = OrderCommand::Cancel(CancelOrderCommand { order_id: 2 });
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
[(
OrderEvent::Cancelled(OrderCancelledEvent { order_id: 2 }),
2
)]
);
});
handle1.join().unwrap().await;
handle2.join().unwrap().await;
}
您可能会想知道为什么Rust中所有原始类型都不是原子的,以及为什么标准库类型没有默认实现为使用Arc<T>
。原因在于线程安全性会带来性能损失,您只有在真正需要的时候才愿意付出这个代价。
您选择如何运行它! 您可以在单线程、多线程或分布式环境中运行它。
将crate作为您项目的依赖项安装
在您的项目目录中运行以下Cargo命令
cargo add fmodel-rust
或者将以下行添加到您的Cargo.toml
文件中
fmodel-rust = "0.7.0"
示例
FModel在其他语言中
进一步阅读
- https://doc.rust-lang.net.cn/book/
- https://fraktalio.com/fmodel/
- https://fraktalio.com/fmodel-ts/
- https://xebia.com/blog/functional-domain-modeling-in-rust-part-1/
致谢
特别感谢Jérémie Chassaing
分享他的研究,以及Adam Dymitruk
主持聚会。
由Fraktalio用❤️创建
依赖关系
~0.4–1MB
~23K SLoC