9个版本 (破坏性更新)
0.10.4 | 2024年5月21日 |
---|---|
0.10.3 | 2024年5月21日 |
0.9.0 | 2024年3月24日 |
0.8.0 | 2024年2月20日 |
0.1.5 | 2023年1月5日 |
#365 在 异步
每月24 次下载
1MB
24K SLoC
Acts工作流程引擎
acts
是一个快速、小巧、可扩展的工作流程引擎,它提供了基于yml模型执行工作流程的能力。
Yml工作流程模型与传统工作流程不同,例如 bpmn
。Yml格式受到GitHub Actions的启发。这个工作流程的主要目的是通过 act
节点创建一个顶层抽象来运行工作流程逻辑并与客户端交互。
此工作流程引擎专注于工作流程逻辑本身和消息分发。复杂业务逻辑将通过 act
通过act消息完成。
关键特性
快速
使用Rust创建库,没有虚拟机,没有数据库依赖。它还提供了 store
功能以启用本地存储。
- 与内存存储进行基准测试
load time: [66.438 µs 75.248 µs 84.207 µs]
deploy time: [6.612 µs 17.356 µs 18.282 µs]
start time: [69.952 µs 70.628 µs 71.287 µs]
act time: [7.9698 ms 8.5588 ms 9.0608 ms]
小巧
库大小仅为3MB(无存储),您可以使用适配器创建外部存储。
可扩展
支持扩展插件,支持创建外部存储,请参阅 src/store/db/local
下的代码。
安装
获取 acts
最新版本的 easiest 方法是通过 cargo
安装。
cargo add acts
快速入门
- 通过
engine.new()
创建并启动工作流程引擎。 - 加载yaml模型以创建
workflow
。 - 通过
engine.manager()
在步骤2中部署模型。 - 通过
engine.channel()
配置事件。 - 通过
engine.executor()
启动工作流程。
use acts::{Engine, Vars, Workflow};
#[tokio::main]
async fn main() {
let engine = Engine::new();
let text = include_str!("../examples/simple/model.yml");
let workflow = Workflow::from_yml(text).unwrap();
let executor = engine.executor();
engine.manager().deploy(&workflow).expect("fail to deploy workflow");
let mut vars = Vars::new();
vars.insert("input".into(), 3.into());
vars.insert("pid".to_string(), "w1".into());
executor.start(&workflow.id, &vars).expect("fail to start workflow");;
let chan = engine.channel();
chan.on_start(|e| {
println!("start: {}", e.start_time);
});
chan.on_message(|e| {
println!("message: {:?}", e);
});
chan.on_complete(|e| {
println!("outputs: {:?} end_time: {}", e.outputs, e.end_time);
});
chan.on_error(|e| {
println!("error on proc id: {} model id: {}", e.pid, e.model.id);
});
}
示例
请参阅 examples
模型使用
该模型是一个YAML格式文件,其中包含不同类型的节点,包括Workflow
、Branch
、Step
以及Act
。每个工作流程可以拥有更多步骤,每个步骤可以拥有更多分支。在步骤中,由多个操作组成以完成步骤任务,例如'req'、'msg'、'each'、'chain'、'set'、'expose'等。这些操作负责与客户端交互或简单地执行单个任务。
run
属性是基于javascript
的脚本。可以设置inputs
属性以初始化每个节点中的变量。
name: model name
inputs:
value: 0
steps:
- name: step 1
run: |
print("step 1")
- name: step 2
branches:
- name: branch 1
if: ${ $("value") > 100 }
run: |
print("branch 1");
- name: branch 2
if: ${ $("value") <= 100 }
steps:
- name: step 3
run: |
print("branch 2")
输入
在Workflow
中,可以设置inputs
以初始化工作流程变量。
name: model name
inputs:
a: 100
steps:
- name: step1
run: |
env.set("output_key", "output value");
也可以通过启动工作流程来设置输入。
use acts::{Engine, Vars, Workflow};
#[tokio::main]
async fn main() {
let engine = Engine::new();
let executor = engine.executor();
let mut vars = Vars::new();
vars.insert("input".into(), 3.into());
vars.insert("pid".to_string(), "w2".into());
executor.start("m1", &vars);
}
输出
在Workflow
中,可以设置outputs
以输出环境变量。
name: model name
outputs:
output_key:
steps:
- name: step1
run: |
env.set("output_key", "output value");
设置
在workflow
节点中,可以通过setup
设置操作。
操作msg
用于向客户端发送消息。有关更多操作,请参阅以下注释
name: model name
setup:
setup:
# set the data by !set
- !set
a: ["u1", "u2"]
v: 10
# checks the condition and enters into the 'then' acts
- !if
on: $("v") > 0
then:
- !msg
id: msg2
# on step created
- !on_created
- !msg
id: msg3
# on workflow completed
- !on_completed
- !msg
id: msg4
# on act created
- !on_before_update
- !msg
id: msg5
# on act completed
- !on_updated
- !msg
id: msg5
# on step created or completed
- !on_step
- !msg
id: msg3
# on error catch
- !on_error_catch
- err: err1
then:
- !req
id: act3
# expose the data with special keys
- !expose
out:
步骤
使用steps
将步骤添加到工作流程中
name: model name
steps:
- id: step1
name: step 1
- id: step2
name: step 2
step.setup
使用setup
在步骤创建时设置一些操作。
操作包括'req'、'msg'、'set'、'expose'、'chain'、'each'和'if',还包括一些钩子,例如'on_created'、'on_completed'、'on_before_update'、'on_updated'、'on_timeout'和'on_error_catch'。
name: a setup example
id: setup
steps:
- name: step 1
id: step1
setup:
# set the data by !set
- !set
a: ["u1", "u2"]
v: 10
# send message with key msg1
- !msg
id: msg1
inputs:
data: ${ $("a") }
# chains and runs 'run' one by one by 'in' data
- !chain
in: $("a")
run:
- !req
id: act1
# each the var 'a'
- !each
in: $("a")
run:
# the each will generate two !req with `act_index` and `act_value`
# the `act_index` is the each index. It is 0 and 1 in this example
# the `act_value` is the each data. It is 'u1' and 'u2' in this example
- !req
id: act2
# checks the condition and enters into the 'then' acts
- !if
on: $("v") > 0
then:
- !msg
id: msg2
# on step created
- !on_created
- !msg
id: msg3
# on step completed
- !on_completed
- !msg
id: msg4
# on act created
- !on_before_update
- !msg
id: msg5
# on act completed
- !on_updated
- !msg
id: msg5
# on step created or completed
- !on_step
- !msg
id: msg3
# on error catch
- !on_error_catch
- err: err1
then:
- !req
id: act3
# on timeout
- !on_timeout
- on: 6h
then:
- !req
id: act3
# expose the data with special keys
- !expose
out:
- name: final
id: final
有关更多操作示例,请参阅examples
step.catches
使用catches
来捕获step
错误。
name: a catches example
id: catches
steps:
- name: prepare
id: prepare
acts:
- !req
id: init
- name: step1
id: step1
acts:
- !req
id: act1
# catch the step errors
catches:
- id: catch1
err: err1
then:
- !req
id: act2
- id: catch2
err: err2
then:
- !req
id: act3
- id: catch_others
- name: final
id: final
step.timeout
使用timeout
来检查任务时间。
name: a timeout example
id: timeout
steps:
- name: prepare
id: prepare
acts:
- !req
id: init
- name: step1
id: step1
acts:
- !req
id: act1
# check timeout rules
timeout:
# 1d means one day
# triggers act2 when timeout
- on: 1d
then:
- !req
id: act2
# 2h means two hours
# triggers act3 when timeout
- on: 2h
then:
- !req
id: act3
- name: final
id: final
分支
使用branches
将分支添加到步骤中
name: model name
steps:
- id: step1
name: step 1
branches:
- id: b1
if: $("v") > 0
steps:
- name: step a
- name: step b
- id: b2
else: true
steps:
- name: step c
- name: step d
- id: step2
name: step 2
操作
使用acts
创建操作以与客户端交互,或者通过几种操作类型完成特殊功能。
name: model name
outputs:
output_key:
steps:
- name: step1
acts:
# send message to client
- !msg
id: msg1
inputs:
a: 1
# req is a act to send a request from acts server
# the client can complete the act and pass data to serever
- !req
id: init
name: my act init
# passes data to the act
inputs:
a: 6
# exposes the data to step
outputs:
a:
# limits the data keys when acting
rets:
a:
有关更多操作示例,请参阅examples
存储
可以使用store
启用存储功能,该功能使用duckdb
来构建。
要启用store
功能
[dependencies]
acts = { version = "*", features = ["store"] }
对于外部存储
use acts::{Engine, Builder, data::{Model, Proc, Task, Package, Message}, DbSet, StoreAdapter};
use std::sync::Arc;
#[derive(Clone)]
struct TestStore;
impl StoreAdapter for TestStore {
fn models(&self) -> Arc<dyn DbSet<Item = Model>> {
todo!()
}
fn procs(&self) -> Arc<dyn DbSet<Item =Proc>> {
todo!()
}
fn tasks(&self) -> Arc<dyn DbSet<Item =Task>> {
todo!()
}
fn packages(&self) -> Arc<dyn DbSet<Item =Package>> {
todo!()
}
fn messages(&self) -> Arc<dyn DbSet<Item =Message>> {
todo!()
}
fn init(&self) {}
fn close(&self) {}
}
#[tokio::main]
async fn main() {
// set custom store
let store = TestStore;
let engine = Builder::new().store(&store).build();
}
包
acts
引擎集成了rquickjs
运行时来执行包,这可以扩展引擎的能力。有关更多信息,请参阅示例package
Acts-Server
创建一个基于grpc的acts-server以与客户端交互。请参阅更多内容acts-server
Acts-Channel
通道用于与服务器交互。操作包括'deploy'、'start'、'push'、'remove'、'complete'、'back'、'cancel'、'skip'、'abort'和'error'。
请参阅更多内容acts-channel
依赖
~21–37MB
~600K SLoC