#task-queue #job-queue #job #sqlite #queue #task #jobs

effectum

基于SQLite的嵌入式任务队列

12个版本 (6个重大更新)

0.7.0 2024年7月23日
0.6.0 2024年3月28日
0.5.1 2024年3月26日
0.4.1 2023年12月27日
0.1.5 2022年12月9日

#149 in 异步

Download history 133/week @ 2024-07-23 14/week @ 2024-07-30

每月147次下载

MIT/Apache

255KB
6K SLoC

Effectum

一个基于SQLite的Rust作业队列库,不依赖于任何其他服务。

目前这个库可以嵌入到Rust应用程序中,但未来的目标包括绑定到其他语言以及作为独立服务器运行的能力,可通过HTTP和gRPC访问。这将设计为产品可以从嵌入式版本开始,使用最小的基础设施,然后在需要扩展时以最小的更改转移到服务器版本。

use effectum::{Error, Job, JobState, JobRunner, RunningJob, Queue, Worker};

#[derive(Debug)]
pub struct JobContext {
   // database pool or other things here
}

#[derive(Serialize, Deserialize)]
struct RemindMePayload {
  email: String,
  message: String,
}

async fn remind_me_job(job: RunningJob, context: Arc<JobContext>) -> Result<(), Error> {
    let payload: RemindMePayload = job.json_payload()?;
    // do something with the job
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
  // Create a queue
  let queue = Queue::new(&PathBuf::from("effectum.db")).await?;

  // Define a type job for the queue.
  let a_job = JobRunner::builder("remind_me", remind_me_job).build();

  let context = Arc::new(JobContext{
    // database pool or other things here
  });

  // Create a worker to run jobs.
  let worker = Worker::builder(&queue, context)
    .max_concurrency(10)
    .jobs([a_job])
    .build();

  // Submit a job to the queue.
  let job_id = Job::builder("remind_me")
    .run_at(time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600))
    .json_payload(&RemindMePayload {
        email: "[email protected]".to_string(),
        message: "Time to go!".to_string()
    })?
    .add_to(&queue)
    .await?;

  // See what's happening with the job.
  let status = queue.get_job_status(job_id).await?;
  assert_eq!(status.state, JobState::Pending);

  // Do other stuff...

  Ok(())
}

变更日志

完整开发笔记

路线图

已发布

  • 多种作业类型
  • 可以将具有更高优先级的作业添加到“跳过队列”
  • 工作进程可以并行运行多个作业
  • 安排未来作业
  • 自动重试失败的作业,使用指数退避
  • 检查点以允许在作业中途失败时智能恢复作业。
  • 在进程意外重启时立即安排重试正在运行的作业
  • 取消或修改挂起的作业
  • 支持周期性作业

即将推出

  • 可选的清理器,以防止“完成”作业数据无限期地累积

稍后推出

  • Node.js绑定
  • 通过gRPC作为独立服务器运行
  • 通过出盒模式与队列通信的辅助工具。

依赖关系

~38MB
~641K SLoC