
tokio-scheduler-rs

Yet another JobScheduler written with the tokio runtime, with support for automatic retry, hooks, and custom storage

12 releases (stable)

1.1.7 June 14, 2024
1.1.6 October 25, 2023
1.1.4 September 1, 2023
1.1.2 August 27, 2023
0.1.4 December 30, 2022

#176 in Asynchronous

MIT license

390KB
748

TOKIO-SCHEDULER-RS

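Yet another JobScheduler

Features

  • Fully asynchronous
  • Written with the tokio runtime
  • Highly customizable
  • Hook support
  • Automatic retry support
  • Distributed job execution support (you need to implement it yourself)
  • Tracing support by default

Example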

use std::sync::Arc;

use tokio_scheduler_rs::{
    async_trait,
    job_hook::{JobHook, JobHookReturn},
    DefaultJobExecutor, JobContext, JobFuture, JobScheduler, MemoryJobStorage, ScheduleJob, Value,
};

struct ExampleJob;

impl ScheduleJob for ExampleJob{
    fn get_job_name(&self) -> String {
        String::from("ExampleJob")
    }

    fn execute(&self, ctx: JobContext) -> JobFuture {
        Box::pin(async move{
            println!("Hello, World! My JobId is {}",ctx.get_id());
            Ok(Value::default())
        })
    }
}

struct ExampleHook;

#[async_trait]
impl JobHook for ExampleHook {
    async fn on_execute(&self, name: &str, id: &str, args: &Option<Value>) -> JobHookReturn {
        println!(
            "Task: {} with id: {} and args: {:#?} is going to execute!",
            name, id, args
        );
        JobHookReturn::NoAction
        // To cancel only this run:
        // JobHookReturn::CancelRunning
        // Or, to cancel this run and remove the schedule permanently:
        // JobHookReturn::RemoveJob
    }
    async fn on_complete(
        &self,
        name: &str,
        id: &str,
        args: &Option<Value>,
        result: &anyhow::Result<Value>,
        retry_times: u64,
    ) -> JobHookReturn {
        println!(
            "Task: {} with id: {} and args: {:#?} is complete! Result is: {:#?}, retry time is: {}",
            name, id, args, result, retry_times
        );
        JobHookReturn::NoAction
        // To remove this schedule permanently:
        // JobHookReturn::RemoveJob
        // Or, to retry this job:
        // JobHookReturn::RetryJob
    }
    async fn on_success(
        &self,
        name: &str,
        id: &str,
        args: &Option<Value>,
        return_value: &Value,
        retry_times: u64,
    ) -> JobHookReturn {
        println!(
            "Task: {} with id: {} and args: {:#?} is complete! ReturnValue is: {:#?}, retry time is: {}",
            name, id, args, return_value, retry_times
        );
        JobHookReturn::NoAction
        // To remove this schedule permanently:
        // JobHookReturn::RemoveJob
        // Or, to retry this job:
        // JobHookReturn::RetryJob
    }
    async fn on_fail(
        &self,
        name: &str,
        id: &str,
        args: &Option<Value>,
        error: &anyhow::Error,
        retry_times: u64,
    ) -> JobHookReturn {
        println!(
            "Task: {} with id: {} and args: {:#?} has failed! Error is: {:#?}, retry time is: {}",
            name, id, args, error, retry_times
        );
        JobHookReturn::NoAction
        // To remove this schedule permanently:
        // JobHookReturn::RemoveJob
        // Or, to retry this job:
        // JobHookReturn::RetryJob
    }
}

#[tokio::main]
async fn main() {
    // Create a new job storage. You can also implement your own.
    // !!! NOTE: MemoryJobStorage SHOULD NOT BE USED IN PRODUCTION !!!
    let job_storage = Arc::new(MemoryJobStorage::new(chrono::Utc));
    // Create a new job executor.
    // Register your job hook here.
    let job_executor = DefaultJobExecutor::new(
        job_storage.to_owned(),
        Some(10),
        Some(Box::new(ExampleHook)),
        30
    );
    let scheduler = JobScheduler::new(job_storage, job_executor);

    // Register a job
    scheduler.register_job(Box::new(ExampleJob)).await.unwrap();

    // Schedule the job with the given cron expression.
    // !!! NOTE: YOU MUST REGISTER THE JOB FIRST !!!
    scheduler
        .add_job(&ExampleJob.get_job_name(), "* * * * * * *", &None)
        .await
        .unwrap();

    // Don't forget to start it.
    println!("Start scheduler");
    scheduler.start();

    tokio::time::sleep(std::time::Duration::from_secs(10)).await;

    // Wait until all running jobs have been processed, then stop the scheduler.
    // The `JobExecutor` stops executing NEW jobs once you call this.
    println!("Stop scheduler");
    scheduler.wait_for_stop().await;
}
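
The hook comments above mention `JobHookReturn::RetryJob` and `JobHookReturn::RemoveJob`, and every completion hook receives `retry_times`. One way a hook might use these is a bounded retry policy. The sketch below models only the decision logic with the standard library: `HookDecision`, `MAX_RETRIES`, and `decide` are hypothetical names, not part of tokio-scheduler-rs, and it assumes `retry_times` counts the retries already attempted, starting at 0.

```rust
/// Hypothetical stand-in for the crate's `JobHookReturn`; only the variants
/// named in the example's comments are modelled here.
#[derive(Debug, PartialEq)]
enum HookDecision {
    NoAction,
    RetryJob,
    RemoveJob,
}

/// Assumed retry budget; the value 3 is purely illustrative.
const MAX_RETRIES: u64 = 3;

/// Decide what a hook could return after a run finishes.
/// Successful runs need no action; failed runs are retried until the
/// budget is used up, after which the schedule is removed.
fn decide(succeeded: bool, retry_times: u64) -> HookDecision {
    if succeeded {
        HookDecision::NoAction
    } else if retry_times < MAX_RETRIES {
        HookDecision::RetryJob
    } else {
        HookDecision::RemoveJob
    }
}

fn main() {
    // Walk through a job that keeps failing.
    for retry_times in 0..=MAX_RETRIES {
        println!("retry_times = {}: {:?}", retry_times, decide(false, retry_times));
    }
}
```

In a real hook, the same comparison would sit inside `on_fail` and return the matching `JobHookReturn` variant. Whether removing the schedule or returning `NoAction` is the right fallback depends on the job.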

Examples

You can find more examples in the examples directory.

Contributing

If you have an idea, open a pull request or file an issue.

Contributions of any kind are welcome!

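Distributed job execution

As you can see, the JobExecutor trait defines two functions: start and stop.

You can therefore define your own JobExecutor that polls the JobStorage and transfers the job information to a remote machine by any means you like.

On the remote machine, you should define a Job with the given name, then execute it with the job information (a new JobContext) received from the machine running the original JobExecutor.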

[Figure: DistributedJob]
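
The flow described above (forward the job information, look the job up by name on the remote side, run it) can be sketched with the standard library alone. Everything in this block is an assumption made for illustration: `TaskMessage`, `Handler`, `worker_registry`, and `run_on_worker` are hypothetical names, an in-process channel stands in for the real transport, and none of it uses the crate's actual JobExecutor, JobStorage, or JobContext types.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical wire format for one due job; not a type from tokio-scheduler-rs.
#[derive(Debug, Clone, PartialEq)]
struct TaskMessage {
    job_name: String,
    job_id: String,
    args: Option<String>,
}

/// Hypothetical handler signature used by the remote worker.
type Handler = fn(&TaskMessage) -> Result<String, String>;

/// The remote machine's definition of the job registered as "ExampleJob".
fn example_job(msg: &TaskMessage) -> Result<String, String> {
    Ok(format!("ran {} ({}) with args {:?}", msg.job_name, msg.job_id, msg.args))
}

/// Build the worker's name -> handler registry.
fn worker_registry() -> HashMap<&'static str, Handler> {
    let mut registry: HashMap<&'static str, Handler> = HashMap::new();
    registry.insert("ExampleJob", example_job);
    registry
}

/// Remote side: find the job by name and run it with the received information.
fn run_on_worker(
    registry: &HashMap<&'static str, Handler>,
    msg: &TaskMessage,
) -> Result<String, String> {
    match registry.get(msg.job_name.as_str()) {
        Some(handler) => handler(msg),
        None => Err(format!("no job registered under name {}", msg.job_name)),
    }
}

fn main() {
    // The channel stands in for whatever transport carries jobs to the remote machine.
    let (tx, rx) = mpsc::channel::<TaskMessage>();

    let worker = thread::spawn(move || {
        let registry = worker_registry();
        // This loop ends once the sending side is dropped.
        for msg in rx {
            println!("{:?}", run_on_worker(&registry, &msg));
        }
    });

    // Scheduler side: a custom executor would poll its storage for due jobs
    // and forward them here instead of running them locally.
    tx.send(TaskMessage {
        job_name: "ExampleJob".to_string(),
        job_id: "job-1".to_string(),
        args: None,
    })
    .unwrap();
    drop(tx);
    worker.join().unwrap();
}
```

Matching on the job name keeps the two machines loosely coupled: the scheduler side only needs to know names and arguments, while the job bodies live on the worker.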

Roadmap

  • Add more tests
  • An out-of-the-box distributed job execution system

License

MIT

Dependencies

~7–15MB
~176K SLoC