14个版本
0.5.0 | 2023年10月15日 |
---|---|
0.4.1 | 2022年6月30日 |
0.3.5 | 2022年5月20日 |
0.3.4 | 2021年12月1日 |
0.1.1 | 2021年3月30日 |
在 过程宏 中排名第1046
每月下载量1,453次
在 2 个crate中使用 (通过 sqlxmq)
18KB
197 行
sqlxmq
基于 sqlx
和 PostgreSQL
的作业队列。
这个库允许CRUD应用程序在不复杂化其部署的情况下运行后台作业。唯一的运行时依赖项是 PostgreSQL
,因此这对于已经使用 PostgreSQL
数据库的应用程序来说非常理想。
虽然使用SQL数据库作为作业队列意味着在交付作业的延迟上做出妥协,但常规作业队列中存在的一些问题在这里被完全避免。
在大多数其他作业队列中,正在进行的作业状态通常不在正常数据库备份范围内。即使作业 确实 被备份,也没有办法在手动解决冲突的情况下,将数据库和作业队列恢复到一致的时间点。
通过在数据库中存储作业,现有的备份程序将存储正在进行的作业和持久数据的完美一致状态。此外,作业可以作为其他事务的一部分生成和完成,这使得编写正确的应用程序代码变得容易。
利用 PostgreSQL
的强大功能,这个作业队列提供了其他作业队列中没有的几个功能。
功能
-
一次性发送/接收多个作业。
这减少了数据库查询的数量。
-
发送在未来日期和时间执行的作业。
避免了需要单独的调度系统。
-
可靠的作业交付。
-
具有指数退避的自动重试。
重试次数和初始退避参数可配置。
-
事务性发送作业。
避免在事务回滚时发送虚假作业。
-
事务性完成作业。
如果作业的所有副作用都是对数据库的更新,这可以提供真正的单次执行作业。
-
作业的事务性检查点。
长时间运行的作业可以将状态检查点保存下来,以避免在出现故障时需要从头开始:下一次重试可以从最后一个检查点继续。
-
选择加入严格排序的作业交付。
如果为作业启用了此选项,则同一通道内的作业将按顺序严格处理。
-
公平作业交付。
有很多作业准备运行的通道不会饿死作业更少的通道。
-
选择加入两阶段提交。
这在有序通道中特别有用,其中可以在作业顺序中“预留”位置,但不会在稍后提交。
-
JSON和/或二进制有效负载。
作业可以使用最方便的。
-
作业的自动保持连接。
长时间运行的作业将自动“保持连接”,以防止在它们仍在进行时重试。
-
并发限制。
指定每个运行者应处理的并发作业的最小和最大数量。
-
通过属性宏内置作业注册。
可以轻松将作业注册到运行者,并在每个作业的基础上指定默认配置。
-
隐式通道。
在作业发送和处理时隐式创建和销毁通道,因此无需设置。
-
通道组。
由于通道名称和通道参数的分离,可以轻松一次性订阅多个通道。
-
基于NOTIFY的轮询。
当处理的作业很少时,这可以节省资源。
入门指南
数据库模式
此crate期望某些数据库表和存储过程存在。您可以将此crate中的迁移文件复制到自己的迁移文件夹中。
此crate创建的所有数据库项都以前缀mq
开头,以免与您的模式冲突。
定义作业
第一步是定义一个在作业队列上运行的函数。
use std::error::Error;
use sqlxmq::{job, CurrentJob};
// Arguments to the `#[job]` attribute allow setting default job options.
#[job(channel_name = "foo")]
async fn example_job(
// The first argument should always be the current job.
mut current_job: CurrentJob,
// Additional arguments are optional, but can be used to access context
// provided via [`JobRegistry::set_context`].
message: &'static str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Decode a JSON payload
let who: Option<String> = current_job.json()?;
// Do some work
println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
// Mark the job as complete
current_job.complete().await?;
Ok(())
}
监听作业
接下来,我们需要创建一个作业运行者:这是监听新作业并执行它们的。
use std::error::Error;
use sqlxmq::JobRegistry;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// You'll need to provide a Postgres connection pool.
let pool = connect_to_db().await?;
// Construct a job registry from our single job.
let mut registry = JobRegistry::new(&[example_job]);
// Here is where you can configure the registry
// registry.set_error_handler(...)
// And add context
registry.set_context("Hello");
let runner = registry
// Create a job runner using the connection pool.
.runner(&pool)
// Here is where you can configure the job runner
// Aim to keep 10-20 jobs running at a time.
.set_concurrency(10, 20)
// Start the job runner in the background.
.run()
.await?;
// The job runner will continue listening and running
// jobs until `runner` is dropped.
Ok(())
}
启动作业
最后一步是实际运行作业。
example_job.builder()
// This is where we can override job configuration
.set_channel_name("bar")
.set_json("John")?
.spawn(&pool)
.await?;
关于README的说明
大部分的readme是自动从crate文档中复制的[cargo-readme-sync][]。这样,readme始终与文档同步,并且示例已经过测试。
因此,如果您想在<!-- cargo-sync-readme start -->
和<!-- cargo-sync-readme end -->
标记之间更改readme的一部分,请不要直接编辑README.md
,而是更改src/lib.rs
顶部的文档,然后与readme同步
cargo sync-readme
(确保已安装cargo命令)
cargo install cargo-sync-readme
依赖项
~1.5MB
~35K SLoC