#任务队列 #SQLite #队列 #任务 #任务调度器 #调度器

aide-de-camp-sqlite

aide-de-camp中的Queue特质的SQLite实现

2个不稳定版本

0.2.0 2022年12月18日
0.1.1 2022年5月22日
0.1.0 2022年5月22日

#1298数据库接口

MIT/Apache

66KB
1.5K SLoC

aide-de-camp-sqlite

作业队列的SQLite后端实现。

注意:单个作业可能被发送给两个执行者。这是由于SQLite缺少行锁定以及BEGIN EXCLUSIVE TRANSACTION在此用例中表现不佳(非常慢)。这仅在高并发时才会成为问题,在这种情况下,你可能根本不想使用SQLite。换句话说,这并不是“恰好一次”类型的队列。

架构

CREATE TABLE IF NOT EXISTS adc_queue (
 jid TEXT PRIMARY KEY,
 queue TEXT NOT NULL default 'default',
 job_type TEXT not null,
 payload blob not null,
 retries int not null default 0,
 scheduled_at INTEGER not null,
 started_at INTEGER,
 enqueued_at INTEGER not null default (strftime('%s', 'now')),
 priority TINYINT not null default 0,
);

CREATE TABLE IF NOT EXISTS adc_dead_queue (
 jid TEXT PRIMARY KEY,
 queue TEXT NOT NULL,
 job_type TEXT not null,
 payload blob not null,
 retries int not null,
 scheduled_at INTEGER not null,
 started_at INTEGER not null,
 enqueued_at INTEGER not null,
 died_at INTEGER not null default (strftime('%s', 'now')),
 priority TINYINT not null default 0,
);

CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
    scheduled_at asc,
    started_at asc,
    queue,
    job_type
);

该crate包含SQLx MIGRATOR,可用于管理架构。

注意SQLx不支持同一数据库中的多个迁移器。这意味着ADC应该有专门的架构/数据库,或者您负责应用迁移。

示例


use aide_de_camp_sqlite::{SqliteQueue, MIGRATOR};
use aide_de_camp::prelude::{Queue, JobProcessor, JobRunner, RunnerOptions, RunnerRouter, Duration, Xid, CancellationToken};
use async_trait::async_trait;
use sqlx::SqlitePool;

struct MyJob;

#[async_trait::async_trait]
impl JobProcessor for MyJob {
    type Payload = Vec<u32>;
    type Error = anyhow::Error;

    async fn handle(&self, jid: Xid, payload: Self::Payload, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
        // Do work here
        Ok(())
    }

    fn name() -> &'static str {
        "my_job"
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let pool = SqlitePool::connect(":memory:").await?;
    // Setup schema, alternatively you can add schema to your migrations.
    MIGRATOR.run(&pool).await?;
    let queue = SqliteQueue::with_pool(pool);

    // Add job the queue to run next
    let _jid = queue.schedule::<MyJob>(vec![1,2,3], 0).await?;

    // First create a job processor and router
    let router = {
        let mut r = RunnerRouter::default();
        r.add_job_handler(MyJob);
        r
    };
    // Setup runner to at most 10 jobs concurrently
    let mut runner = JobRunner::new(queue, router, 10, RunnerOptions::default());
    // Poll the queue every second, this will block unless something went really wrong.
    // The future supplied as the second parameter will tell the server to shut down when it completes.
    runner.run_with_shutdown(Duration::seconds(1), async move {
        // To avoid blocking this doctest, run for 10 milliseconds, then initiate shutdown.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // In a real application, you may want to wait for a CTRL+C event or something similar.
        // You could do this with tokio using the signal module: tokio::signal::ctrl_c().await.expect("failed to install CTRL+C signal handler");
    }).await?;
    Ok(())
}

依赖项

~33–48MB
~832K SLoC