14 个版本
0.5.0 | 2023 年 10 月 15 日 |
---|---|
0.4.1 | 2022 年 6 月 30 日 |
0.3.5 | 2022 年 5 月 20 日 |
0.3.4 | 2021 年 12 月 1 日 |
0.1.1 | 2021 年 3 月 30 日 |
#176 in 数据库接口
1,128 每月下载量
用于 requeuest
76KB
1.5K SLoC
sqlxmq
基于 sqlx
和 PostgreSQL
构建的作业队列。
此库允许 CRUD 应用程序在不复杂化其部署的情况下运行后台作业。唯一的运行时依赖项是 PostgreSQL
,因此这对于已经使用 PostgreSQL
数据库的应用程序来说非常理想。
虽然使用 SQL 数据库作为作业队列意味着在交付作业的延迟上做出妥协,但普通作业队列中存在的一些关键问题可以通过这种方式完全避免。
在大多数其他作业队列中,正在执行的作业状态没有被正常数据库备份覆盖。即使作业被备份,也没有办法在不手动解决冲突的情况下将数据库和作业队列恢复到一致的时间点。
通过在数据库中存储作业,现有的备份程序将存储正在执行的作业和持久数据的完美一致状态。此外,作业可以作为其他事务的一部分生成和完成,这使得编写正确的应用程序代码变得容易。
利用 PostgreSQL
的强大功能,此作业队列提供了一些在其他作业队列中不存在的功能。
特性
-
同时发送/接收多个作业。
这减少了数据库查询的数量。
-
将作业发送到未来日期和时间执行。
避免了对单独调度系统的需求。
-
作业的可靠交付。
-
指数退避的自动重试。
重试次数和初始退避参数可配置。
-
事务性发送作业。
避免在事务回滚时发送虚假作业。
-
事务性完成作业。
如果作业的所有副作用都是数据库的更新,则这提供了作业的真正一次执行。
-
作业的事务性检查点。
长时间运行的任务可以将状态保存到检查点,以避免在出现故障时需要从头开始重新启动:下一次重试可以从最后一个检查点继续。
-
选择严格有序的任务投递。
如果为任务启用了此选项,同一通道内的任务将按顺序严格处理。
-
公平的任务投递。
有很多待运行任务通道不会使较少任务通道饿死。
-
选择两阶段提交。
这对于有序通道特别有用,其中可以在任务顺序中“预留”位置,但直到稍后才能提交。
-
JSON和/或二进制有效载荷。
任务可以使用最方便的。
-
自动保持任务活动状态。
长时间运行的任务将自动“保持活动状态”,以防止在它们仍在进行时被重试。
-
并发限制。
指定每个运行者应处理的并发任务的最小和最大数量。
-
通过属性宏实现的内置任务注册。
任务可以轻松注册到运行者,并在每个任务的基础上指定默认配置。
-
隐式通道。
在任务发送和处理时,通道会隐式创建和销毁,因此不需要设置。
-
通道组。
由于通道名称和通道参数的分离,可以轻松同时订阅多个通道。
-
基于NOTIFY的轮询。
当正在处理很少的任务时,这可以节省资源。
入门
数据库模式
这个包期望某些数据库表和存储过程存在。您可以将此包中的迁移文件复制到您自己的迁移文件夹中。
此包创建的所有数据库项都以前缀mq
开头,以免与您的模式冲突。
定义任务
第一步是定义一个要在任务队列上运行的函数。
use std::error::Error;
use sqlxmq::{job, CurrentJob};
// Arguments to the `#[job]` attribute allow setting default job options.
#[job(channel_name = "foo")]
async fn example_job(
// The first argument should always be the current job.
mut current_job: CurrentJob,
// Additional arguments are optional, but can be used to access context
// provided via [`JobRegistry::set_context`].
message: &'static str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Decode a JSON payload
let who: Option<String> = current_job.json()?;
// Do some work
println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
// Mark the job as complete
current_job.complete().await?;
Ok(())
}
监听任务
接下来,我们需要创建一个任务运行者:这是监听新任务并执行它们的。
use std::error::Error;
use sqlxmq::JobRegistry;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// You'll need to provide a Postgres connection pool.
let pool = connect_to_db().await?;
// Construct a job registry from our single job.
let mut registry = JobRegistry::new(&[example_job]);
// Here is where you can configure the registry
// registry.set_error_handler(...)
// And add context
registry.set_context("Hello");
let runner = registry
// Create a job runner using the connection pool.
.runner(&pool)
// Here is where you can configure the job runner
// Aim to keep 10-20 jobs running at a time.
.set_concurrency(10, 20)
// Start the job runner in the background.
.run()
.await?;
// The job runner will continue listening and running
// jobs until `runner` is dropped.
Ok(())
}
生成任务
最后一步是实际运行一个任务。
example_job.builder()
// This is where we can override job configuration
.set_channel_name("bar")
.set_json("John")?
.spawn(&pool)
.await?;
关于README的说明
大多数README内容是由[cargo-readme-sync][]自动从包文档复制的。这样,README总是与文档同步,并且示例经过了测试。
因此,如果您想更改README的一部分,在<!-- cargo-sync-readme start -->
和<!-- cargo-sync-readme end -->
标记之间,请直接更改README.md
,而不是更改src/lib.rs
上的文档,然后与README同步
cargo sync-readme
(确保已安装cargo命令)
cargo install cargo-sync-readme
依赖关系
~50MB
~860K SLoC