#cron #task-scheduling #delay #timer #scheduler #task-list #task-manager

delay_timer_wf

延迟任务的时间管理器。类似于crontab,但支持同步异步任务,并支持动态添加/取消/删除。

3个不稳定版本

0.13.0 2024年1月2日
0.12.1 2024年1月2日
0.12.0 2024年1月2日

#218 in 操作系统

Apache-2.0 OR MIT

185KB
3.5K SLoC

delay-timer

Build License Cargo Documentation

延迟任务的时间管理器。类似于crontab,但支持同步异步任务,并支持动态添加/取消/删除。

delay-timer是一个基于时间轮算法的任务管理器,可以轻松管理定时任务,或者定期执行任意任务,如闭包。

底层运行时基于可选的smol和tokio,你可以使用其中任何一个来构建你的应用程序。

最小支持的rustc版本是 1.56

除了简单的几秒内执行外,你也可以指定一个具体的日期,例如星期天凌晨4点执行备份任务。

支持配置任务的并行最大数量。
通过句柄动态取消正在运行的任务实例。

image

如果你在寻找一个分布式任务调度平台,请查看 delicate

示例


use anyhow::Result;
use delay_timer::prelude::*;

fn main() -> Result<()> {
   // Build an DelayTimer that uses the default configuration of the Smol runtime internally.
   let delay_timer = DelayTimerBuilder::default().build();

   // Develop a print job that runs in an asynchronous cycle.
   // A chain of task instances.
   let task_instance_chain = delay_timer.insert_task(build_task_async_print()?)?;

   // Get the running instance of task 1.
   let task_instance = task_instance_chain.next_with_wait()?;

   // Cancel running task instances.
   task_instance.cancel_with_wait()?;

   // Remove task which id is 1.
   delay_timer.remove_task(1)?;

   // No new tasks are accepted; running tasks are not affected.
   delay_timer.stop_delay_timer()?;

   Ok(())
}

fn build_task_async_print() -> Result<Task, TaskError> {
   let mut task_builder = TaskBuilder::default();

   let body = || async {
       println!("create_async_fn_body!");

       Timer::after(Duration::from_secs(3)).await;

       println!("create_async_fn_body:i'success");
   };

   task_builder
       .set_task_id(1)
       .set_frequency_repeated_by_cron_str("@secondly")
       .set_maximum_parallel_runnable_num(2)
       .spawn_async_routine(body)
}

在异步上下文中使用。


use delay_timer::prelude::*;

use anyhow::Result;

use smol::Timer;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
   // In addition to the mixed (smol & tokio) runtime
   // You can also share a tokio runtime with delayTimer, please see api `DelayTimerBuilder::tokio_runtime` for details.

   // Build an DelayTimer that uses the default configuration of the Smol runtime internally.
   let delay_timer = DelayTimerBuilder::default().build();

   // Develop a print job that runs in an asynchronous cycle.
   let task_instance_chain = delay_timer.insert_task(build_task_async_print()?)?;

   // Get the running instance of task 1.
   let task_instance = task_instance_chain.next_with_async_wait().await?;

   // Cancel running task instances.
   task_instance.cancel_with_async_wait().await?;


   // Remove task which id is 1.
   delay_timer.remove_task(1)?;

   // No new tasks are accepted; running tasks are not affected.
   delay_timer.stop_delay_timer()
}

fn build_task_async_print() -> Result<Task, TaskError> {
   let mut task_builder = TaskBuilder::default();

   let body = || async {
       println!("create_async_fn_body!");

       Timer::after(Duration::from_secs(3)).await;

       println!("create_async_fn_body:i'success");
   };

   task_builder
       .set_task_id(1)
       .set_frequency_repeated_by_cron_str("@secondly")
       .set_maximum_parallel_runnable_num(2)
       .spawn_async_routine(body)
}

捕获指定环境信息并构建闭包和任务

#[macro_use]
use delay_timer::prelude::*;

use std::sync::atomic::{
    AtomicUsize,
    Ordering::{Acquire, Release},
};
use std::sync::Arc;


let delay_timer = DelayTimer::new();
let share_num = Arc::new(AtomicUsize::new(0));
let share_num_bunshin = share_num.clone();

let body = move || {
    share_num_bunshin.fetch_add(1, Release);
};

let task = TaskBuilder::default()
    .set_frequency_count_down_by_cron_str(expression, 3)
    .set_task_id(1)
    .spawn_routine(body)?;

delay_timer.add_task(task)?;

构建自定义动态未来任务

#[macro_use]
use delay_timer::prelude::*;
use hyper::{Client, Uri};

fn build_task_customized_async_task() -> Result<Task, TaskError> {
   let id = 1;
   let name = String::from("someting");
   let mut task_builder = TaskBuilder::default();

   let body = move || {
       let name_ref = name.clone();
       async move {
           async_template(id, name_ref).await.expect("Request failed.");

           sleep(Duration::from_secs(3)).await;

           println!("create_async_fn_body:i'success");
       }
   };

   task_builder
       .set_frequency_repeated_by_cron_str("0,10,15,25,50 0/1 * * Jan-Dec * 2020-2100")
       .set_task_id(5)
       .set_maximum_running_time(5)
       .spawn_async_routine(body)
}


pub async fn async_template(id: i32, name: String) -> Result<()> {
   let url = format!("https://httpbin.org/get?id={}&name={}", id, name);
   let mut res = surf::get(url).await?;
   dbg!(res.body_string().await?);

   Ok(())
}

在[示例]目录中还有很多。

许可证

许可如下

待办事项列表

  • 支持tokio生态。
  • 禁用将引发panic的unwrap相关方法。
  • 当delayTimer释放时,线程和正在运行的任务退出。
  • 整理代码中的待办事项,补充测试和基准测试。
  • 批量操作。
  • 服务器报告。
  • 将delay_timer升级为多轮模式,不同的执行器处理不同的轮次,例如,对一个轮次减去圈数,对一个轮次运行任务。

贡献

除非你明确声明,否则任何旨在包含在作品中的贡献,如Apache-2.0许可证中定义,应按上述方式双许可,而不附加任何额外的条款或条件。

依赖项

~11–22MB
~308K SLoC