#tokio #cron #postgresql #nats #scheduler #cron-job #task-scheduler

tokio-cron-scheduler

使用类似于cron的注解在tokio上安排任务,在某一瞬间执行或以固定的时间间隔重复执行。任务可以选择使用PostgreSQL或Nats进行持久化。

43个版本

0.11.0 2024年8月9日
0.10.2 2024年4月26日
0.10.0 2024年1月29日
0.9.4 2023年2月13日
0.2.1 2021年3月25日

#8 in 日期和时间

Download history 11517/week @ 2024-05-03 14837/week @ 2024-05-10 14308/week @ 2024-05-17 12902/week @ 2024-05-24 13592/week @ 2024-05-31 14356/week @ 2024-06-07 12655/week @ 2024-06-14 12872/week @ 2024-06-21 11297/week @ 2024-06-28 11774/week @ 2024-07-05 13401/week @ 2024-07-12 13052/week @ 2024-07-19 13096/week @ 2024-07-26 10707/week @ 2024-08-02 12143/week @ 2024-08-09 9632/week @ 2024-08-16

48,089 每月下载量
用于 24 个crate(直接使用18个)

MIT/Apache

240KB
5.5K SLoC

tokio-cron-scheduler

在异步tokio环境中使用cron-like调度。还可以立即安排任务或以固定的时间间隔重复执行。任务数据可以选择使用PostgreSQL或Nats进行持久化。

灵感来源于 https://github.com/lholden/job_scheduler

用法

有关更多详细信息,请参阅 文档

请确保将job_scheduler crate添加到您的 Cargo.toml

[dependencies]
tokio-cron-scheduler = "*"

使用cron库中Schedule类型的FromStr impl创建一个工作安排。

调度格式如下

sec   min   hour   day of month   month   day of week   year
*     *     *      *              *       *             *

时间指定为 UTC 而不是您的本地时区。注意,年份可以省略。如果您想使用您的时区,请在工作创建调用中附加 _tz(例如 Job::new_async 与 Job::new_async_tz)。

5,8,10这样的逗号分隔值代表多个时间值。例如,一个调度为0 2,14,26 * * * *将在每小时的第2分钟、第14分钟和第26分钟执行。

范围可以用短横线指定。以下计划 0 0 * 5-10 * * 每小时执行一次,但仅在月份的第5天到第10天。

星期几可以用缩写或全名指定。以下计划 0 0 6 * * Sun,Sat 在星期天和星期六早上6点执行。

对于每个作业,你可以在作业开始、停止和删除时收到通知。因为这些通知是使用 tokio::spawn 定时执行的,如果任务很快完成,则这些通知的顺序没有保证。

简单用法示例

use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};

#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
    let mut sched = JobScheduler::new().await?;

    // Add basic cron job
    sched.add(
        Job::new("1/10 * * * * *", |_uuid, _l| {
            println!("I run every 10 seconds");
        })?
    ).await?;

    // Add async job
    sched.add(
        Job::new_async("1/7 * * * * *", |uuid, mut l| {
            Box::pin(async move {
                println!("I run async every 7 seconds");

                // Query the next execution time for this job
                let next_tick = l.next_tick_for_job(uuid).await;
                match next_tick {
                    Ok(Some(ts)) => println!("Next time for 7s job is {:?}", ts),
                    _ => println!("Could not get next tick for 7s job"),
                }
            })
        })?
    ).await?;

    // Add one-shot job with given duration
    sched.add(
        Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
            println!("I only run once");
        })?
    ).await?;

    // Create repeated job with given duration, make it mutable to edit it afterwards
    let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
        println!("I run repeatedly every 8 seconds");
    })?;

    // Add actions to be executed when the jobs starts/stop etc.
    jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was started, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was completed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;

    jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
        Box::pin(async move {
            println!("Job {:?} was removed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
        })
    })).await?;
    sched.add(jj).await?;

    // Feature 'signal' must be enabled
    sched.shutdown_on_ctrl_c();

    // Add code to be run during/after shutdown
    sched.set_shutdown_handler(Box::new(|| {
        Box::pin(async move {
            println!("Shut down done");
        })
    }));

    // Start the scheduler
    sched.start().await?;

    // Wait while the jobs run
    tokio::time::sleep(Duration::from_secs(100)).await;

    Ok(())
}

时区变化

你可以使用 JobBuilder API 使用特定时区创建作业。chrono-tz 不会被包含在依赖中,所以如果你想要轻松创建 Timezone 结构体,你需要将其添加到你的 Cargo.toml 中。

    let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new( | uuid, mut l| {
Box::pin(async move {
info ! ("JHB run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info !("Next time for JHB 2s is {:?}", ts),
_ => warn !("Could not get next tick for 2s job"),
}
})
}))
.build()
.unwrap();

类似库

  • job_scheduler 启发这个库的crate
  • cron 我们使用的cron表达式解析器。
  • schedule-rs 是一个类似的rust库,实现了它自己的cron表达式解析器。

许可证

TokioCronScheduler 根据以下之一进行许可

自定义存储

MetadataStore 和 NotificationStore 特性可以实施并在 JobScheduler 中使用。

SimpleMetadataStore 和 SimpleNotificationStore 提供了一个基于 volatile hashmap 的默认版本。NatsMetadataStore 和 NatsNotificationStore 提供了一个使用 Nats 的持久版本。

贡献

除非你明确声明,否则根据 Apache-2.0 许可证定义,任何有意提交以包含在你提交的工作中的贡献,都应以上述双重许可,不附加任何额外条款或条件。

有关更多信息,请参阅 CONTRIBUTING 文件。

特性

has_bytes

自 0.7以来

启用 Prost 生成的数据结构由需要获取数据结构字节的存储使用。Nats 和 Postgres 存储依赖于此功能被启用。

postgres_storage

自 0.6以来

添加了 Postgres 元数据存储和通知存储(PostgresMetadataStore,PostgresNotificationStore)。使用 Postgres 数据库来存储元数据和通知数据。

查看 PostgreSQL 文档

postgres_native_tls

自 0.6以来

使用 postgres-native-tls crate 作为 PostgreSQL 连接的 TLS 提供者。

postgres_openssl

自 0.6以来

使用 postgres-openssl crate 作为 PostgreSQL 连接的 TLS 提供者。

nats_storage

自 0.6以来

添加了 Nats 元数据存储和通知存储(NatsMetadataStore,NatsNotificationStore)。使用 Nats 系统作为存储元数据和通知的方式。

查看 Nats 文档

signal

自 0.5以来

向调度器添加了 shutdown_on_signalshutdown_on_ctrl_c。在接收到信号时,两者都会关闭系统(停止调度器并删除所有任务)。

由于它利用了Tokio的信号处理,因此这仅在Unix系统上可用。

编写测试

在进行tokio::test时,请记住要在多线程上下文中运行它,否则测试将挂起在 scheduler.add()

例如


#[cfg(test)]
mod test {
    use tokio_cron_scheduler::{Job, JobScheduler};
    use tracing::{info, Level};
    use tracing_subscriber::FmtSubscriber;

    // Needs multi_thread to test, otherwise it hangs on scheduler.add()
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    // #[tokio::test]
    async fn test_schedule() {
        let subscriber = FmtSubscriber::builder()
            .with_max_level(Level::TRACE)
            .finish();
        tracing::subscriber::set_global_default(subscriber)
            .expect("Setting default subscriber failed");

        info!("Create scheduler");
        let scheduler = JobScheduler::new().await.unwrap();
        info!("Add job");
        scheduler
            .add(
                Job::new_async("*/1  * * * * *", |_, _| {
                    Box::pin(async {
                        info!("Run every seconds");
                    })
                })
                    .unwrap(),
            )
            .await
            .expect("Should be able to add a job");

        scheduler.start().await.unwrap();

        tokio::time::sleep(core::time::Duration::from_secs(20)).await;
    }
}

示例

简单

运行基于内存的基于hashmap的存储

 cargo run --example simple --features="tracing-subscriber"

postgres

首先需要一个运行的PostgreSQL实例

docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1

然后运行示例

POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"

nats

首先需要一个启用Jetstream的运行的Nats实例

docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV

然后运行示例

cargo run --example nats --features="nats_storage tracing-subscriber"

设计

作业活动

Job activity

创建作业

Create job

创建通知

Create notification

删除作业

Delete job

删除通知

Delete notification

依赖关系

~5–21MB
~301K SLoC