#job-queue #task-queue #queue #job #task #sqlite #jobs

prefect

An embedded task queue based on SQLite (renamed to effectum)

6 releases

0.2.0 Dec 3, 2022
0.1.4 Nov 26, 2022

#2189 in Asynchronous

MIT/Apache

155KB
3.5K SLoC

This project has been renamed to effectum to avoid confusion with Prefect, a company operating in a similar space.


lib.rs:

A SQLite-based task queue library that lets you run background jobs without any external dependencies.

use std::{path::Path, sync::Arc};

use prefect::{Error, Job, JobState, JobRunner, RunningJob, Queue, Worker};
use serde::{Deserialize, Serialize};

#[derive(Debug)]
pub struct JobContext {
   // database pool or other things here
}

#[derive(Serialize, Deserialize)]
struct RemindMePayload {
  email: String,
  message: String,
}

async fn remind_me_job(job: RunningJob, context: Arc<JobContext>) -> Result<(), Error> {
    let payload: RemindMePayload = job.json_payload()?;
    // do something with the job
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
  // Create a queue
  let queue = Queue::new(Path::new("prefect.db")).await?;

  // Define a job type for the queue.
  let a_job = JobRunner::builder("remind_me", remind_me_job).build();

  let context = Arc::new(JobContext{
    // database pool or other things here
  });

  // Create a worker to run jobs.
  let worker = Worker::builder(&queue, context)
    .max_concurrency(10)
    .jobs([a_job])
    .build();

  // Submit a job to the queue.
  let job_id = Job::builder("remind_me")
    .run_at(time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600))
    .json_payload(&RemindMePayload {
        email: "[email protected]".to_string(),
        message: "Time to go!".to_string()
    })?
    .add_to(&queue)
    .await?;

  // See what's happening with the job.
  let status = queue.get_job_status(job_id).await?;
  assert_eq!(status.state, JobState::Pending);

  // Do other stuff...

  Ok(())
}
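
The example above schedules a job with `run_at` so that it only becomes runnable an hour later. As a rough illustration of that idea, here is a minimal in-memory sketch of a delayed queue. The names `ScheduledJob`, `DelayedQueue`, `add`, and `take_ready` are hypothetical and are not part of this crate's API; the crate persists jobs in SQLite, whereas this sketch swaps in a plain binary heap and integer timestamps to stay self-contained.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical job record: the time it becomes runnable, and its job type.
/// Derived ordering compares `run_at` first, then `name`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ScheduledJob {
    run_at: u64, // seconds since an arbitrary epoch (assumption for this sketch)
    name: String,
}

/// In-memory stand-in for a job queue (assumption: the real crate uses SQLite).
#[derive(Default)]
struct DelayedQueue {
    // `Reverse` turns the max-heap into a min-heap, so the earliest job is on top.
    heap: BinaryHeap<Reverse<ScheduledJob>>,
}

impl DelayedQueue {
    /// Add a job that should not run before `run_at`.
    fn add(&mut self, name: &str, run_at: u64) {
        self.heap.push(Reverse(ScheduledJob {
            run_at,
            name: name.to_string(),
        }));
    }

    /// Remove and return every job whose `run_at` is at or before `now`,
    /// earliest first. Jobs scheduled for later stay in the queue.
    fn take_ready(&mut self, now: u64) -> Vec<ScheduledJob> {
        let mut ready = Vec::new();
        while self.heap.peek().map_or(false, |j| j.0.run_at <= now) {
            if let Some(Reverse(job)) = self.heap.pop() {
                ready.push(job);
            }
        }
        ready
    }
}
```

With this sketch, adding a `"remind_me"` job at `run_at = 3600` and calling `take_ready(120)` leaves it in the queue; it is only returned once `now` reaches 3600. A persistent queue would express the same check as a query over stored rows instead of a heap peek.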

Dependencies

~38MB
~627K SLoC