#工作流程 #步骤 #yaml #行为 #引擎 #小型 #模型

acts

一个快速、小巧、可扩展的工作流程引擎

9个版本 (破坏性更新)

0.10.4 2024年5月21日
0.10.3 2024年5月21日
0.9.0 2024年3月24日
0.8.0 2024年2月20日
0.1.5 2023年1月5日

#365异步

每月24 次下载

Apache-2.0

1MB
24K SLoC

Acts工作流程引擎

Build Test

acts 是一个快速、小巧、可扩展的工作流程引擎,它提供了基于yml模型执行工作流程的能力。

Yml工作流程模型与传统工作流程不同,例如 bpmn。Yml格式受到GitHub Actions的启发。这个工作流程的主要目的是通过 act 节点创建一个顶层抽象来运行工作流程逻辑并与客户端交互。

此工作流程引擎专注于工作流程逻辑本身和消息分发。复杂业务逻辑将通过 act 通过act消息完成。

关键特性

快速

使用Rust创建库,没有虚拟机,没有数据库依赖。它还提供了 store 功能以启用本地存储。

  1. 与内存存储进行基准测试
load                    time:   [66.438 µs 75.248 µs 84.207 µs]
deploy                  time:   [6.612 µs 17.356 µs 18.282 µs]
start                   time:   [69.952 µs 70.628 µs 71.287 µs]
act                     time:   [7.9698 ms 8.5588 ms 9.0608 ms]

小巧

库大小仅为3MB(无存储),您可以使用适配器创建外部存储。

可扩展

支持扩展插件,支持创建外部存储,请参阅 src/store/db/local 下的代码。

安装

获取 acts 最新版本的 easiest 方法是通过 cargo 安装。

cargo add acts

快速入门

  1. 通过 engine.new() 创建并启动工作流程引擎。
  2. 加载yaml模型以创建 workflow
  3. 通过 engine.manager() 在步骤2中部署模型。
  4. 通过 engine.channel() 配置事件。
  5. 通过 engine.executor() 启动工作流程。
use acts::{Engine, Vars, Workflow};

#[tokio::main]
async fn main() {
    let engine = Engine::new();

    let text = include_str!("../examples/simple/model.yml");
    let workflow = Workflow::from_yml(text).unwrap();

    let executor = engine.executor();
    engine.manager().deploy(&workflow).expect("fail to deploy workflow");

    let mut vars = Vars::new();
    vars.insert("input".into(), 3.into());
    vars.insert("pid".to_string(), "w1".into());
    executor.start(&workflow.id, &vars).expect("fail to start workflow");;
    let chan = engine.channel();

    chan.on_start(|e| {
        println!("start: {}", e.start_time);
    });

    chan.on_message(|e| {
        println!("message: {:?}", e);
    });

    chan.on_complete(|e| {
        println!("outputs: {:?} end_time: {}", e.outputs, e.end_time);
    });

    chan.on_error(|e| {
        println!("error on proc id: {} model id: {}", e.pid, e.model.id);
    });
}

示例

请参阅 examples

模型使用

该模型是一个YAML格式文件,其中包含不同类型的节点,包括WorkflowBranchStep以及Act。每个工作流程可以拥有更多步骤,每个步骤可以拥有更多分支。在步骤中,由多个操作组成以完成步骤任务,例如'req'、'msg'、'each'、'chain'、'set'、'expose'等。这些操作负责与客户端交互或简单地执行单个任务。

run属性是基于javascript的脚本。可以设置inputs属性以初始化每个节点中的变量。

name: model name
inputs:
  value: 0
steps:
  - name: step 1
    run: |
      print("step 1")

  - name: step 2
    branches:
      - name: branch 1
        if: ${ $("value") > 100 }
        run: |
            print("branch 1");

      - name: branch 2
        if: ${ $("value") <= 100 }
        steps:
            - name: step 3
              run: |
                print("branch 2")      

输入

Workflow中,可以设置inputs以初始化工作流程变量。

name: model name
inputs:
  a: 100
steps:
  - name: step1
    run: |
      env.set("output_key", "output value");

也可以通过启动工作流程来设置输入。

use acts::{Engine, Vars, Workflow};

#[tokio::main]
async fn main() {
  let engine = Engine::new();
  let executor = engine.executor();

  let mut vars = Vars::new();
  vars.insert("input".into(), 3.into());
  vars.insert("pid".to_string(), "w2".into());

  executor.start("m1", &vars);
}

输出

Workflow中,可以设置outputs以输出环境变量。

name: model name
outputs:
  output_key:
steps:
  - name: step1
    run: |
      env.set("output_key", "output value");

设置

workflow节点中,可以通过setup设置操作。

操作msg用于向客户端发送消息。有关更多操作,请参阅以下注释

name: model name
setup:
setup:
  # set the data by !set
  - !set
    a: ["u1", "u2"]
    v: 10

  # checks the condition and enters into the 'then' acts
  - !if
    on: $("v") > 0
    then:
      - !msg
        id: msg2
  # on step created
  - !on_created
    - !msg
      id: msg3

  # on workflow completed
  - !on_completed
    - !msg
      id: msg4
  # on act created
  - !on_before_update
    - !msg
      id: msg5
  # on act completed
  - !on_updated
    - !msg
      id: msg5

  # on step created or completed
  - !on_step
      - !msg
        id: msg3
  # on error catch
  - !on_error_catch
    - err: err1
      then:
        - !req
          id: act3
  # expose the data with special keys
  - !expose
      out:

步骤

使用steps将步骤添加到工作流程中

name: model name
steps:
  - id: step1
    name: step 1
  - id: step2
    name: step 2

step.setup

使用setup在步骤创建时设置一些操作。

操作包括'req'、'msg'、'set'、'expose'、'chain'、'each'和'if',还包括一些钩子,例如'on_created'、'on_completed'、'on_before_update'、'on_updated'、'on_timeout'和'on_error_catch'。

name: a setup example
id: setup
steps:
  - name: step 1
    id: step1
    setup:
  
      # set the data by !set
      - !set
        a: ["u1", "u2"]
        v: 10
      # send message with key msg1
      - !msg
        id: msg1
        inputs:
          data: ${ $("a") }

      # chains and runs 'run' one by one by 'in' data
      - !chain
        in: $("a")
        run:
          - !req
            id: act1

      # each the var 'a'
      - !each
        in: $("a")
        run:
          # the each will generate two !req with `act_index`  and `act_value`
          # the `act_index` is the each index. It is 0 and 1 in this example
          # the `act_value` is the each data. It is 'u1' and 'u2' in this example
          - !req
            id: act2
      # checks the condition and enters into the 'then' acts
      - !if
        on: $("v") > 0
        then:
          - !msg
            id: msg2
      # on step created
      - !on_created
        - !msg
          id: msg3

      # on step completed
      - !on_completed
        - !msg
          id: msg4
      # on act created
      - !on_before_update
        - !msg
          id: msg5
      # on act completed
      - !on_updated
        - !msg
          id: msg5

      # on step created or completed
      - !on_step
          - !msg
            id: msg3
      # on error catch
      - !on_error_catch
        - err: err1
          then:
            - !req
              id: act3
      # on timeout 
      - !on_timeout
        - on: 6h
          then:
            - !req
              id: act3
      # expose the data with special keys
      - !expose
         out:
  - name: final
    id: final

有关更多操作示例,请参阅examples

step.catches

使用catches来捕获step错误。

name: a catches example
id: catches
steps:
  - name: prepare
    id: prepare
    acts:
      - !req
        id: init
  - name: step1
    id: step1
    acts:
      - !req
        id: act1
    # catch the step errors
    catches:
      - id: catch1
        err: err1
        then:
          - !req
            id: act2
      - id: catch2
        err: err2
        then:
          - !req
            id: act3
      - id: catch_others

  - name: final
    id: final

step.timeout

使用timeout来检查任务时间。

name: a timeout example
id: timeout
steps:
  - name: prepare
    id: prepare
    acts:
      - !req
        id: init
  - name: step1
    id: step1
    acts:
      - !req
        id: act1
    # check timeout rules
    timeout:
      # 1d means one day
      # triggers act2 when timeout
      - on: 1d
        then:
          - !req
            id: act2
      # 2h means two hours
      # triggers act3 when timeout
      - on: 2h
        then:
          - !req
            id: act3

  - name: final
    id: final

分支

使用branches将分支添加到步骤中

name: model name
steps:
  - id: step1
    name: step 1
    branches:
      - id: b1
        if: $("v") > 0
        steps: 
          - name: step a
          - name: step b
      - id: b2
        else: true
        steps:
          - name: step c
          - name: step d
  - id: step2
    name: step 2

操作

使用acts创建操作以与客户端交互,或者通过几种操作类型完成特殊功能。

name: model name
outputs:
  output_key:
steps:
  - name: step1
    acts:
      # send message to client
      - !msg
        id: msg1
        inputs:
          a: 1
          
      # req is a act to send a request from acts server
      # the client can complete the act and pass data to serever
      - !req
        id: init
        name: my act init

        # passes data to the act
        inputs:
          a: 6
        
        # exposes the data to step
        outputs:
          a:

        # limits the data keys when acting
        rets:
          a:

有关更多操作示例,请参阅examples

存储

可以使用store启用存储功能,该功能使用duckdb来构建。

要启用store功能

[dependencies]
acts = { version = "*", features = ["store"] }

对于外部存储

use acts::{Engine, Builder, data::{Model, Proc, Task, Package, Message}, DbSet, StoreAdapter};
use std::sync::Arc;

#[derive(Clone)]
struct TestStore;

impl StoreAdapter for TestStore {
    fn models(&self) -> Arc<dyn DbSet<Item = Model>> {
        todo!()
    }
    fn procs(&self) -> Arc<dyn DbSet<Item =Proc>> {
        todo!()
    }
    fn tasks(&self) -> Arc<dyn DbSet<Item =Task>> {
        todo!()
    }
    fn packages(&self) -> Arc<dyn DbSet<Item =Package>> {
        todo!()
    }
    fn messages(&self) -> Arc<dyn DbSet<Item =Message>> {
        todo!()
    }
    fn init(&self) {}
    fn close(&self) {}
}

#[tokio::main]
async fn main() {
   // set custom store
 let store = TestStore;
 let engine = Builder::new().store(&store).build();
}

acts引擎集成了rquickjs运行时来执行包,这可以扩展引擎的能力。有关更多信息,请参阅示例package

Acts-Server

创建一个基于grpc的acts-server以与客户端交互。请参阅更多内容acts-server

Acts-Channel

通道用于与服务器交互。操作包括'deploy'、'start'、'push'、'remove'、'complete'、'back'、'cancel'、'skip'、'abort'和'error'。

请参阅更多内容acts-channel

依赖

~21–37MB
~600K SLoC