1 个不稳定版本
0.1.0 | 2024年6月5日 |
---|
#909 in 异步
375KB
6K SLoC
Rexecutor
一个基于 tokio 运行时构建的、健壮的 Rust 作业处理库。
例如,查看 PostgreSQL 示例。
设置 Rexecutor
要创建 Rexecutor
的实例,你需要有一个 Backend
的实现。
rexecutor 库仅提供了一个内存实现 backend::memory::InMemoryBackend
,主要用于测试目的。在实际使用中,应改用实现了 Backend 的独立 crate,例如 rexecutor-sqlx。
创建执行器
作业通过创建一个 struct/enum 并为其实现 Executor
来定义。
示例定义执行器
您可以如下定义和入队一个作业
use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
// Install a paused in-memory backend as the global backend so that the
// `enqueue()` call further down needs no explicit backend reference.
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();

/// Example executor that handles email jobs.
struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    /// Payload carried by each job; printed by `execute`.
    type Data = String;
    type Metadata = String;
    /// Name this executor is registered under.
    const NAME: &'static str = "email_job";
    /// Default maximum number of attempts for jobs of this executor.
    const MAX_ATTEMPTS: u16 = 2;

    /// Prints the executor name together with the job's data, then reports
    /// the job as done.
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        // Do something important with an email
        ExecutionResult::Done
    }
}
// Drive the async example to completion on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    // Schedule the job three hours from now; the enqueue result is
    // deliberately discarded in this example.
    let _ = EmailJob::builder()
        .with_data("bob.shuruncle@example.com".to_owned())
        .schedule_in(TimeDelta::hours(3))
        .enqueue()
        .await;

    // The job must carry the same data and be scheduled roughly three hours
    // out (somewhere between 170 and 190 minutes from now).
    assert_enqueued!(
        with_data: "bob.shuruncle@example.com".to_owned(),
        scheduled_after: Utc::now() + TimeDelta::minutes(170),
        scheduled_before: Utc::now() + TimeDelta::minutes(190),
        for_executor: EmailJob
    );
});
唯一的作业
可以通过某些标准确保作业的唯一性。这可以作为 Executor
实现的一部分定义,通过 Executor::UNIQUENESS_CRITERIA
或通过插入作业时使用 job::builder::JobBuilder::unique
来定义。
例如,为了确保每五分钟只运行一个唯一的作业,可以使用以下唯一性标准。
use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
// A paused in-memory backend, installed globally so `enqueue()` can find it.
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();

/// Executor whose jobs are de-duplicated per executor within a 300 second window.
struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    // Uniqueness is keyed on the executor within a 300 second window; on a
    // conflict the priority is replaced, for jobs in any status.
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );

    type Data = ();
    type Metadata = ();

    /// Logs the executor name and the (unit) payload, then reports success.
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        let payload = &job.data;
        println!("{} running, with args: {:?}", Self::NAME, payload);
        // Do something important
        ExecutionResult::Done
    }
}
// Run the example on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    // Enqueue the same job twice in a row; both results are intentionally discarded.
    let _ = UniqueJob::builder().enqueue().await;
    let _ = UniqueJob::builder().enqueue().await;

    // Only one of the two jobs was enqueued
    assert_enqueued!(
        1 job,
        scheduled_before: Utc::now(),
        for_executor: UniqueJob
    );
});
此外,还可以指定在存在冲突作业时应采取的操作。在上面的示例中,优先级被覆盖。有关如何使用唯一性的更多详细信息,请参阅 job::uniqueness_criteria::UniquenessCriteria
。
覆盖 Executor
默认值
在定义一个Executor
时,您可以通过Executor::MAX_ATTEMPTS
指定最大尝试次数。然而,在插入一个作业时,可以通过调用job::builder::JobBuilder::with_max_attempts
来覆盖此值(如果没有调用,最大尝试次数将与Executor::MAX_ATTEMPTS
相等)。
同样,执行器可以通过Executor::UNIQUENESS_CRITERIA
定义作业的唯一性标准。然而,使用job::builder::JobBuilder::unique
可以为特定作业覆盖此值。
设置执行器以运行
对于您想要运行的每个执行器,应调用Rexecutor::with_executor
。显式地注册执行器,使得集群中的特定节点可以充当某些已入队作业的工作节点,而其他节点则不负责执行这些作业。
设置执行器的示例
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
/// Worker registered under the name `refresh_worker`.
pub(crate) struct RefreshWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `email_scheduler`.
pub(crate) struct EmailScheduler;

#[async_trait::async_trait]
impl Executor for EmailScheduler {
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `registration_worker`.
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
// Build a current-thread tokio runtime and perform the setup inside it.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Register each executor this instance should run. Per the surrounding
    // text, registration is explicit so that only chosen nodes act as workers
    // for a given kind of job.
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>();
});
排队作业
通常,作业将通过使用Executor::builder
返回的job::builder::JobBuilder
来排队。
在排队作业时,可以指定作业的数据和元数据。此外,还可以覆盖Executor
的默认值。
覆盖 Executor
默认值
在定义一个Executor
时,您可以通过Executor::MAX_ATTEMPTS
指定最大尝试次数。然而,在插入一个作业时,可以通过调用job::builder::JobBuilder::with_max_attempts
来覆盖此值(如果没有调用,最大尝试次数将与Executor::MAX_ATTEMPTS
相等)。
同样,执行器可以通过Executor::UNIQUENESS_CRITERIA
定义作业的唯一性标准。然而,使用job::builder::JobBuilder::unique
可以为特定作业覆盖此值。
排队作业的示例
use rexecutor::prelude::*;
use std::sync::Arc;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
/// Minimal executor used to demonstrate enqueueing; registered as `simple_executor`.
pub(crate) struct ExampleExecutor;

#[async_trait::async_trait]
impl Executor for ExampleExecutor {
    const NAME: &'static str = "simple_executor";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: ignores the job and reports it as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
// Run the enqueueing example on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    // The backend is shared through an `Arc`: one clone is installed as the
    // global backend, the other is passed by reference when enqueueing below.
    let backend = Arc::new(InMemoryBackend::new().paused());
    Rexecutor::new(backend.clone()).set_global_backend().unwrap();

    // Build a job with explicit attempts, tags and data, scheduled two hours
    // from now, and enqueue it to the explicitly supplied backend.
    ExampleExecutor::builder()
        .with_max_attempts(2)
        .with_tags(vec!["initial_job", "delayed"])
        .with_data("First job".into())
        .schedule_in(TimeDelta::hours(2))
        .enqueue_to_backend(&backend)
        .await
        .unwrap();

    // Check the same backend for a matching job scheduled roughly two hours
    // out (between 110 and 130 minutes from now).
    assert_enqueued!(
        to: backend,
        with_data: "First job".to_owned(),
        tagged_with: ["initial_job", "delayed"],
        scheduled_after: Utc::now() + TimeDelta::minutes(110),
        scheduled_before: Utc::now() + TimeDelta::minutes(130),
        for_executor: ExampleExecutor
    );
});
编译时cron作业调度
拥有在给定计划上运行的作业可能很有用。可以使用Rexecutor::with_cron_executor
或Rexecutor::with_cron_executor_for_timezone
设置此类作业。后者用于指定作业应调度的特定时区。
设置UTC cron作业的示例
要设置每天午夜运行的cron作业,可以使用以下代码。
use rexecutor::prelude::*;
use rexecutor::backend::{Backend, memory::InMemoryBackend};
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
type Data = String;
type Metadata = ();
const NAME: &'static str = "cron_job";
const MAX_ATTEMPTS: u16 = 1;
async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
/// Do something important
ExecutionResult::Done
}
}
// Run the cron setup on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    // The surrounding text describes this schedule as running every day at midnight.
    let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
    let backend = InMemoryBackend::new();
    // The second argument is a `String`, matching `CronJob::Data`; presumably it
    // is the data handed to each scheduled job — confirm against the
    // `with_cron_executor` documentation.
    Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
});
修剪作业
在作业完成、被取消或被丢弃之后,能够将其清理掉是很有用的。要设置作业修剪器,应调用Rexecutor::with_job_pruner
,并传入给定的PrunerConfig
。
鉴于作业可能以多种不同的方式结束,能够精细地控制旧作业的清理方式通常很有用。PrunerConfig
提供了这种控制。
在构建PrunerConfig
时,提供一个cron::Schedule
以指定修剪器应运行的时间。
根据系统的负载/吞吐量,修剪器可以计划从每年一次到每小时多次运行。
配置作业修剪器的示例
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
/// Worker registered under the name `refresh_worker`.
pub(crate) struct RefreshWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `email_scheduler`.
pub(crate) struct EmailScheduler;

#[async_trait::async_trait]
impl Executor for EmailScheduler {
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `registration_worker`.
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
// Pruner configuration. The schedule decides when the pruner runs
// (presumably at the top of every hour for this expression — confirm against
// the `cron` crate's field order).
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    // NOTE(review): presumably caps how many pruners run concurrently — confirm.
    .with_max_concurrency(Some(2))
    // First pruner: age-based (31 days) for `Complete` jobs, restricted to
    // `RefreshWorker` and `EmailScheduler`.
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    // Second pruner: length-based (200) for `Discarded` jobs, for every
    // executor except `RefreshWorker` and `EmailScheduler`.
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );
// Run the setup on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Register the executors, then attach the job pruner built from `config`.
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .with_job_pruner(config);
});
关闭 rexecutor
为了避免作业在执行过程中被杀死,使用优雅的关闭方式非常重要。这可以通过显式调用 Rexecutor::graceful_shutdown
或通过使用 DropGuard
(通过 Rexecutor::drop_guard
获取)来实现。
使用 Rexecutor::graceful_shutdown
或 Rexecutor::drop_guard
将确保所有正在执行的作业在关闭 rexecutor 之前都有足够的时间完成。
使用 DropGuard
的示例
use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
/// Worker registered under the name `refresh_worker`.
pub(crate) struct RefreshWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `email_scheduler`.
pub(crate) struct EmailScheduler;

#[async_trait::async_trait]
impl Executor for EmailScheduler {
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}

/// Worker registered under the name `registration_worker`.
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;

    type Data = String;
    type Metadata = String;

    /// Placeholder body: immediately reports the job as done.
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
// Run the setup on a current-thread tokio runtime.
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Note this must be given a name to ensure it is dropped at the end of the scope.
    // A bare `_` pattern does not bind the value, so the guard would be dropped
    // immediately instead.
    // See https://doc.rust-lang.net.cn/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
    let _guard = Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .drop_guard();
});
全局后端
Rexecutor 可以使用全局后端运行。这允许使用方便的 job::builder::JobBuilder::enqueue
方法,该方法不需要将后端引用传递给需要入队作业的代码。
可以使用 Rexecutor::set_global_backend
设置全局后端,这应该只调用一次,否则将返回错误。
实际上,对于单个 Rexecutor
实例,不可能调用两次,以下代码片段将无法编译
use rexecutor::prelude::*;
// This snippet is intentionally invalid: as the surrounding text states, it
// does not compile, because `set_global_backend` cannot be called twice on a
// single `Rexecutor` instance.
let backend = rexecutor::backend::memory::InMemoryBackend::new();
// NOTE(review): presumably the first call consumes the instance and returns a
// result rather than the instance itself — confirm against the API docs.
Rexecutor::new(backend).set_global_backend().set_global_backend();
注意,使用全局后端有许多与任何全局变量相同的缺点,特别是它可能会使单元测试更加困难。
行为准则
我们遵循 Rust 行为准则(Code of Conduct)。
目前,管理团队仅由 John Bell 组成。我们欢迎更多成员加入:如果您想加入管理团队,请联系 John Bell。
许可
该项目采用 MIT 许可证。
依赖项
~37–50MB
~874K SLoC