1 unstable release

0.1.0 June 5, 2024

#249 in Asynchronous


Used in rexecutor-sqlx

MIT/Apache

315KB
4.5K SLoC

Rexecutor

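A robust job execution library for Rust, built on the tokio runtime.

For a complete example, see the postgres example.

Setting up Rexecutor

To create an instance of Rexecutor you need an implementation of Backend.

The Rexecutor library itself ships only an in-memory implementation, backend::memory::InMemoryBackend, which is primarily intended for testing. For anything else, use a separate crate that implements Backend, such as rexecutor-sqlx.

Creating executors

Jobs are defined by creating a struct or enum and implementing Executor for it.

Example defining an executor

You can define and enqueue a job as follows: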

use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        // Do something important with an email
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = EmailJob::builder()
        .with_data("[email protected]".to_owned())
        .schedule_in(TimeDelta::hours(3))
        .enqueue()
        .await;

    assert_enqueued!(
        with_data: "[email protected]".to_owned(),
        scheduled_after: Utc::now() + TimeDelta::minutes(170),
        scheduled_before: Utc::now() + TimeDelta::minutes(190),
        for_executor: EmailJob
    );
});

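Unique jobs

Jobs can be made unique according to certain criteria. These criteria can be defined as part of the Executor implementation via Executor::UNIQUENESS_CRITERIA, or when inserting a job via job::builder::JobBuilder::unique.

For example, to ensure that at most one job for an executor is enqueued within any five-minute window, the following uniqueness criteria can be used.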

use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = UniqueJob::builder().enqueue().await;
    let _ = UniqueJob::builder().enqueue().await;

    // Only one of the two jobs was enqueued
    assert_enqueued!(
        1 job,
        scheduled_before: Utc::now(),
        for_executor: UniqueJob
    );
});

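Additionally, it is possible to specify what action should be taken when there is a conflicting job. In the example above, the priority of the existing job is replaced. For more details on how to use uniqueness, see job::uniqueness_criteria::UniquenessCriteria.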
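To make the windowed uniqueness rule concrete, here is a toy model of it that uses only the standard library. It is not Rexecutor's implementation: `QueuedJob`, `enqueue_unique`, and the handling of the window boundary are assumptions made for illustration, and the real library also takes job statuses into account when looking for conflicts.

```rust
use std::time::Duration;

/// Hypothetical record of an enqueued job (illustration only).
#[derive(Debug)]
struct QueuedJob {
    executor: &'static str,
    inserted_at: Duration, // time since an arbitrary epoch
    priority: u16,
}

/// Enqueue a job unless one for the same executor was inserted within
/// `window` of `now`. On conflict, only the existing job's priority is
/// replaced. Returns `true` if a new job was inserted.
fn enqueue_unique(
    queue: &mut Vec<QueuedJob>,
    executor: &'static str,
    now: Duration,
    priority: u16,
    window: Duration,
) -> bool {
    let conflict = queue
        .iter()
        .position(|job| job.executor == executor && now.saturating_sub(job.inserted_at) <= window);
    match conflict {
        Some(index) => {
            // Conflict: keep the existing job and overwrite its priority.
            queue[index].priority = priority;
            false
        }
        None => {
            queue.push(QueuedJob { executor, inserted_at: now, priority });
            true
        }
    }
}
```

With a 300-second window, a second call 100 seconds after the first only updates the priority, while a call 401 seconds after the first inserts a new job.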

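Overriding Executor default values

When defining an Executor, you specify the maximum number of attempts via Executor::MAX_ATTEMPTS. However, when inserting a job, this value can be overridden by calling job::builder::JobBuilder::with_max_attempts (if it is not called, the maximum number of attempts equals Executor::MAX_ATTEMPTS).

Similarly, an executor can define job uniqueness criteria via Executor::UNIQUENESS_CRITERIA. However, job::builder::JobBuilder::unique can be used to override this value for a specific job.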
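The fallback rule described above can be sketched with a small stand-alone model. The names `ToyExecutor`, `ToyJobBuilder`, and `resolved_max_attempts` are hypothetical and exist only for this illustration; they are not part of the Rexecutor API, and the real builder may resolve the value differently internally.

```rust
use std::marker::PhantomData;

/// Hypothetical stand-in for an executor with a compile-time default.
trait ToyExecutor {
    const MAX_ATTEMPTS: u16;
}

struct EmailJob;

impl ToyExecutor for EmailJob {
    const MAX_ATTEMPTS: u16 = 2;
}

/// Hypothetical builder that remembers an optional per-job override.
struct ToyJobBuilder<E: ToyExecutor> {
    max_attempts: Option<u16>,
    _executor: PhantomData<E>,
}

impl<E: ToyExecutor> ToyJobBuilder<E> {
    fn new() -> Self {
        Self { max_attempts: None, _executor: PhantomData }
    }

    /// Record a per-job override of the executor default.
    fn with_max_attempts(mut self, max_attempts: u16) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }

    /// The override wins if present; otherwise the executor default applies.
    fn resolved_max_attempts(&self) -> u16 {
        self.max_attempts.unwrap_or(E::MAX_ATTEMPTS)
    }
}
```

Keeping the override as an `Option` makes "not called" distinguishable from "called with the same value as the default", which is one plausible way to model the behaviour.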

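Setting up executors to run

For each executor you would like to run, Rexecutor::with_executor should be called. Being explicit about this makes it possible for specific nodes in a cluster to run as workers for certain enqueued jobs, while other nodes are not responsible for executing them.

Example setting up executors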

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>();
});

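Enqueuing jobs

Generally, jobs are enqueued using the job::builder::JobBuilder returned by Executor::builder.

When enqueuing a job, its data and metadata can be specified. Additionally, the Executor default values can be overridden.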

Example enqueuing a job

use rexecutor::prelude::*;
use std::sync::Arc;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
pub(crate) struct ExampleExecutor;

#[async_trait::async_trait]
impl Executor for ExampleExecutor {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "simple_executor";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = Arc::new(InMemoryBackend::new().paused());
    Rexecutor::new(backend.clone()).set_global_backend().unwrap();

    ExampleExecutor::builder()
        .with_max_attempts(2)
        .with_tags(vec!["initial_job", "delayed"])
        .with_data("First job".into())
        .schedule_in(TimeDelta::hours(2))
        .enqueue_to_backend(&backend)
        .await
        .unwrap();

    assert_enqueued!(
        to: backend,
        with_data: "First job".to_owned(),
        tagged_with: ["initial_job", "delayed"],
        scheduled_after: Utc::now() + TimeDelta::minutes(110),
        scheduled_before: Utc::now() + TimeDelta::minutes(130),
        for_executor: ExampleExecutor
    );
});

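Compile-time scheduling of cron jobs

It can be useful to have jobs that run on a given schedule. Such jobs can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter specifies the timezone in which the job should be scheduled.

Example setting up a UTC cron job

To set up a cron job that runs every day at midnight, you can use the following code.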

use rexecutor::prelude::*;
use rexecutor::backend::{Backend, memory::InMemoryBackend};
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

    let backend = InMemoryBackend::new();
    Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
});
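The schedule strings in these examples have six fields rather than the five of classic crontab. The sketch below labels each field, assuming the seconds-first field order used by the cron crate (second, minute, hour, day of month, month, day of week, optional year). `label_fields` is a hypothetical helper written for this illustration; it does not validate the field values and is not part of cron or Rexecutor.

```rust
/// Field names in the assumed seconds-first order; the year is optional.
static FIELD_NAMES: [&str; 7] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
    "year",
];

/// Pair each whitespace-separated field with its name.
/// Returns `None` unless the expression has six or seven fields.
fn label_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() < 6 || parts.len() > 7 {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(parts).collect())
}
```

Under this reading, "0 0 0 * * *" fixes second, minute, and hour to zero (daily at midnight), whereas "0 0 * * * *" leaves the hour as a wildcard (once an hour, on the hour).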

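Pruning jobs

After jobs have completed, been cancelled, or been discarded, it is useful to be able to clean up. To set up the job pruner, call Rexecutor::with_job_pruner and pass it a PrunerConfig.

Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig provides this control.

When constructing a PrunerConfig, a cron::Schedule is provided to specify when the pruner should run.

Depending on the load and throughput of the system, the pruner can be scheduled to run anywhere from once a year to multiple times per hour.

Example configuring a job pruner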

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .with_job_pruner(config);
});
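The two pruning strategies named in the configuration above, a maximum age and a maximum length, can be pictured with a toy model that uses only the standard library. This is a sketch under assumptions rather than the library's implementation: `FinishedJob`, `prune_max_age`, and `prune_max_length` are hypothetical, and keeping the newest jobs when enforcing a maximum length is an assumption.

```rust
/// Hypothetical record of a finished job (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct FinishedJob {
    id: u32,
    age_days: u32,
}

/// Remove every job older than `max_age_days`; returns how many were removed.
fn prune_max_age(jobs: &mut Vec<FinishedJob>, max_age_days: u32) -> usize {
    let before = jobs.len();
    jobs.retain(|job| job.age_days <= max_age_days);
    before - jobs.len()
}

/// Keep at most `max_length` jobs, preferring the newest ones;
/// returns how many were removed.
fn prune_max_length(jobs: &mut Vec<FinishedJob>, max_length: usize) -> usize {
    // Sort newest first so that truncation drops the oldest entries.
    jobs.sort_by_key(|job| job.age_days);
    let removed = jobs.len().saturating_sub(max_length);
    jobs.truncate(max_length);
    removed
}
```

An age-based pruner bounds how long history is kept, while a length-based pruner bounds how much is kept regardless of age, which is why the configuration allows both to be combined per status and per executor.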

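Shutting Rexecutor down

To avoid jobs being killed mid-execution, it is important to use graceful shutdown. This can be done either by calling Rexecutor::graceful_shutdown explicitly, or by using the DropGuard obtained via Rexecutor::drop_guard.

Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard ensures that all currently executing jobs are given time to complete before Rexecutor shuts down.

Example using the DropGuard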

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Note this must be given a name to ensure it is dropped at the end of the scope.
    // See https://doc.rust-lang.net.cn/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
    let _guard = Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .drop_guard();
});
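The comment about naming the guard reflects a general Rust rule: a value bound to `_` is dropped at the end of that statement, while a value bound to a name such as `_guard` lives until the end of the enclosing scope. The stand-alone sketch below shows the difference with a hypothetical `LoggingGuard` that records when it is dropped; it stands in for the real DropGuard and uses only the standard library.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical guard that records the moment it is dropped.
struct LoggingGuard {
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for LoggingGuard {
    fn drop(&mut self) {
        self.log.borrow_mut().push("guard dropped");
    }
}

/// Named binding: the guard lives until the end of the inner scope.
fn with_named_binding() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _guard = LoggingGuard { log: Rc::clone(&log) };
        log.borrow_mut().push("work done");
    } // `_guard` is dropped here, after the work.
    let events = log.borrow().clone();
    events
}

/// Underscore pattern: the guard is dropped as soon as the statement ends.
fn with_underscore_pattern() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _ = LoggingGuard { log: Rc::clone(&log) }; // dropped immediately
        log.borrow_mut().push("work done");
    }
    let events = log.borrow().clone();
    events
}
```

In the first function the drop is recorded after the work; in the second it is recorded before it, so a guard bound with `let _ = ...` would trigger shutdown straight away.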

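Global backend

Rexecutor can be run with a global backend. This enables the convenient job::builder::JobBuilder::enqueue method, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.

The global backend can be set using Rexecutor::set_global_backend. This should be called only once; otherwise it returns an error.

In fact, for a single Rexecutor instance it is impossible to call it twice, and the following snippet will fail to compile: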

use rexecutor::prelude::*;
let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();

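Note that using a global backend has many of the same drawbacks as any global variable; in particular, it can make unit testing more difficult.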
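The set-once behaviour and its effect on testing can be illustrated with `std::sync::OnceLock`. This is only a sketch of the general pattern: how Rexecutor actually stores its global backend is not stated here, and `GLOBAL_BACKEND`, `set_global_backend`, and `global_backend` below are hypothetical stand-ins that store a plain string instead of a backend.

```rust
use std::sync::OnceLock;

/// Hypothetical process-wide slot; a `String` stands in for a real backend.
static GLOBAL_BACKEND: OnceLock<String> = OnceLock::new();

/// The first call succeeds; every later call returns an error and leaves
/// the stored value untouched.
fn set_global_backend(name: &str) -> Result<(), String> {
    GLOBAL_BACKEND
        .set(name.to_owned())
        .map_err(|rejected| format!("global backend already set, rejected `{rejected}`"))
}

/// Read the global value, if one has been set.
fn global_backend() -> Option<&'static str> {
    GLOBAL_BACKEND.get().map(String::as_str)
}
```

Because the slot is shared by the whole process, every test running in that process sees whichever value was set first, which is the kind of coupling that makes unit tests harder.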

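Code of conduct

We follow the Rust code of conduct.

Currently the moderation team consists of John Bell only. We would welcome more members: if you would like to join the moderation team, please contact John Bell.

Licence

The project is licensed under the MIT license.

Dependencies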

~6–14MB
~158K SLoC