#任务队列 #PostgreSQL #任务 #工作器 #性能 # #异步任务

graphile_worker

高性能的 Rust/PostgreSQL 任务队列(也适用于将 PostgreSQL 触发器/函数生成的任务输出到不同的工作队列)

9 个版本 (4 个重大更新)

0.8.0 2024 年 5 月 29 日
0.7.2 2024 年 5 月 28 日
0.7.0 2024 年 2 月 28 日
0.6.2 2024 年 2 月 14 日
0.4.0 2024 年 1 月 31 日

#318 in 数据库接口

每月 30 次下载

自定义许可

280KB
5.5K SLoC

Graphile Worker RS

Codecov Crates.io Documentation

Rust 重写的 Graphile Worker。如果你喜欢这个库,请赞助 Benjie 项目,所有研究都由他完成,这个库只是 Rust 的重写 🦀。端口应该与 graphile-worker 大多数兼容(这意味着你可以与 Node.JS 并行运行)。

以下与 Graphile Worker 不同

  • 不支持批量任务(我个人不需要,但如果这不是你的情况,请创建一个 issue,我会看看我能做什么)
  • Graphile Worker 中,每个进程都有一个 worker_id。在 Rust 中只有一个 worker_id,然后工作在异步运行时线程中处理任务。

Rust 运行的 PostgreSQL 任务队列 - 允许您在“后台”运行任务(例如发送电子邮件、执行计算、生成 PDF 等),这样您的 HTTP 响应/应用程序代码就不会被阻塞。适用于任何基于 PostgreSQL 的应用程序。

将工作器添加到您的项目中

cargo add graphile_worker

创建任务并运行工作器

任务定义简单为一个异步函数和一个任务标识符

use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler};

#[derive(Deserialize, Serialize)]
struct SayHello {
    message: String,
}

impl TaskHandler for SayHello {
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> Result<(), ()> {
        println!("Hello {} !", self.message);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    graphile_worker::WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .define_job::<SayHello>()
        .pg_pool(pg_pool)
        .init()
        .await?
        .run()
        .await?;

    Ok(())
}

通过 SQL 安排任务

连接到您的数据库并运行以下 SQL

SELECT graphile_worker.add_job('say_hello', json_build_object('name', 'Bobby Tables'));

通过 RUST 安排任务

#[tokio::main]
async fn main() -> Result<(), ()> {
    // ...
    let utils = worker.create_utils();

    // Using add_job
    utils.add_job(
        SayHello { name: "Bobby Tables".to_string() },
        Default::default(),
    ).await.unwrap();

    // You can also use `add_raw_job` if you don't have access to the task, or don't care about end 2 end safety
    utils.add_raw_job("say_hello", serde_json::json!({ "name": "Bobby Tables" }), Default::default()).await.unwrap();

    Ok(())
}

应用状态

您可以通过 extension 提供应用状态

use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;

#[derive(Clone, Debug)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

#[derive(Deserialize, Serialize)]
pub struct CounterTask;

impl TaskHandler for CounterTask {
    const IDENTIFIER: &'static str = "counter_task";

    async fn run(self, ctx: WorkerContext) {
        let app_state = ctx.extensions().get::<AppState>().unwrap();
        let run_count = app_state.run_count.fetch_add(1, SeqCst);
        println!("Run count: {run_count}");
    }
}

#[tokio::main]
async fn main() -> Result<(), ()> {
    graphile_worker::WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .add_extension(AppState {
            run_count: Arc::new(AtomicUsize::new(0)),
        })
        .define_job::<CounterTask>()
        .pg_pool(pg_pool)
        .init()
        .await?
        .run()
        .await?;

    Ok(())
}

功能

  • 独立和嵌入式模式
  • 设计用于从 JavaScript 或直接在数据库中使用
  • 易于测试(建议:使用 run_once 工具)
  • 低延迟(通常从任务安排到执行的时间小于 3 毫秒,使用 LISTEN/NOTIFY 来通知任务插入)
  • 高性能(使用 SKIP LOCKED 来查找要执行的任务,从而实现更快的检索)
  • 小任务(使用显式的任务名称/有效负载,从而最小化序列化和反序列化开销)
  • 默认并行
  • 向同一名称的队列添加任务将按顺序运行它们
  • 自动以指数退避重新尝试失败的任务
  • 可自定义的重试次数(默认:约 3 天内的 25 次尝试)
  • 类似 crontab 的计划功能,用于重复性任务(带可选的回填)
  • 通过唯一的 job_key 进行任务去重
  • 使用“批量作业”向已入队的任务追加数据
  • 开源;宽松的MIT许可证
  • 执行用Rust编写的任务(这些任务可以调用任何其他语言或网络服务)
  • 原生用Rust编写
  • 如果您运行得非常紧凑,可以将Graphile Worker与您的服务器在同一Rust进程中运行,以降低成本和DevOps复杂度。

状态

已准备投入生产,但API可能有些粗糙,可能会发生变化。

需求

PostgreSQL 12+ 可能与较旧版本兼容,但尚未测试。

注意:需要Postgres 12才能使用 生成的始终 as (表达式) 功能

安装

cargo add graphile_worker

运行

graphile_worker 管理自己的数据库模式(graphile_worker_worker)。只需指向您的数据库,我们就会处理自己的迁移。

依赖项

~40–54MB
~1M SLoC