#pipeline #orchestration #tool #tasks #thepipelinetool #airflow #tpt

thepipelinetool_core

一个*实验性*的管道编排工具,借鉴了Apache Airflow的概念

7个版本

0.2.7 2024年4月22日
0.2.6 2024年4月21日

在 #thepipelinetool 中排名 #6

Download history 478/week @ 2024-04-18 48/week @ 2024-04-25 4/week @ 2024-05-02 1/week @ 2024-05-23 5/week @ 2024-05-30 2/week @ 2024-06-06 1/week @ 2024-07-04

405 每月下载量
被 2 个 crate 使用(通过 thepipelinetool)

MIT/Apache

56KB
1.5K SLoC

use thepipelinetool_core::{prelude::*, tpt};

// Minimal pipeline skeleton: `main` carries the `#[tpt::main]` attribute and
// its body is where the pipeline's tasks get declared.
#[tpt::main]
fn main() {
    // define your tasks here
}

文档

  • 在这里可以找到最新的文档

示例

简单的DAG

use thepipelinetool_core::{prelude::*, tpt};

/// Produces the payload for the downstream task: the owned string `"world"`.
///
/// The unit argument is ignored.
fn produce_data(_: ()) -> String {
    String::from("world")
}

/// Prints a greeting for the value handed in as `arg`.
///
/// Writes `hello {arg}` followed by a newline to standard output and returns
/// nothing; the unit return type is left implicit.
fn print_data(arg: String) {
    println!("hello {arg}");
}

// Builds a two-task DAG: `produce_data` is registered first, then
// `print_data` is added as a task that depends on it.
// NOTE(review): presumably the `String` returned by `produce_data` is what
// `print_data` receives as `arg` - confirm against the crate documentation.
#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();

    // add a task that uses the function 'produce_data'
    let task_ref = add_task(produce_data, (), opts);

    // add a task that depends on 'produce_data'
    let _ = add_task_with_ref(print_data, &task_ref, opts);
}
flowchart TD
  id0(produce_data_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(print_data_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1

手动定义依赖关系

use thepipelinetool_core::{prelude::*, tpt};

/// Returns the owned string `"world"`; the unit argument is unused.
fn produce_data(_: ()) -> String {
    let payload: &str = "world";
    payload.to_owned()
}

/// Prints `hello {arg}` on its own line to standard output.
///
/// Returns nothing; the redundant explicit `-> ()` has been dropped, which
/// leaves the function's type unchanged.
fn print_data(arg: String) {
    println!("hello {arg}");
}

// Shows the ways of wiring dependencies by hand. Every `print_data` task is
// created from `task_ref`, and extra ordering is then layered on top with the
// `>>`, `<<` and `|` operators.
// Statement order matters here: the rendered graph appears to number tasks in
// the order they are added (print_data_1 ... print_data_11), so reordering the
// `add_task_with_ref` calls would change the ids.
#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();
    let task_ref = add_task(produce_data, (), opts);

    // these tasks will execute in parallel
    let _task_ref1 = add_task_with_ref(print_data, &task_ref, opts);
    let _task_ref2 = add_task_with_ref(print_data, &task_ref, opts);

    // declare downstream dependencies using right-shift operator '>>'
    let task_ref3 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref4 = add_task_with_ref(print_data, &task_ref, opts);
    let _ = task_ref4 >> task_ref3; // run task4 before task3

    // declare upstream dependencies using left-shift operator '<<'
    let task_ref5 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref6 = add_task_with_ref(print_data, &task_ref, opts);
    let _ = &task_ref5 << task_ref6; // run task6 before task5

    // declare parallel tasks using bitwise-or operator '|'
    let task_ref7 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref8 = add_task_with_ref(print_data, &task_ref, opts);
    let parallel_task_ref = task_ref7 | task_ref8; // run task7 and task8 in parallel

    // use previous results for further dependency declaration
    let _ = parallel_task_ref >> task_ref5;

    // chaining
    let task_ref9 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref10 = add_task_with_ref(print_data, &task_ref, opts);
    let task_ref11 = add_task_with_ref(print_data, &task_ref, opts);

    let _ = task_ref9 >> task_ref10 >> task_ref11;
    // the result of taskA >> taskB is taskB, so the above is equivalent to:
    // ((task_ref9 >> task_ref10) >> task_ref11)
}
flowchart TD
  id0(produce_data_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(print_data_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1
  id2(print_data_2)
  style id2 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id2
  id3(print_data_3)
  style id3 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id3
  id4-->id3
  id4(print_data_4)
  style id4 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id4
  id5(print_data_5)
  style id5 color:black,stroke:grey,fill:white,stroke-width:4px
  id8-->id5
  id7-->id5
  id0-->id5
  id6-->id5
  id6(print_data_6)
  style id6 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id6
  id7(print_data_7)
  style id7 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id7
  id8(print_data_8)
  style id8 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id8
  id9(print_data_9)
  style id9 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id9
  id10(print_data_10)
  style id10 color:black,stroke:grey,fill:white,stroke-width:4px
  id9-->id10
  id0-->id10
  id11(print_data_11)
  style id11 color:black,stroke:grey,fill:white,stroke-width:4px
  id10-->id11
  id0-->id11

分支任务

use thepipelinetool_core::{prelude::*, tpt};

/// Always selects the left branch, carrying the value `0`.
///
/// The unit argument is ignored.
fn branch_task(_: ()) -> Branch<usize> {
    let payload: usize = 0;
    Branch::Left(payload)
}

/// Prints `left {arg}` on its own line to standard output.
///
/// Returns nothing; the redundant explicit `-> ()` has been dropped, which
/// leaves the function's type unchanged.
fn left(arg: usize) {
    println!("left {arg}");
}

/// Prints `this won't execute` on its own line; the argument is ignored.
///
/// Returns nothing; the redundant explicit `-> ()` has been dropped, which
/// leaves the function's type unchanged.
fn right(_: usize) {
    println!("this won't execute");
}

// Registers `branch_task` together with its two possible follow-ups, `left`
// and `right`, using the default task options.
#[tpt::main]
fn main() {
    // only 'left' task will be executed since branch_task returns Branch::Left
    let _ = branch(branch_task, (), left, right, &TaskOptions::default());
}
flowchart TD
  id0(branch_task_0)
  style id0 color:black,stroke:grey,fill:white,stroke-width:4px
  id1(left_1)
  style id1 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id1
  id2(right_2)
  style id2 color:black,stroke:grey,fill:white,stroke-width:4px
  id0-->id2

动态任务

use thepipelinetool_core::{prelude::*, tpt};

/// Produces the items to expand over: the bytes `0` and `1`, in that order.
///
/// The unit argument is ignored.
fn produce_lazy(_: ()) -> Vec<u8> {
    (0..=1).collect()
}

/// Greets the given item on standard output and hands it back unchanged.
///
/// Prints `hello {arg}` followed by a newline, then returns `arg`.
fn say_hello(arg: u8) -> u8 {
    println!("hello {}", arg);
    arg
}

// Demonstrates lazily expanded tasks: `say_hello` is expanded over the result
// of `produce_lazy`, and a second expansion is then chained onto the first.
#[tpt::main]
fn main() {
    let opts = &TaskOptions::default();

    let produce_lazy_task_ref = add_task(produce_lazy, (), opts);

    // creates a new task for each item in 'produce_lazy' result
    let expanded_lazy_task_ref = expand_lazy(say_hello, &produce_lazy_task_ref, opts);

    // you can also chain lazily expanded tasks
    let _ = expand_lazy(say_hello, &expanded_lazy_task_ref, opts);
}

更多示例

在这里可以找到更多示例

部署

要部署管道,必须将编译后的二进制文件放到 PIPELINES_DIR 中,使服务器(server)和工作进程(worker)都能访问。可参考模板项目中的 docker-compose.yml 示例。

许可证

AGPLv3

依赖

~4–5.5MB
~105K SLoC