11个版本
0.2.0 | 2024年7月24日 |
---|---|
0.1.2 | 2024年7月12日 |
0.1.1 | 2024年6月30日 |
0.0.7 | 2023年7月24日 |
#254 在 数据库接口
每月180次下载
35KB
653 行
pg_task
基于FSM的可恢复Postgres任务
- 基于FSM - 每个任务都是一个细粒度的状态机
- 可恢复 - 在出错后,修复步骤逻辑或外部世界后,任务可以继续未完成的步骤
- Postgres - 一个表就足以处理任务调度、状态转换和错误处理
目录
教程
完整的可运行代码在 examples/tutorial.rs。
定义任务
我们创建一个由两个步骤组成的问候任务
#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
filename: String,
}
#[async_trait]
impl Step<Greeter> for ReadName {
const RETRY_LIMIT: i32 = 5;
async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
let name = std::fs::read_to_string(&self.filename)?;
NextStep::now(SayHello { name })
}
}
第一步尝试从文件中读取一个名字
filename
- 这个步骤中我们需要的唯一状态impl Step<Greeter> for ReadName
- 我们的步骤是Greeter
任务的一部分RETRY_LIMIT
- 这个步骤是可失败的,让我们重试几次NextStep::now(SayHello { name })
- 立即将我们的任务移动到SayHello
步骤
#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
name: String,
}
#[async_trait]
impl Step<Greeter> for SayHello {
async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
println!("Hello, {}", self.name);
NextStep::none()
}
}
第二步打印问候语并完成任务,返回 NextStep::none()
。
基本上就是这些,除了你可以在 完整代码 中找到的一些模板代码。让我们运行它
cargo run --example hello
调查错误
你会看到关于6(第一次尝试 + RETRY_LIMIT
)尝试和最终错误信息的日志消息。让我们查看数据库以了解发生了什么
~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id | cddf7de1-1194-4bee-90c6-af73d9206ce2
step | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at | 2024-06-30 09:32:27.703599+06
tried | 6
is_running | f
error | No such file or directory (os error 2)
created_at | 2024-06-30 09:32:22.628563+06
updated_at | 2024-06-30 09:32:27.703599+06
- 一个非空的
error
字段表示任务已出错并包含错误信息 - “
step
”字段提供了有关特定步骤及其在错误发生时的状态的信息
修复世界
在这种情况下,错误是由于外部世界状态引起的。让我们通过创建文件来修复它
echo 'Fixed World' > name.txt
要重新运行任务,我们只需清除其error
psql pg_task -c 'update pg_task set error = null'
您将看到重新运行任务和最终步骤的问候信息的日志消息。就是这样 🎉。
调度任务
本质上,通过将对应行插入到pg_task
表中来完成任务的调度。您可以通过psql
手动完成,或者用任何语言编写代码。
还有一些辅助函数来处理第一步序列化和时间调度
运行工作进程
在定义每个任务的步骤后,我们需要通过task!
将这些步骤包装到表示整个任务的枚举中
pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });
还需要一个枚举来组合所有可能的任务
pg_task::scheduler!(Tasks { Task1, Task2 });
现在我们可以运行工作进程
pg_task::Worker::<Tasks>::new(db).run().await?;
所有通信都由数据库同步,所以您运行多少个工作进程,或者运行方式如何,都没有关系。它可以是单独的进程,也可以是进程内的tokio::spawn
。
停止工作进程
您可以通过使用数据库发送通知来优雅地停止任务运行器
SELECT pg_notify('pg_task_changed', 'stop_worker');
工作进程将等待所有任务当前步骤完成,然后退出。您可以通过检查运行中的任务的存在来等待这种情况
SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true);
延迟步骤
有时您需要延迟下一步。在返回下一步之前使用tokio::time::sleep
将创建一些问题
- 如果在睡眠过程中进程崩溃,它不会被考虑已完成,并且在重启时会重新运行
- 您必须等待gracefulshutdown上的睡眠任务完成
请使用NextStep::delay
代替 - 它通过延迟安排下一步,并立即完成当前步骤。
您可以在examples/delay.rs中找到一个可运行的示例
重试步骤
当您需要在错误时重试任务时,请使用Step::RETRY_LIMIT
和Step::RETRY_DELAY
impl Step<MyTask> for ApiRequest {
const RETRY_LIMIT: i32 = 5;
const RETRY_DELAY: Duration = Duration::from_secs(5);
async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
let result = api_request().await?;
NextStep::now(ProcessResult { result })
}
}
贡献
- 在发送PR之前,请运行.pre-commit.sh,它将检查一切
许可证
该项目受MIT许可证的许可。
依赖关系
~37–50MB
~866K SLoC