14个版本

0.5.0 2023年10月15日
0.4.1 2022年6月30日
0.3.5 2022年5月20日
0.3.4 2021年12月1日
0.1.1 2021年3月30日

过程宏 中排名第1046

Download history 4069/week @ 2024-03-15 3750/week @ 2024-03-22 1861/week @ 2024-03-29 1905/week @ 2024-04-05 2421/week @ 2024-04-12 2413/week @ 2024-04-19 2681/week @ 2024-04-26 1954/week @ 2024-05-03 1238/week @ 2024-05-10 1030/week @ 2024-05-17 964/week @ 2024-05-24 651/week @ 2024-05-31 533/week @ 2024-06-07 280/week @ 2024-06-14 322/week @ 2024-06-21 181/week @ 2024-06-28

每月下载量1,453
2 个crate中使用 (通过 sqlxmq)

MIT/Apache

18KB
197

CI Status Documentation crates.io

sqlxmq

基于 sqlxPostgreSQL 的作业队列。

这个库允许CRUD应用程序在不复杂化其部署的情况下运行后台作业。唯一的运行时依赖项是 PostgreSQL,因此这对于已经使用 PostgreSQL 数据库的应用程序来说非常理想。

虽然使用SQL数据库作为作业队列意味着在交付作业的延迟上做出妥协,但常规作业队列中存在的一些问题在这里被完全避免。

在大多数其他作业队列中,正在进行的作业状态通常不在正常数据库备份范围内。即使作业 确实 被备份,也没有办法在手动解决冲突的情况下,将数据库和作业队列恢复到一致的时间点。

通过在数据库中存储作业,现有的备份程序将存储正在进行的作业和持久数据的完美一致状态。此外,作业可以作为其他事务的一部分生成和完成,这使得编写正确的应用程序代码变得容易。

利用 PostgreSQL 的强大功能,这个作业队列提供了其他作业队列中没有的几个功能。

功能

  • 一次性发送/接收多个作业。

    这减少了数据库查询的数量。

  • 发送在未来日期和时间执行的作业。

    避免了需要单独的调度系统。

  • 可靠的作业交付。

  • 具有指数退避的自动重试。

    重试次数和初始退避参数可配置。

  • 事务性发送作业。

    避免在事务回滚时发送虚假作业。

  • 事务性完成作业。

    如果作业的所有副作用都是对数据库的更新,这可以提供真正的单次执行作业。

  • 作业的事务性检查点。

    长时间运行的作业可以将状态检查点保存下来,以避免在出现故障时需要从头开始:下一次重试可以从最后一个检查点继续。

  • 选择加入严格排序的作业交付。

    如果为作业启用了此选项,则同一通道内的作业将按顺序严格处理。

  • 公平作业交付。

    有很多作业准备运行的通道不会饿死作业更少的通道。

  • 选择加入两阶段提交。

    这在有序通道中特别有用,其中可以在作业顺序中“预留”位置,但不会在稍后提交。

  • JSON和/或二进制有效负载。

    作业可以使用最方便的。

  • 作业的自动保持连接。

    长时间运行的作业将自动“保持连接”,以防止在它们仍在进行时重试。

  • 并发限制。

    指定每个运行者应处理的并发作业的最小和最大数量。

  • 通过属性宏内置作业注册。

    可以轻松将作业注册到运行者,并在每个作业的基础上指定默认配置。

  • 隐式通道。

    在作业发送和处理时隐式创建和销毁通道,因此无需设置。

  • 通道组。

    由于通道名称和通道参数的分离,可以轻松一次性订阅多个通道。

  • 基于NOTIFY的轮询。

    当处理的作业很少时,这可以节省资源。

入门指南

数据库模式

此crate期望某些数据库表和存储过程存在。您可以将此crate中的迁移文件复制到自己的迁移文件夹中。

此crate创建的所有数据库项都以前缀mq开头,以免与您的模式冲突。

定义作业

第一步是定义一个在作业队列上运行的函数。

use std::error::Error;

use sqlxmq::{job, CurrentJob};

// Arguments to the `#[job]` attribute allow setting default job options.
#[job(channel_name = "foo")]
async fn example_job(
    // The first argument should always be the current job.
    mut current_job: CurrentJob,
    // Additional arguments are optional, but can be used to access context
    // provided via [`JobRegistry::set_context`].
    message: &'static str,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    // Decode a JSON payload
    let who: Option<String> = current_job.json()?;

    // Do some work
    println!("{}, {}!", message, who.as_deref().unwrap_or("world"));

    // Mark the job as complete
    current_job.complete().await?;

    Ok(())
}

监听作业

接下来,我们需要创建一个作业运行者:这是监听新作业并执行它们的。

use std::error::Error;

use sqlxmq::JobRegistry;


#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // You'll need to provide a Postgres connection pool.
    let pool = connect_to_db().await?;

    // Construct a job registry from our single job.
    let mut registry = JobRegistry::new(&[example_job]);
    // Here is where you can configure the registry
    // registry.set_error_handler(...)

    // And add context
    registry.set_context("Hello");

    let runner = registry
        // Create a job runner using the connection pool.
        .runner(&pool)
        // Here is where you can configure the job runner
        // Aim to keep 10-20 jobs running at a time.
        .set_concurrency(10, 20)
        // Start the job runner in the background.
        .run()
        .await?;

    // The job runner will continue listening and running
    // jobs until `runner` is dropped.
    Ok(())
}

启动作业

最后一步是实际运行作业。

example_job.builder()
    // This is where we can override job configuration
    .set_channel_name("bar")
    .set_json("John")?
    .spawn(&pool)
    .await?;

关于README的说明

大部分的readme是自动从crate文档中复制的[cargo-readme-sync][]。这样,readme始终与文档同步,并且示例已经过测试。

因此,如果您想在<!-- cargo-sync-readme start --><!-- cargo-sync-readme end -->标记之间更改readme的一部分,请不要直接编辑README.md,而是更改src/lib.rs顶部的文档,然后与readme同步

cargo sync-readme

(确保已安装cargo命令)

cargo install cargo-sync-readme

依赖项

~1.5MB
~35K SLoC