8 个重大版本
0.9.0 | 2023年7月12日 |
---|---|
0.7.0 | 2023年3月27日 |
#397 in 数据库接口
844 每月下载量
150KB
1.5K SLoC
使用 Tokio 的异步持久背景任务处理,将异步任务队列到工作器进行处理。设计简单易用,横向可扩展。使用 Postgres 作为存储后端,也可以扩展以支持其他类型的存储。
Backie 的工作原理概述
- 客户端将任务放入队列
- 服务器为每个队列启动多个工作器
- 工作器从队列中拉取任务并开始处理
- 多个工作器并发处理任务
Backie 最初是 fang crate 的分支,但很快在实现上有了显著的不同。
主要功能
以下是 Backie 的一些主要功能
- 保证执行:至少执行一次任务
- 异步工作器:工作器作为 Tokio 任务启动
- 应用程序上下文:任务可以访问共享的用户提供的应用程序上下文
- 单用途工作器:任务存储在一起,但工作器配置为只执行特定队列的任务
- 重试:任务以自定义退避模式重试
- 优雅关闭:提供关闭工作器的未来,即时任务不会中断
- 恢复未完成任务:未完成的任务将在下一个工作器启动时重试
- 唯一任务:如果提供唯一哈希,则任务在队列中不会重复
- 执行超时:如果任务未及时完成,则任务会重试
其他计划的功能
- 任务调度:任务可以计划在特定时间执行
任务执行协议
以下图表显示了执行任务使用的协议
stateDiagram-v2
[*] --> Ready
Ready --> Running: Task is picked up by a worker
Running --> Done: Task is finished
Running --> Failed: Task failed
Failed --> Ready: Task is retried
Failed --> [*]: Max retries reached
Done --> [*]
当任务从 Running
状态变为 Failed
时,会进行重试。重试次数由 BackgroundTask::MAX_RETRIES
属性控制。默认实现使用 3
次重试。
安全性
该包使用 #![forbid(unsafe_code)]
确保所有内容都在 100% 安全的 Rust 中实现。
最低支持的 Rust 版本
Backie 的最低支持 Rust 版本是 1.68。
安装
- 将以下内容添加到您的
Cargo.toml
[dependencies]
backie = "0.1"
如果您还没有使用,您还希望包括以下依赖项以定义您的任务
[dependencies]
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] }
diesel-async = { version = "0.2", features = ["postgres", "bb8"] }
这些依赖项是使用 #[async_trait]
和 #[derive(Serialize, Deserialize)]
属性在任务定义中以及在连接到 Postgres 数据库时所需的。
- 在 Postgres 数据库中创建
backie_tasks
表。迁移文件可以在 迁移目录 中找到。
使用方法
使用 BackgroundTask
特征来定义任务。您必须为所有要执行的任务实现此特征。
需要注意的是,使用属性 BackgroundTask::TASK_NAME
,该属性在整个应用程序中必须唯一。此属性对于从数据库重新构造任务至关重要。
使用 BackgroundTask::AppData
可以将应用程序特定的上下文信息作为参数传递给任务。例如,将数据库连接池传递给任务或传递其他应用程序配置很有用。
BackgroundTask::Error
是由 BackgroundTask::run
方法返回的错误类型。您可以使用此来为任务定义自己的错误类型。
BackgroundTask::run
方法是您定义后台任务执行行为的地方。此方法将由任务队列工作者调用。
use async_trait::async_trait;
use backie::{BackgroundTask, CurrentTask};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct MyTask {
info: String,
}
#[async_trait]
impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task_unique_name";
type AppData = ();
type Error = ();
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
// Do something
Ok(())
}
}
启动工作者
首先,我们需要创建一个 TaskStore
特征实例。这是负责从数据库中存储和检索任务的对象。Backie 目前仅支持通过提供的 PgTaskStore
使用 Postgres 作为存储后端。您可以通过实现 TaskStore
特征来实现其他存储后端。
然后,我们可以使用 task_store
通过 WorkerPool
来启动工作池。负责启动工作进程并管理其生命周期的 WorkerPool
。
启动工作池的完整示例可以在 示例目录 中找到。
任务排队
我们可以使用 PgTask
特征在任何点安排任务。这将排队任务,每当有可用的工人时,它将开始处理。在排队任务之前不需要启动工人。只要工人能够访问相同的底层存储系统,工人不需要与队列在同一个进程中。这使工人的水平扩展成为可能。
许可
本项目采用 [MIT 许可证][license] 许可。
贡献
- 将它分叉!
- 创建您的功能分支(
git checkout -b my-new-feature
) - 提交您的更改(
git commit -am 'Add some feature'
) - 将更改推送到分支(
git push origin my-new-feature
) - 创建新的 Pull Request
致谢
我要感谢 Fang 和 background_job crate 的作者,这些作者是这个项目的灵感来源。
依赖关系
~7–18MB
~229K SLoC