#task-execution #task #dag #async-parallel #parallel #async

bin+lib dagrs

名为dagrs的DAG引擎,旨在执行具有图状依赖关系的多个任务。它提供高性能和异步执行,为Rust开发者提供一个方便的编程接口。

2个不稳定版本

0.2.0 2023年7月26日
0.1.0 2023年6月23日

#359 in 并发


deml中使用

MIT/Apache

86KB
1K SLoC

Dagrs

dagrs适用于执行具有图状依赖关系的多个任务。dagrs具有高性能和异步执行的特点。它为用户提供了一个方便的编程接口。

dagrs能做什么

dagrs允许用户轻松执行具有复杂图依赖关系的多个任务集。它只需要:用户定义任务并指定任务的依赖关系,然后dagrs就可以在图的拓扑顺序中顺序执行任务。例如

flowchart LR
	A((Task a))-->B
	A-->C
	B((Task b))-->D
	C((Task c))-->D
	B-->F
	C-->E
	D((Task d))-->G
	E((Task e))-->G
	F((Task f))-->G((Task g))

此图表示任务之间的依赖关系,由任务组成的图必须满足以下两个条件

  • 图允许只有一个入度为零和出度为零的点(只允许一个起始任务和一个结束任务)。

  • 图本身是定向的,用户必须确保图中没有环路,即任务的依赖关系不能形成一个闭环,否则引擎将拒绝执行所有任务,例如

    flowchart LR
    	A-->C
    	A((Task 1))-->B
    	subgraph "Task B, C, and D form a loop"
    	B((Task 2))-->C
    	C((Task 3))-->D
    	D((Task 4))-->B
    	end
    

其中,每个任务都可能产生输出,也可能需要某些任务的输出作为其输入。

尝试使用dagrs

dagrs提供了两种基本的任务定义方法,即编程实现任务逻辑和定义yaml配置文件。通过编程实现任务定义将使任务逻辑更加灵活,这也是使用dagrs的主要方法。接下来,我们将详细介绍这两种方法的用法。

确保Rust编译环境可用。

编程实现任务定义

用户需要编程实现Action特质来定义任务的特定逻辑,然后构建一系列DefaultTask。例如:examples/compute_dag.rsDefaultTask是Task特质的默认实现,它有几个强制属性

  • id:由全局ID分配器分配的任务的唯一标识符
  • name:任务的名称
  • predecessor_tasks:此任务的先导任务
  • action:是一种动态类型,用于用户编程中实现Action特质,它表示任务要执行的具体逻辑
use std::sync::Arc;
use dagrs::{
    Action,
    Dag, DefaultTask, EnvVar,log, Input, Output, RunningError,LogLevel
};

struct SimpleAction(usize);
/// Implement the `Action` trait for `SimpleAction`, defining the logic of the `run` function. 
/// The logic here is simply to get the output value (`usize`) of all predecessor tasks and then accumulate.
impl Action for SimpleAction{
    fn run(&self, input: Input,env:Arc<EnvVar>) -> Result<Output,RunningError> {
        let base = env.get::<usize>("base").unwrap();
        let mut sum = self.0;
        input
            .get_iter()
            .for_each(|i| sum += i.get::<usize>().unwrap() * base);
        Ok(Output::new(sum))
    }
}

// Initialize the global logger
log::init_logger(LogLevel::Info,None);
// Generate four tasks.
let a= DefaultTask::new(SimpleAction(10),"Task a");
let mut b=DefaultTask::new(SimpleAction(20),"Task b");
let mut c=DefaultTask::new(SimpleAction(30),"Task c");
let mut d=DefaultTask::new(SimpleAction(40),"Task d");
// Set the precursor for each task.
b.set_predecessors(&[&a]);
c.set_predecessors(&[&a]);
d.set_predecessors(&[&b,&c]);
// Take these four tasks as a Dag.
let mut dag=Dag::with_tasks(vec![a,b,c,d]);
// Set a global environment variable for this dag.
let mut env = EnvVar::new();
env.set("base", 2usize);
dag.set_env(env);
// Begin execution.
assert!(dag.start().unwrap());
// Get execution result
assert_eq!(dag.get_result::<usize>().unwrap(),220);

说明

首先,我们定义SimpleAction并为该结构实现Action特质。在重写的run函数中,我们简单地获取先导任务输出值并乘以环境变量base。然后将乘积结果累加到self.0。

定义特定任务逻辑后,开始创建执行Dag的先决条件:首先初始化全局日志记录器。在这里,我们将日志级别设置为Info,并且不指定日志输出文件,默认让日志输出到控制台。

使用SimpleAction创建一个DefaultTask并给任务一个名称。然后设置任务之间的依赖关系。

然后创建一个Dag并为它分配一个全局环境变量。

最后,我们调用Dagstart函数来执行所有任务。任务执行后,调用get_result函数以获取任务的最终执行结果。

任务形成的图如下所示

flowchart LR;
	A((Task a))-->B;	A-->C;	B((Task b))-->D;	C((Task c))-->D((Task d));

执行顺序是a->c->b->d。

$cargo run
[Start] -> Task a -> Task c -> Task b -> Task d -> [End]
Executing Task[name: Task a]
Task executed successfully. [name: Task a]
Executing Task[name: Task b]
Executing Task[name: Task c]
Task executed successfully. [name: Task b]
Task executed successfully. [name: Task c]
Executing Task[name: Task d]
Task executed successfully. [name: Task d]

Process finished with exit code 0

Yaml配置文件

以下给出一个标准的yaml配置文件格式

dagrs:
  a:
    name: "Task 1"
    after: [ b, c ]
    cmd: echo a
  b:
    name: "Task 2"
    after: [ c, f, g ]
    cmd: echo b
  c:
    name: "Task 3"
    after: [ e, g ]
    cmd: echo c
  d:
    name: "Task 4"
    after: [ c, e ]
    cmd: echo d
  e:
    name: "Task 5"
    after: [ h ]
    cmd: echo e
  f:
    name: "Task 6"
    after: [ g ]
    cmd: python3 ./tests/config/test.py
  g:
    name: "Task 7"
    after: [ h ]
    cmd: node ./tests/config/test.js
  h:
    name: "Task 8"
    cmd: echo h

这些yaml定义的任务项形成一个复杂的依赖图。在yaml配置文件中

  • 文件以dagrs开始
  • abc...类似,是任务的唯一标识符
  • name是必需属性,表示任务的名称
  • after是可选属性(只有第一个执行的任务没有此属性),表示在任务之后执行哪些任务,即指定任务的依赖关系
  • cmd是可选属性。需要指出要执行的命令,例如基本的shell命令:echo hello,执行python脚本python test.py等。用户必须确保执行脚本的解释器存在于环境变量中。CommandAction是脚本的特定执行逻辑的实现,它被放入一个特定的Task类型中。如果用户想要自定义其他类型的脚本任务或实现自己的脚本执行逻辑,他们可以通过编程实现"Action"功能,在解析配置文件时,向解析器提供一个实现Action功能的特定类型,该方法应采用键值对的形式:<id,action>。虽然这比较麻烦,但这种方法将更加灵活。

要解析yaml配置文件,需要编译此项目,要求rust版本>= 1.70

$cargo build --release
$ .\target\release\dagrs.exe --help
Usage: dagrs.exe [OPTIONS] --yaml <YAML>

Options:
      --log-path <LOG_PATH>    Log output file, the default is to print to the terminal
      --yaml <YAML>            yaml configuration file path
      --log-level <LOG_LEVEL>  Log level, the default is Info
  -h, --help                   Print help
  -V, --version                Print version

参数说明

  • 参数yaml代表yaml配置文件的路径,是必需参数。
  • 参数log-path代表日志输出文件的路径,是可选参数。如果未指定,则默认将日志打印到控制台。
  • 参数log-level代表日志输出级别,是可选参数,默认为info。

我们可以在tests/config/correct.yaml中尝试一个已定义的文件。

$./target/release/dagrs --yaml=./tests/config/correct.yaml --log-path=./dagrs.log --log-level=info
[Start] -> Task 8 -> Task 5 -> Task 7 -> Task 6 -> Task 3 -> Task 2 -> Task 1 -> Task 4 -> [End]
Executing Task[name: Task 8]
Executing Task[name: Task 5]
Executing Task[name: Task 7]
Executing Task[name: Task 6]
Executing Task[name: Task 3]
Executing Task[name: Task 2]
Executing Task[name: Task 4]
Executing Task[name: Task 1]

您可以查看一个示例: examples/yaml_dag.rs。实际上,您还可以以编程方式读取yaml配置文件生成任务,这非常简单,只需使用Dag提供的with_yaml函数来解析配置文件。

除了这两种方法之外,dagrs还支持高级任务自定义配置。

  • DefaultTaskTask特质的默认实现。用户也可以自定义任务,并为任务添加更多功能和属性,但他们仍然需要在DefaultTask中具有四个必要的属性。YamlTaskTask具体实现的另一个示例,其源代码可供参考,或参阅example/custom_task.rs
  • 除了yaml类型的配置文件之外,用户还可以提供其他类型的配置文件,但为了允许其他类型的配置文件作为任务进行解析,用户需要实现Parser特质。YamlParser的源代码可供参考,或参阅examples/custom_parser.rs

分析任务执行逻辑

Dag的执行过程大致如下

  • 用户给出一个任务列表tasks。这些任务可以来自配置文件,或由用户编程实现。

  • 根据任务依赖关系内部生成Graph,并根据rely_graph生成执行序列。

    flowchart TD
    	subgraph tasks
    	direction LR
            A-->B
            A-->C
            B-->D
            B-->F
            C-->D
            C-->E
            D-->F
            E-->F
    	end
    	subgraph seq
    	direction LR
    		a(A)-->b(B)-->c(C)-->d(D)-->e(E)-->f(F)
    	end
    	tasks==Generate execution sequence based on topological sort==>seq
    
  • 任务被安排异步开始执行。

  • 任务将等待获取前驱任务执行产生的结果execute_states

    ---
    title: data flow
    ---
    flowchart LR
    	A-->oa((out))
    	oa--input-->B
    	oa--input-->C
    	B-->ob((out))
    	ob--input-->D
    	ob--input-->F
    	C-->oc((out))
    	oc--input-->D
    	oc--input-->E
    	D-->od((out))
    	od--input-->F
    	E-->oe((out))
    	oe--input-->F
    	F-->of((out))
    
  • 如果可以获取前驱任务的结果,检查继续状态can_continue,如果为真,则继续执行定义的逻辑;如果为假,则触发handle_error,并取消后续任务的执行。

  • 所有任务执行完毕后,将继续状态设置为false,这意味着dag的任务无法再次安排执行。

dagrs的任务执行模式是并行。在图中,执行序列被垂直分割线分为四个区间。在任务的整体执行过程中,它将经过四个并行执行阶段。如图所示:首先执行任务A,任务B和C获取A的输出作为它们自己的任务的输入并开始并行执行;同样,任务D和E必须等待它们获得前驱任务的输出后才能开始并行执行;最后,任务F必须等待任务B、D和E执行完毕后才能开始执行。

gantt
	dateFormat X
    axisFormat %s
    title Execution timing
	section Step1
		Task A:0,10
		Task B:0,1
		Task C:0,1
		Task D:0,1
		Task E:0,1
		Task F:0,1
	section Step2
		Task B:10,19
		Task C:10,19
	section Step3
		Task D:19,28
		Task E:19,28
	section Step4
		Task F:28,37

示例

基本功能使用

examples/compute_dag.rs:使用自定义宏生成多个简单任务。

examples/impl_action.rs:定义一个简单的操作,以相同的逻辑构建多个任务。

examples/yaml_dag.rs:使用给定的yaml配置文件启动多个任务。

examples/use_macro.rs:使用dagrs提供的gen_task宏生成多个简单任务。

examples/engine.rs:使用Engine管理具有不同任务类型的不同dag。

高级功能

examples/custom_task.rs:实现Task特质并定义自己的任务类型。

examples/custom_parser.rs:实现 Parser 特性来自定义任务配置文件解析器。

examples/custom_log.rs:实现 Logger 特性来自定义全局日志记录器。

贡献

dagrs 项目依赖于社区贡献,旨在简化入门过程。要开发 dagrs,请克隆仓库,然后安装所有依赖项,运行测试套件并在本地尝试使用。选择一个问题,进行更改,并提交拉取请求以供社区审查。

贡献是什么

以下是关于为该项目贡献的一些指南

  1. 报告问题/错误:如果您在项目中发现任何问题或错误,请通过在问题跟踪器上创建问题来报告它们。详细描述问题,并说明复现步骤。您提供的信息越详细,我调查和修复问题的难度就越小。
  2. 建议增强:如果您有增强或改进此项目的想法,您可以通过在问题跟踪器上创建问题来提出建议。详细说明您的增强及其用例和好处。我非常欣赏经过深思熟虑的增强建议。
  3. 贡献代码:如果您想开发和贡献代码,请按照以下步骤操作
    • 选择一个要解决的问题。标记为 good first issue 的问题适合新手。您还可以寻找标记为 help wanted 的问题。
    • 为您的更改创建一个 dagrs 仓库的分支。
    • 进行更改并将它们提交,附带清晰的提交信息。通过在提交信息中添加 Signed-off-by 行签署 开发者证书原产地(DCO)。这证实了您编写或有权提交您要贡献给项目的代码。
    • 将您的更改推送到 GitHub 并打开一个拉取请求。
    • 对您的拉取请求上的任何反馈做出回应。dagrs 维护者将审查您的更改,并在合并前可能要求修改。请确保您的代码格式正确,并遵循现有代码库的风格。
    • 一旦您的拉取请求被合并,您将作为贡献者列在项目仓库和文档中。
  4. 编写教程/博客文章:您可以通过编写教程或博客文章来帮助用户开始使用此项目进行贡献。将您的文章提交到问题跟踪器以供审查和包含。高质量的文章,为用户提供价值,将受到高度赞赏。
  5. 改进文档:如果您发现文档中有任何空白或认为任何部分可以改进,您可以修改文档文件夹中的文件并提交 PR。确保文档与最新更改保持最新。

您的贡献非常受欢迎。如果您在贡献过程中有任何疑问或遇到问题,请随时提问。您贡献得越多,您将学到的知识和技能就越多。

DCO & PGP

为了符合要求,贡献者必须在他们的提交信息中包括 Signed-off-by 行和 PGP 签名。有关如何生成 PGP 密钥的更多信息,请参阅此处

Git 也有一个命令行选项 -s 可以自动将其附加到您的提交信息,-S 可以用您的 PGP 密钥签名您的提交。例如

$ git commit -S -s -m 'This is my commit message'

重新合并分支

如果您有本地的 Git 环境,并且满足以下条件,一个选项是重新合并分支并在新提交中添加您的“已签署-提交”行。请注意,如果其他人已经开始基于该分支的工作,此解决方案将重写历史记录,可能会导致合作者遇到严重问题(请参阅 Git 文档中的“重合并的弊端”)。

只有在以下情况下才进行此操作:

  • 您是该分支中所有提交的唯一作者
  • 您绝对确定没有人正在基于该分支进行任何工作
  • 分支中没有空提交(例如,使用 -allow-empty 添加的 DCO 补救提交)

将您的“已签署-提交”行添加到该分支中的每个提交

  • 确保您有本地分支的副本,通过在本地通过命令行检出拉取请求来检查。
  • 在您的本地分支中,运行:git rebase HEAD~1 --signoff
  • 强制推送您的更改以覆盖分支:git push --force-with-lease origin main

许可证

Freighter 在此许可证下授权

联系我们

秋志磊 邮箱:[email protected]/[email protected]

依赖项

~4–13MB
~142K SLoC