#job-processing #job-queue #postgresql #background-processing #background-jobs #queueing #table

dbq

使用 Postgres 9.5+ 存储队列的作业队列和作业处理库

1 个不稳定版本

0.1.0 2019 年 4 月 6 日

#1739数据库接口

MIT 许可证

39KB
741

DBQ

使用 Postgres 9.5+ 存储队列的作业队列和作业处理库

文档

概述

作业队列和后台处理通常需要额外的技术,如 Redis 或 RabbitMQ。虽然这些是很好的解决方案,但对于许多作业处理需求来说可能过于复杂。dbq 的目标是使用您已有的堆栈中的 Postgres 数据库,使作业队列和后台处理简单而高效。dbq 提供了在特定排队技术中常见的强大持久性和功能:支持多个队列、失败时重试、重试时退避,以及死信存储。

示例

extern crate dbq;
extern crate postgres;
extern crate serde_json;

use std::error::Error;
use std::result::Result;
use std::thread;
use std::time::Duration;

// A simple handler that prints "Hello!" for any job it runs
#[derive(Clone)]
struct HelloHandler {}

fn main() -> Result<(), Box<Error>> {
    let db_conn_params = "postgres://postgres:password@localhost/db";
    let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?;

    // Schema config allows for changing the database schema and table names
    // Defaults are no schema (default is used) and tables are prefixed with "dbq_"
    let schema_config = dbq::SchemaConfig::default();
    // Run the migrations on start. Migrations are idempotent and should be run
    // on startup
    dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap();

    let queue = dbq::Queue::new(schema_config, "de_lancie_q".to_string());

    // Enqueue a job
    queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?;

    // Start a worker pool
    let workers_config =
        dbq::WorkerPoolConfig::new(queue, db_conn_params, HelloHandler {})?;
    let workers = dbq::WorkerPool::start(workers_config);

    // Give a worker time to find and start the job
    thread::sleep(Duration::new(1, 0));
    // Shutdown the worker pool waiting for all currently executing jobs to finish
    workers.join();
    Ok(())
}

impl dbq::Handler for HelloHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> {
        println!("Hello!");
        Ok(())
    }
}

工作原理

dbq 在 Postgres 中创建两个表,用于存储队列和“死信”作业。这些表的名字和模式可以使用 SchemaConfig 进行配置。将作业入队就像在队列表中插入一样简单。使用 WorkerPool 进行作业处理。WorkerPool 为每个工作进程创建一个 OS 线程,每个工作进程持续轮询队列以查看是否有可用的作业。每个工作进程通过使用 FOR UPDATE 锁定行并作为事务的一部分运行作业来声明作业。工作进程使用 SKIP LOCKED 轮询队列,以忽略任何由其他工作进程正在处理的作业。如果作业失败,则更新队列中的作业以便进行重试,或者如果达到了最大尝试次数,则将其移动到死信表。如果作业成功,则从队列中删除作业。

每个作业都是队列的一部分,并由一个字符串类和 JSON 参数参数化。该类标识作业的类型,而参数是作业运行所需的任何内容。

但是数据库队列并不是一个好主意...

虽然数据库队列可能仍然不是非常高的吞吐量用例的最佳选择,但 Postgres 中较新的数据库功能(特别是 SKIP LOCKED)使数据库队列成为低到中等吞吐量应用的非常合理的选择。以下是数据库队列和特定排队技术之间的一些主要权衡

数据库队列的优点

  • 使用事务将作业中的所有数据库更改同步到作业的成功或失败
  • 强持久性保证意味着您的作业与您的其他数据一样安全(持久性通常在其他排队技术中作为可选功能提供)
  • 无需在堆栈中添加其他技术。使用您已有的Postgres数据库!

数据库队列的缺点

  • 读取和写入可扩展性受数据库限制(几乎总是单个节点进行写入)
  • 持久性可能不是所有用例都需要
  • 机器资源与其他所有数据库查询和语句共享

依赖项

~6–15MB
~199K SLoC