11个版本

0.2.0 2024年7月24日
0.1.2 2024年7月12日
0.1.1 2024年6月30日
0.0.7 2023年7月24日

#254数据库接口

Download history 23/week @ 2024-06-17 251/week @ 2024-06-24 99/week @ 2024-07-01 125/week @ 2024-07-08 9/week @ 2024-07-15 140/week @ 2024-07-22 26/week @ 2024-07-29 14/week @ 2024-08-12

每月180次下载

MIT许可证

35KB
653

pg_task

License Crates.io Docs.rs

基于FSM的可恢复Postgres任务

  • 基于FSM - 每个任务都是一个细粒度的状态机
  • 可恢复 - 在出错后,修复步骤逻辑或外部世界后,任务可以继续未完成的步骤
  • Postgres - 一个表就足以处理任务调度、状态转换和错误处理

目录

教程

完整的可运行代码在 examples/tutorial.rs

定义任务

我们创建一个由两个步骤组成的问候任务

#[derive(Debug, Deserialize, Serialize)]
pub struct ReadName {
    filename: String,
}

#[async_trait]
impl Step<Greeter> for ReadName {
    const RETRY_LIMIT: i32 = 5;

    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        let name = std::fs::read_to_string(&self.filename)?;
        NextStep::now(SayHello { name })
    }
}

第一步尝试从文件中读取一个名字

  • filename - 这个步骤中我们需要的唯一状态
  • impl Step<Greeter> for ReadName - 我们的步骤是 Greeter 任务的一部分
  • RETRY_LIMIT - 这个步骤是可失败的,让我们重试几次
  • NextStep::now(SayHello { name }) - 立即将我们的任务移动到 SayHello 步骤
#[derive(Debug, Deserialize, Serialize)]
pub struct SayHello {
    name: String,
}
#[async_trait]
impl Step<Greeter> for SayHello {
    async fn step(self, _db: &PgPool) -> StepResult<Greeter> {
        println!("Hello, {}", self.name);
        NextStep::none()
    }
}

第二步打印问候语并完成任务,返回 NextStep::none()

基本上就是这些,除了你可以在 完整代码 中找到的一些模板代码。让我们运行它

cargo run --example hello

调查错误

你会看到关于6(第一次尝试 + RETRY_LIMIT)尝试和最终错误信息的日志消息。让我们查看数据库以了解发生了什么

~$ psql pg_task -c 'table pg_task'
-[ RECORD 1 ]------------------------------------------------
id         | cddf7de1-1194-4bee-90c6-af73d9206ce2
step       | {"Greeter":{"ReadName":{"filename":"name.txt"}}}
wakeup_at  | 2024-06-30 09:32:27.703599+06
tried      | 6
is_running | f
error      | No such file or directory (os error 2)
created_at | 2024-06-30 09:32:22.628563+06
updated_at | 2024-06-30 09:32:27.703599+06
  • 一个非空的 error 字段表示任务已出错并包含错误信息
  • step”字段提供了有关特定步骤及其在错误发生时的状态的信息

修复世界

在这种情况下,错误是由于外部世界状态引起的。让我们通过创建文件来修复它

echo 'Fixed World' > name.txt

要重新运行任务,我们只需清除其error

psql pg_task -c 'update pg_task set error = null'

您将看到重新运行任务和最终步骤的问候信息的日志消息。就是这样 🎉。

调度任务

本质上,通过将对应行插入到pg_task表中来完成任务的调度。您可以通过psql手动完成,或者用任何语言编写代码。

还有一些辅助函数来处理第一步序列化和时间调度

运行工作进程

定义每个任务的步骤后,我们需要通过task!将这些步骤包装到表示整个任务的枚举中

pg_task::task!(Task1 { StepA, StepB });
pg_task::task!(Task2 { StepC });

还需要一个枚举来组合所有可能的任务

pg_task::scheduler!(Tasks { Task1, Task2 });

现在我们可以运行工作进程

pg_task::Worker::<Tasks>::new(db).run().await?;

所有通信都由数据库同步,所以您运行多少个工作进程,或者运行方式如何,都没有关系。它可以是单独的进程,也可以是进程内的tokio::spawn

停止工作进程

您可以通过使用数据库发送通知来优雅地停止任务运行器

SELECT pg_notify('pg_task_changed', 'stop_worker');

工作进程将等待所有任务当前步骤完成,然后退出。您可以通过检查运行中的任务的存在来等待这种情况

SELECT EXISTS(SELECT 1 FROM pg_task WHERE is_running = true);

延迟步骤

有时您需要延迟下一步。在返回下一步之前使用tokio::time::sleep将创建一些问题

  • 如果在睡眠过程中进程崩溃,它不会被考虑已完成,并且在重启时会重新运行
  • 您必须等待gracefulshutdown上的睡眠任务完成

请使用NextStep::delay代替 - 它通过延迟安排下一步,并立即完成当前步骤。

您可以在examples/delay.rs中找到一个可运行的示例

重试步骤

当您需要在错误时重试任务时,请使用Step::RETRY_LIMITStep::RETRY_DELAY

impl Step<MyTask> for ApiRequest {
    const RETRY_LIMIT: i32 = 5;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    async fn step(self, _db: &PgPool) -> StepResult<MyTask> {
        let result = api_request().await?;
        NextStep::now(ProcessResult { result })
    }
}

贡献

许可证

该项目受MIT许可证的许可。

依赖关系

~37–50MB
~866K SLoC