#kubernetes #workflow #container #task #flow #python #yaml

bin+lib flowmium

Flowmium 是一个使用 Kubernetes 的工作流编排器

5 个版本

0.0.7 2023 年 12 月 24 日
0.0.6 2023 年 10 月 25 日
0.0.4 2023 年 9 月 29 日
0.0.0 2023 年 1 月 16 日

#497 in 开发工具

Download history

69 每月下载量

MIT 许可证

145KB
3.5K SLoC


Flowmium

Flowmium 是一个使用 Kubernetes 的工作流编排器。您可以定义和运行一个 YAML 工作流 或运行一个 Python 工作流,其中每个函数都作为一个 Kubernetes pod 运行。

Python 工作流示例

from flowmium import Flow, FlowContext
from flowmium.serializers import plain_text, json_text, pkl


flow = Flow("testing")


@flow.task(serializer=json_text)
def foo() -> str:
    return "Hallo world"


@flow.task({"input_str": foo}, serializer=plain_text)
def replace_letter_a(input_str: str, flowctx: FlowContext) -> str:
    return input_str.replace("a", "e") + str(flowctx.task_id)


@flow.task({"input_str": foo}, serializer=pkl)
def replace_letter_t(input_str: str) -> str:
    return input_str.replace("t", "d")


@flow.task(
    {"first": replace_letter_t, "second": replace_letter_a}, serializer=plain_text
)
def concat(first: str, second: str) -> str:
    return f"{first} {second}"


if __name__ == "__main__":
    flow.run()

入门

flowctl 命令行界面

flowctl 命令行界面用于监视工作流当前状态、提交新工作流和下载工件。

安装

cargo install flowmium

用法

操作 命令
列出工作流 flowctl list
使用显式 URL flowctl--url http://localhost:8080 list
提交 YAML 工作流 flowctl submit flow.yaml
下载工件 flowctl download<flow-id> <output-name> <local-dir-path>
订阅事件 flowctl subscribe
描述工作流 flowctl describe<id>
创建密钥 flowctl secret create<key> <value>
更新密钥 flowctl secret update<key> <value>
删除密钥 flowctl secret delete<key>

注意

密钥存储在服务器上,可以引用在 YAML 定义或 Python 工作流中设置环境变量值。这样您就不必将密钥提交到您的仓库。但是,它们不使用 Kubernetes 密钥,当工作流任务作为 Job 部署时,它们被设置为正常的环境变量。

YAML 工作流定义模式

YAML流程定义参考。请参阅示例

类型 描述
name 字符串 流程名称
任务 任务列表 任务列表,每个任务都将作为Kubernetes作业部署

任务

类型 描述
name 字符串 任务名称
镜像 字符串 任务的Docker镜像
依赖 字符串列表 此任务所依赖的其他任务的名称列表,这些任务将在该任务之前运行
命令 字符串列表 任务的入口命令
环境 环境变量列表 任务的 environment variables 列表
输入 输入列表 从依赖任务下载的输入列表
输出 输出列表 从任务上传的输出列表,以便其他任务可以使用

环境

类型 描述
name 字符串 环境变量名称
valuefromSecret 字符串 如果为value则为字面字符串值,如果为fromSecret则为密钥的名称

输入

类型 描述
来自 字符串 依赖任务输出的名称,需要下载
path 字符串 输入应该下载到的路径

输出

类型 描述
name 字符串 输出名称
path 字符串 运行cmd时将输出写入的路径

从源运行

从源运行python流程示例

这些说明将允许您从本地源运行示例python流程(framework/tests/example_flow.py),无需从上游提取(包括执行器)。使用此功能来验证您的更改。说明假定您位于存储库的根目录。

  • 安装sqlx CLI

    cargo install sqlx-cli
    
  • 在本地运行测试Kubernetes集群、minio和容器注册库

    cd flowmium/
    make up
    
  • 监视本地集群中运行的Pod

    cd flowmium/
    make watch
    
  • 运行迁移

    cd flowmium/
    sqlx migrate run
    
  • 从存储库的根目录运行flowmium服务器

    cd flowmium/
    export FLOWMIUM_POSTGRES_URL='postgres://flowmium:flowmium@localhost/flowmium'
    export FLOWMIUM_STORE_URL='https://127.0.0.1:9000'
    export FLOWMIUM_TASK_STORE_URL='http://172.16.238.4:9000'
    export FLOWMIUM_BUCKET_NAME='flowmium-test'
    export FLOWMIUM_ACCESS_KEY='minio'
    export FLOWMIUM_SECRET_KEY='password'
    export FLOWMIUM_INIT_CONTAINER_IMAGE='docker.io/shnoo28/flowmium:latest'
    export FLOWMIUM_NAMESPACE=default
    export KUBECONFIG=./kubeconfig.yaml
    cargo run --bin flowmium -- server --port 8080
    
  • 使用flowctl监视流程状态

    cd flowmium/
    cargo build
    watch ./target/debug/flowctl list
    
  • 构建并推送示例python流程(注意:如果您正在运行第二次测试,则可能需要使用不同的镜像名称,或者修剪您的机器上的Docker镜像)

    cd framework/
    docker build . -t py-flow-test
    docker tag py-flow-test localhost:5180/py-flow-test:latest
    docker push localhost:5180/py-flow-test:latest
    
  • 将流程提交给执行器服务器

    python3 -m tests --image registry:5000/py-flow-test:latest --cmd 'python3 -m tests' --flowmium-server http://localhost:8080
    

运行端到端测试

  • 对于使用上游init容器的端到端测试

    make test
    
  • 对于使用源init容器的端到端测试

    FLOWMIUM_INIT_CONTAINER_IMAGE_FROM_SOURCE=true make test
    

运行python框架的单元测试

framework/路径运行make test

依赖项

~89MB
~1.5M SLoC