8 个重大版本

0.9.0 2023年7月12日
0.7.0 2023年3月27日

#397 in 数据库接口

Download history 56/week @ 2024-03-13 152/week @ 2024-03-20 110/week @ 2024-03-27 114/week @ 2024-04-03 79/week @ 2024-04-10 60/week @ 2024-04-17 70/week @ 2024-04-24 67/week @ 2024-05-01 46/week @ 2024-05-08 109/week @ 2024-05-15 82/week @ 2024-05-22 147/week @ 2024-05-29 332/week @ 2024-06-05 143/week @ 2024-06-12 71/week @ 2024-06-19 265/week @ 2024-06-26

844 每月下载量

MIT 许可证

150KB
1.5K SLoC

Backie

使用 Tokio 的异步持久背景任务处理,将异步任务队列到工作器进行处理。设计简单易用,横向可扩展。使用 Postgres 作为存储后端,也可以扩展以支持其他类型的存储。

Backie 的工作原理概述

  • 客户端将任务放入队列
  • 服务器为每个队列启动多个工作器
  • 工作器从队列中拉取任务并开始处理
  • 多个工作器并发处理任务

Backie 最初是 fang crate 的分支,但很快在实现上有了显著的不同。

主要功能

以下是 Backie 的一些主要功能

  • 保证执行:至少执行一次任务
  • 异步工作器:工作器作为 Tokio 任务启动
  • 应用程序上下文:任务可以访问共享的用户提供的应用程序上下文
  • 单用途工作器:任务存储在一起,但工作器配置为只执行特定队列的任务
  • 重试:任务以自定义退避模式重试
  • 优雅关闭:提供关闭工作器的未来,即时任务不会中断
  • 恢复未完成任务:未完成的任务将在下一个工作器启动时重试
  • 唯一任务:如果提供唯一哈希,则任务在队列中不会重复
  • 执行超时:如果任务未及时完成,则任务会重试

其他计划的功能

  • 任务调度:任务可以计划在特定时间执行

任务执行协议

以下图表显示了执行任务使用的协议

stateDiagram-v2
    [*] --> Ready
    Ready --> Running: Task is picked up by a worker
    Running --> Done: Task is finished
    Running --> Failed: Task failed
    Failed --> Ready: Task is retried
    Failed --> [*]: Max retries reached
    Done --> [*]

当任务从 Running 状态变为 Failed 时,会进行重试。重试次数由 BackgroundTask::MAX_RETRIES 属性控制。默认实现使用 3 次重试。

安全性

该包使用 #![forbid(unsafe_code)] 确保所有内容都在 100% 安全的 Rust 中实现。

最低支持的 Rust 版本

Backie 的最低支持 Rust 版本是 1.68。

安装

  1. 将以下内容添加到您的 Cargo.toml
[dependencies]
backie = "0.1"

如果您还没有使用,您还希望包括以下依赖项以定义您的任务

[dependencies]
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] }
diesel-async = { version = "0.2", features = ["postgres", "bb8"] }

这些依赖项是使用 #[async_trait]#[derive(Serialize, Deserialize)] 属性在任务定义中以及在连接到 Postgres 数据库时所需的。

  1. 在 Postgres 数据库中创建 backie_tasks 表。迁移文件可以在 迁移目录 中找到。

使用方法

使用 BackgroundTask 特征来定义任务。您必须为所有要执行的任务实现此特征。

需要注意的是,使用属性 BackgroundTask::TASK_NAME,该属性在整个应用程序中必须唯一。此属性对于从数据库重新构造任务至关重要。

使用 BackgroundTask::AppData 可以将应用程序特定的上下文信息作为参数传递给任务。例如,将数据库连接池传递给任务或传递其他应用程序配置很有用。

BackgroundTask::Error 是由 BackgroundTask::run 方法返回的错误类型。您可以使用此来为任务定义自己的错误类型。

BackgroundTask::run 方法是您定义后台任务执行行为的地方。此方法将由任务队列工作者调用。

use async_trait::async_trait;
use backie::{BackgroundTask, CurrentTask};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub struct MyTask {
    info: String,
}

#[async_trait]
impl BackgroundTask for MyTask {
    const TASK_NAME: &'static str = "my_task_unique_name";
    type AppData = ();
    type Error = ();

    async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
        // Do something
        Ok(())
    }
}

启动工作者

首先,我们需要创建一个 TaskStore 特征实例。这是负责从数据库中存储和检索任务的对象。Backie 目前仅支持通过提供的 PgTaskStore 使用 Postgres 作为存储后端。您可以通过实现 TaskStore 特征来实现其他存储后端。

然后,我们可以使用 task_store 通过 WorkerPool 来启动工作池。负责启动工作进程并管理其生命周期的 WorkerPool

启动工作池的完整示例可以在 示例目录 中找到。

任务排队

我们可以使用 PgTask 特征在任何点安排任务。这将排队任务,每当有可用的工人时,它将开始处理。在排队任务之前不需要启动工人。只要工人能够访问相同的底层存储系统,工人不需要与队列在同一个进程中。这使工人的水平扩展成为可能。

许可

本项目采用 [MIT 许可证][license] 许可。

贡献

  1. 将它分叉!
  2. 创建您的功能分支(git checkout -b my-new-feature
  3. 提交您的更改(git commit -am 'Add some feature'
  4. 将更改推送到分支(git push origin my-new-feature
  5. 创建新的 Pull Request

致谢

我要感谢 Fangbackground_job crate 的作者,这些作者是这个项目的灵感来源。

依赖关系

~7–18MB
~229K SLoC