1 个不稳定版本
0.1.0 | 2019 年 4 月 6 日 |
---|
#1739 在 数据库接口
39KB
741 行
DBQ
使用 Postgres 9.5+ 存储队列的作业队列和作业处理库
概述
作业队列和后台处理通常需要额外的技术,如 Redis 或 RabbitMQ。虽然这些是很好的解决方案,但对于许多作业处理需求来说可能过于复杂。dbq 的目标是使用您已有的堆栈中的 Postgres 数据库,使作业队列和后台处理简单而高效。dbq 提供了在特定排队技术中常见的强大持久性和功能:支持多个队列、失败时重试、重试时退避,以及死信存储。
示例
extern crate dbq;
extern crate postgres;
extern crate serde_json;
use std::error::Error;
use std::result::Result;
use std::thread;
use std::time::Duration;
// A simple handler that prints "Hello!" for any job it runs
#[derive(Clone)]
struct HelloHandler {}
fn main() -> Result<(), Box<Error>> {
let db_conn_params = "postgres://postgres:password@localhost/db";
let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?;
// Schema config allows for changing the database schema and table names
// Defaults are no schema (default is used) and tables are prefixed with "dbq_"
let schema_config = dbq::SchemaConfig::default();
// Run the migrations on start. Migrations are idempotent and should be run
// on startup
dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap();
let queue = dbq::Queue::new(schema_config, "de_lancie_q".to_string());
// Enqueue a job
queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?;
// Start a worker pool
let workers_config =
dbq::WorkerPoolConfig::new(queue, db_conn_params, HelloHandler {})?;
let workers = dbq::WorkerPool::start(workers_config);
// Give a worker time to find and start the job
thread::sleep(Duration::new(1, 0));
// Shutdown the worker pool waiting for all currently executing jobs to finish
workers.join();
Ok(())
}
impl dbq::Handler for HelloHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> {
println!("Hello!");
Ok(())
}
}
工作原理
dbq
在 Postgres 中创建两个表,用于存储队列和“死信”作业。这些表的名字和模式可以使用 SchemaConfig
进行配置。将作业入队就像在队列表中插入一样简单。使用 WorkerPool
进行作业处理。WorkerPool
为每个工作进程创建一个 OS 线程,每个工作进程持续轮询队列以查看是否有可用的作业。每个工作进程通过使用 FOR UPDATE
锁定行并作为事务的一部分运行作业来声明作业。工作进程使用 SKIP LOCKED
轮询队列,以忽略任何由其他工作进程正在处理的作业。如果作业失败,则更新队列中的作业以便进行重试,或者如果达到了最大尝试次数,则将其移动到死信表。如果作业成功,则从队列中删除作业。
每个作业都是队列的一部分,并由一个字符串类和 JSON 参数参数化。该类标识作业的类型,而参数是作业运行所需的任何内容。
但是数据库队列并不是一个好主意...
虽然数据库队列可能仍然不是非常高的吞吐量用例的最佳选择,但 Postgres 中较新的数据库功能(特别是 SKIP LOCKED
)使数据库队列成为低到中等吞吐量应用的非常合理的选择。以下是数据库队列和特定排队技术之间的一些主要权衡
数据库队列的优点
- 使用事务将作业中的所有数据库更改同步到作业的成功或失败
- 强持久性保证意味着您的作业与您的其他数据一样安全(持久性通常在其他排队技术中作为可选功能提供)
- 无需在堆栈中添加其他技术。使用您已有的Postgres数据库!
数据库队列的缺点
- 读取和写入可扩展性受数据库限制(几乎总是单个节点进行写入)
- 持久性可能不是所有用例都需要
- 机器资源与其他所有数据库查询和语句共享
依赖项
~6–15MB
~199K SLoC