2个不稳定版本
0.2.0 | 2022年12月18日 |
---|---|
0.1.1 | 2022年5月22日 |
0.1.0 |
|
#1298 在 数据库接口
66KB
1.5K SLoC
aide-de-camp-sqlite
作业队列的SQLite后端实现。
注意:单个作业可能被发送给两个执行者。这是由于SQLite缺少行锁定以及BEGIN EXCLUSIVE TRANSACTION
在此用例中表现不佳(非常慢)。这仅在高并发时才会成为问题,在这种情况下,你可能根本不想使用SQLite。换句话说,这并不是“恰好一次”类型的队列。
架构
CREATE TABLE IF NOT EXISTS adc_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL default 'default',
job_type TEXT not null,
payload blob not null,
retries int not null default 0,
scheduled_at INTEGER not null,
started_at INTEGER,
enqueued_at INTEGER not null default (strftime('%s', 'now')),
priority TINYINT not null default 0,
);
CREATE TABLE IF NOT EXISTS adc_dead_queue (
jid TEXT PRIMARY KEY,
queue TEXT NOT NULL,
job_type TEXT not null,
payload blob not null,
retries int not null,
scheduled_at INTEGER not null,
started_at INTEGER not null,
enqueued_at INTEGER not null,
died_at INTEGER not null default (strftime('%s', 'now')),
priority TINYINT not null default 0,
);
CREATE INDEX IF NOT EXISTS adc_queue_jobs ON adc_queue (
scheduled_at asc,
started_at asc,
queue,
job_type
);
该crate包含SQLx MIGRATOR
,可用于管理架构。
注意:SQLx不支持同一数据库中的多个迁移器。这意味着ADC应该有专门的架构/数据库,或者您负责应用迁移。
示例
use aide_de_camp_sqlite::{SqliteQueue, MIGRATOR};
use aide_de_camp::prelude::{Queue, JobProcessor, JobRunner, RunnerOptions, RunnerRouter, Duration, Xid, CancellationToken};
use async_trait::async_trait;
use sqlx::SqlitePool;
struct MyJob;
#[async_trait::async_trait]
impl JobProcessor for MyJob {
type Payload = Vec<u32>;
type Error = anyhow::Error;
async fn handle(&self, jid: Xid, payload: Self::Payload, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
// Do work here
Ok(())
}
fn name() -> &'static str {
"my_job"
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect(":memory:").await?;
// Setup schema, alternatively you can add schema to your migrations.
MIGRATOR.run(&pool).await?;
let queue = SqliteQueue::with_pool(pool);
// Add job the queue to run next
let _jid = queue.schedule::<MyJob>(vec![1,2,3], 0).await?;
// First create a job processor and router
let router = {
let mut r = RunnerRouter::default();
r.add_job_handler(MyJob);
r
};
// Setup runner to at most 10 jobs concurrently
let mut runner = JobRunner::new(queue, router, 10, RunnerOptions::default());
// Poll the queue every second, this will block unless something went really wrong.
// The future supplied as the second parameter will tell the server to shut down when it completes.
runner.run_with_shutdown(Duration::seconds(1), async move {
// To avoid blocking this doctest, run for 10 milliseconds, then initiate shutdown.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// In a real application, you may want to wait for a CTRL+C event or something similar.
// You could do this with tokio using the signal module: tokio::signal::ctrl_c().await.expect("failed to install CTRL+C signal handler");
}).await?;
Ok(())
}
依赖项
~33–48MB
~832K SLoC