5 个版本
0.0.7 | 2023 年 12 月 24 日 |
---|---|
0.0.6 | 2023 年 10 月 25 日 |
0.0.4 | 2023 年 9 月 29 日 |
0.0.0 |
|
#497 in 开发工具
69 每月下载量
145KB
3.5K SLoC
Flowmium
Flowmium 是一个使用 Kubernetes 的工作流编排器。您可以定义和运行一个 YAML 工作流 或运行一个 Python 工作流,其中每个函数都作为一个 Kubernetes pod 运行。
Python 工作流示例
from flowmium import Flow, FlowContext
from flowmium.serializers import plain_text, json_text, pkl
flow = Flow("testing")
@flow.task(serializer=json_text)
def foo() -> str:
return "Hallo world"
@flow.task({"input_str": foo}, serializer=plain_text)
def replace_letter_a(input_str: str, flowctx: FlowContext) -> str:
return input_str.replace("a", "e") + str(flowctx.task_id)
@flow.task({"input_str": foo}, serializer=pkl)
def replace_letter_t(input_str: str) -> str:
return input_str.replace("t", "d")
@flow.task(
{"first": replace_letter_t, "second": replace_letter_a}, serializer=plain_text
)
def concat(first: str, second: str) -> str:
return f"{first} {second}"
if __name__ == "__main__":
flow.run()
入门
- 在本地设置以进行测试
- 在生产环境中部署
- 示例 Python 包工作流
- 示例 Python 脚本工作流
- 示例 YAML 定义工作流
- Python 框架文档
- API 文档
- Rust 客户端文档
- 集成到现有的 Rust 项目中
flowctl
命令行界面
flowctl
命令行界面用于监视工作流当前状态、提交新工作流和下载工件。
安装
cargo install flowmium
用法
操作 | 命令 |
---|---|
列出工作流 | flowctl list |
使用显式 URL | flowctl--url http://localhost:8080 list |
提交 YAML 工作流 | flowctl submit flow.yaml |
下载工件 | flowctl download<flow-id> <output-name> <local-dir-path> |
订阅事件 | flowctl subscribe |
描述工作流 | flowctl describe<id> |
创建密钥 | flowctl secret create<key> <value> |
更新密钥 | flowctl secret update<key> <value> |
删除密钥 | flowctl secret delete<key> |
注意
密钥存储在服务器上,可以引用在 YAML 定义或 Python 工作流中设置环境变量值。这样您就不必将密钥提交到您的仓库。但是,它们不使用 Kubernetes 密钥,当工作流任务作为 Job 部署时,它们被设置为正常的环境变量。
YAML 工作流定义模式
YAML流程定义参考。请参阅示例。
根
键 | 类型 | 描述 |
---|---|---|
name |
字符串 | 流程名称 |
任务 |
任务列表 | 任务列表,每个任务都将作为Kubernetes作业部署 |
任务
键 | 类型 | 描述 |
---|---|---|
name |
字符串 | 任务名称 |
镜像 |
字符串 | 任务的Docker镜像 |
依赖 |
字符串列表 | 此任务所依赖的其他任务的名称列表,这些任务将在该任务之前运行 |
命令 |
字符串列表 | 任务的入口命令 |
环境 |
环境变量列表 | 任务的 environment variables 列表 |
输入 |
输入列表 | 从依赖任务下载的输入列表 |
输出 |
输出列表 | 从任务上传的输出列表,以便其他任务可以使用 |
环境
键 | 类型 | 描述 |
---|---|---|
name |
字符串 | 环境变量名称 |
value 或fromSecret |
字符串 | 如果为value 则为字面字符串值,如果为fromSecret 则为密钥的名称 |
输入
键 | 类型 | 描述 |
---|---|---|
来自 |
字符串 | 依赖任务输出的名称,需要下载 |
path |
字符串 | 输入应该下载到的路径 |
输出
键 | 类型 | 描述 |
---|---|---|
name |
字符串 | 输出名称 |
path |
字符串 | 运行cmd 时将输出写入的路径 |
从源运行
从源运行python流程示例
这些说明将允许您从本地源运行示例python流程(framework/tests/example_flow.py
),无需从上游提取(包括执行器)。使用此功能来验证您的更改。说明假定您位于存储库的根目录。
-
安装sqlx CLI
cargo install sqlx-cli
-
在本地运行测试Kubernetes集群、minio和容器注册库
cd flowmium/ make up
-
监视本地集群中运行的Pod
cd flowmium/ make watch
-
运行迁移
cd flowmium/ sqlx migrate run
-
从存储库的根目录运行flowmium服务器
cd flowmium/ export FLOWMIUM_POSTGRES_URL='postgres://flowmium:flowmium@localhost/flowmium' export FLOWMIUM_STORE_URL='https://127.0.0.1:9000' export FLOWMIUM_TASK_STORE_URL='http://172.16.238.4:9000' export FLOWMIUM_BUCKET_NAME='flowmium-test' export FLOWMIUM_ACCESS_KEY='minio' export FLOWMIUM_SECRET_KEY='password' export FLOWMIUM_INIT_CONTAINER_IMAGE='docker.io/shnoo28/flowmium:latest' export FLOWMIUM_NAMESPACE=default export KUBECONFIG=./kubeconfig.yaml cargo run --bin flowmium -- server --port 8080
-
使用
flowctl
监视流程状态cd flowmium/ cargo build watch ./target/debug/flowctl list
-
构建并推送示例python流程(注意:如果您正在运行第二次测试,则可能需要使用不同的镜像名称,或者修剪您的机器上的Docker镜像)
cd framework/ docker build . -t py-flow-test docker tag py-flow-test localhost:5180/py-flow-test:latest docker push localhost:5180/py-flow-test:latest
-
将流程提交给执行器服务器
python3 -m tests --image registry:5000/py-flow-test:latest --cmd 'python3 -m tests' --flowmium-server http://localhost:8080
运行端到端测试
-
对于使用上游init容器的端到端测试
make test
-
对于使用源init容器的端到端测试
FLOWMIUM_INIT_CONTAINER_IMAGE_FROM_SOURCE=true make test
运行python框架的单元测试
从framework/
路径运行make test
依赖项
~89MB
~1.5M SLoC