43个版本
0.11.0 | 2024年8月9日 |
---|---|
0.10.2 | 2024年4月26日 |
0.10.0 | 2024年1月29日 |
0.9.4 | 2023年2月13日 |
0.2.1 | 2021年3月25日 |
#8 in 日期和时间
48,089 每月下载量
用于 24 个crate(直接使用18个)
240KB
5.5K SLoC
tokio-cron-scheduler
在异步tokio环境中使用cron-like调度。还可以立即安排任务或以固定的时间间隔重复执行。任务数据可以选择使用PostgreSQL或Nats进行持久化。
灵感来源于 https://github.com/lholden/job_scheduler
用法
有关更多详细信息,请参阅 文档。
请确保将job_scheduler crate添加到您的 Cargo.toml
[dependencies]
tokio-cron-scheduler = "*"
使用cron库中Schedule
类型的FromStr impl创建一个工作安排。
调度格式如下
sec min hour day of month month day of week year
* * * * * * *
时间指定为 UTC
而不是您的本地时区。注意,年份可以省略。如果您想使用您的时区,请在工作创建调用中附加 _tz
(例如 Job::new_async 与 Job::new_async_tz)。
如5,8,10
这样的逗号分隔值代表多个时间值。例如,一个调度为0 2,14,26 * * * *
将在每小时的第2分钟、第14分钟和第26分钟执行。
范围可以用短横线指定。以下计划 0 0 * 5-10 * *
每小时执行一次,但仅在月份的第5天到第10天。
星期几可以用缩写或全名指定。以下计划 0 0 6 * * Sun,Sat
在星期天和星期六早上6点执行。
对于每个作业,你可以在作业开始、停止和删除时收到通知。因为这些通知是使用 tokio::spawn 定时执行的,如果任务很快完成,则这些通知的顺序没有保证。
简单用法示例
use std::time::Duration;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
let mut sched = JobScheduler::new().await?;
// Add basic cron job
sched.add(
Job::new("1/10 * * * * *", |_uuid, _l| {
println!("I run every 10 seconds");
})?
).await?;
// Add async job
sched.add(
Job::new_async("1/7 * * * * *", |uuid, mut l| {
Box::pin(async move {
println!("I run async every 7 seconds");
// Query the next execution time for this job
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => println!("Next time for 7s job is {:?}", ts),
_ => println!("Could not get next tick for 7s job"),
}
})
})?
).await?;
// Add one-shot job with given duration
sched.add(
Job::new_one_shot(Duration::from_secs(18), |_uuid, _l| {
println!("I only run once");
})?
).await?;
// Create repeated job with given duration, make it mutable to edit it afterwards
let mut jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
println!("I run repeatedly every 8 seconds");
})?;
// Add actions to be executed when the jobs starts/stop etc.
jj.on_start_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was started, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
jj.on_stop_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was completed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
jj.on_removed_notification_add(&sched, Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
println!("Job {:?} was removed, notification {:?} ran ({:?})", job_id, notification_id, type_of_notification);
})
})).await?;
sched.add(jj).await?;
// Feature 'signal' must be enabled
sched.shutdown_on_ctrl_c();
// Add code to be run during/after shutdown
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
println!("Shut down done");
})
}));
// Start the scheduler
sched.start().await?;
// Wait while the jobs run
tokio::time::sleep(Duration::from_secs(100)).await;
Ok(())
}
时区变化
你可以使用 JobBuilder
API 使用特定时区创建作业。chrono-tz 不会被包含在依赖中,所以如果你想要轻松创建 Timezone
结构体,你需要将其添加到你的 Cargo.toml 中。
let job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new( | uuid, mut l| {
Box::pin(async move {
info ! ("JHB run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info !("Next time for JHB 2s is {:?}", ts),
_ => warn !("Could not get next tick for 2s job"),
}
})
}))
.build()
.unwrap();
类似库
- job_scheduler 启发这个库的crate
- cron 我们使用的cron表达式解析器。
- schedule-rs 是一个类似的rust库,实现了它自己的cron表达式解析器。
许可证
TokioCronScheduler 根据以下之一进行许可
- Apache License, Version 2.0, (LICENSE-APACHE 或 http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT 或 http://opensource.org/licenses/MIT)
自定义存储
MetadataStore 和 NotificationStore 特性可以实施并在 JobScheduler 中使用。
SimpleMetadataStore 和 SimpleNotificationStore 提供了一个基于 volatile hashmap 的默认版本。NatsMetadataStore 和 NatsNotificationStore 提供了一个使用 Nats 的持久版本。
贡献
除非你明确声明,否则根据 Apache-2.0 许可证定义,任何有意提交以包含在你提交的工作中的贡献,都应以上述双重许可,不附加任何额外条款或条件。
有关更多信息,请参阅 CONTRIBUTING 文件。
特性
has_bytes
自 0.7以来
启用 Prost 生成的数据结构由需要获取数据结构字节的存储使用。Nats 和 Postgres 存储依赖于此功能被启用。
postgres_storage
自 0.6以来
添加了 Postgres 元数据存储和通知存储(PostgresMetadataStore,PostgresNotificationStore)。使用 Postgres 数据库来存储元数据和通知数据。
postgres_native_tls
自 0.6以来
使用 postgres-native-tls crate 作为 PostgreSQL 连接的 TLS 提供者。
postgres_openssl
自 0.6以来
使用 postgres-openssl crate 作为 PostgreSQL 连接的 TLS 提供者。
nats_storage
自 0.6以来
添加了 Nats 元数据存储和通知存储(NatsMetadataStore,NatsNotificationStore)。使用 Nats 系统作为存储元数据和通知的方式。
查看 Nats 文档
signal
自 0.5以来
向调度器添加了 shutdown_on_signal
和 shutdown_on_ctrl_c
。在接收到信号时,两者都会关闭系统(停止调度器并删除所有任务)。
由于它利用了Tokio的信号处理,因此这仅在Unix系统上可用。
编写测试
在进行tokio::test时,请记住要在多线程上下文中运行它,否则测试将挂起在 scheduler.add()
。
例如
#[cfg(test)]
mod test {
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
// Needs multi_thread to test, otherwise it hangs on scheduler.add()
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// #[tokio::test]
async fn test_schedule() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("Setting default subscriber failed");
info!("Create scheduler");
let scheduler = JobScheduler::new().await.unwrap();
info!("Add job");
scheduler
.add(
Job::new_async("*/1 * * * * *", |_, _| {
Box::pin(async {
info!("Run every seconds");
})
})
.unwrap(),
)
.await
.expect("Should be able to add a job");
scheduler.start().await.unwrap();
tokio::time::sleep(core::time::Duration::from_secs(20)).await;
}
}
示例
简单
运行基于内存的基于hashmap的存储
cargo run --example simple --features="tracing-subscriber"
postgres
首先需要一个运行的PostgreSQL实例
docker run --rm -it -p 5432:5432 -e POSTGRES_USER="postgres" -e POSTGRES_PASSWORD="" -e POSTGRES_HOST_AUTH_METHOD="trust" postgres:14.1
然后运行示例
POSTGRES_INIT_METADATA=true POSTGRES_INIT_NOTIFICATIONS=true cargo run --example postgres --features="postgres_storage tracing-subscriber"
nats
首先需要一个启用Jetstream的运行的Nats实例
docker run --rm -it -p 4222:4222 -p 6222:6222 -p 7222:7222 -p 8222:8222 nats -js -DV
然后运行示例
cargo run --example nats --features="nats_storage tracing-subscriber"
设计
作业活动
创建作业
创建通知
删除作业
删除通知
依赖关系
~5–21MB
~301K SLoC