#worker #tokio #sidekiq #ruby #server-client #connection-pool

rusty-sidekiq

使用 tokio 实现的 rust sidekiq 服务器和客户端

23 个不稳定版本 (9 个破坏性更改)

0.10.5 2024年4月26日
0.10.0 2024年3月22日
0.8.2 2023年11月22日
0.7.3 2023年2月12日
0.3.0 2022年3月31日

#136数据库接口

Download history 2546/week @ 2024-05-04 2584/week @ 2024-05-11 2333/week @ 2024-05-18 2998/week @ 2024-05-25 3633/week @ 2024-06-01 3483/week @ 2024-06-08 4376/week @ 2024-06-15 4442/week @ 2024-06-22 4582/week @ 2024-06-29 4881/week @ 2024-07-06 4798/week @ 2024-07-13 4575/week @ 2024-07-20 4912/week @ 2024-07-27 5288/week @ 2024-08-03 5333/week @ 2024-08-10 4656/week @ 2024-08-17

20,844 每月下载量
6 个 Crates 中使用 (直接使用 3 个)

MIT 许可证

73KB
1.5K SLoC

Sidekiq.rs (又名 rusty-sidekiq)

crates.io MIT licensed Documentation

这是用 Rust 重新实现的 sidekiq。它可以与 sidekiq.rb 兼容,用于提交和处理作业。显然,sidekiq.rb 比这个仓库更成熟,但我希望您会喜欢使用它。这个库是用 tokio 构建的,因此默认是异步的。

工作进程

这个库使用 serde 来将工作进程参数强制为所需的强类型。下面是一个具有强类型参数的工作进程示例。它还有一个自定义选项,将在提交作业时使用。这些选项可以在入队时覆盖,这使得更改队列名称变得很容易,例如,如果您需要这样做。

use tracing::info;
use sidekiq::Result;

#[derive(Clone)]
struct PaymentReportWorker {}

impl PaymentReportWorker {
    fn new() -> Self {
        Self { }
    }

    async fn send_report(&self, user_guid: String) -> Result<()> {
        // TODO: Some actual work goes here...
        info!({"user_guid" = user_guid}, "Sending payment report to user");

        Ok(())
    }
}

#[derive(Deserialize, Debug, Serialize)]
struct PaymentReportArgs {
    user_guid: String,
}

#[async_trait]
impl Worker<PaymentReportArgs> for PaymentReportWorker {
    // Default worker options
    fn opts() -> sidekiq::WorkerOpts<Self> {
        sidekiq::WorkerOpts::new().queue("yolo")
    }

    // Worker implementation
    async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
        self.send_report(args.user_guid).await
    }
}

创建作业

有几种插入作业的方法,但为了这个示例,我们将保持简单。给定一些工作进程,使用强类型参数进行插入。

PaymentReportWorker::perform_async(
    &mut redis,
    PaymentReportArgs {
        user_guid: "USR-123".into(),
    },
)
.await?;

您可以在入队时进行自定义覆盖。

PaymentReportWorker::opts()
    .queue("brolo")
    .perform_async(
        &mut redis,
        PaymentReportArgs {
            user_guid: "USR-123".into(),
        },
    )
    .await?;

或者,您可以使用 crate 级别的方法来获得更多控制。

sidekiq::perform_async(
    &mut redis,
    "PaymentReportWorker".into(),
    "yolo".into(),
    PaymentReportArgs {
        user_guid: "USR-123".to_string(),
    },
)
.await?;

examples/demo.rs 中查看更多示例。

唯一作业

唯一作业是通过 unique_for 选项支持的,该选项可以在工作进程中默认定义,也可以通过 SomeWorker::opts().unique_for(duration) 定义。请参阅 examples/unique.rs 示例,仅入队唯一作业,通过 (worker_name, queue_name, sha256_hash_of_job_args) 为某些定义的 ttl。注意:这是在底层使用 SET key value NX EX duration 作为“足够好”的作业锁。

启动服务器

以下是如何创建 Processor、注册工作进程、包含任何自定义中间件以及启动服务器的示例。

// Redis
let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/").unwrap();
let mut redis = bb8::Pool::builder().build(manager).await.unwrap();

// Sidekiq server
let mut p = Processor::new(
    redis,
    vec!["yolo".to_string(), "brolo".to_string()],
);

// Add known workers
p.register(PaymentReportWorker::new());

// Custom Middlewares
p.using(FilterExpiredUsersMiddleware::new())
    .await;

// Start the server
p.run().await;

周期性作业

系统默认支持周期性cron任务。您只需要指定一个有效的cron字符串和一个工作实例。您可以可选地提供参数、队列、重试标志以及当工作被提交时将记录的名称。

示例

// Clear out all periodic jobs and their schedules
periodic::destroy_all(redis).await?;

// Add a new periodic job
periodic::builder("0 0 8 * * *")?
    .name("Email clients with an oustanding balance daily at 8am UTC")
    .queue("reminders")
    .args(EmailReminderArgs {
        report_type: "outstanding_balance",
    })?
    .register(&mut p, EmailReminderWorker)
    .await?;

周期性任务不会自动删除。如果您的项目添加了周期性任务,然后后来又删除了periodic::builder调用,周期性任务仍然存在于redis中。您可以在程序开始时调用periodic::destroy_all(redis).await?以确保只执行由最新版本的程序添加的周期性任务。

实现依赖于redis中的有序集合。它存储周期性任务的json有效负载,其分数等于cron字符串的下一个计划的UTC时间。所有进程将定期轮询更改,并原子性地更新cron字符串的新下一个计划的UTC时间。成功原子性地更改分数的工作进程将入队一个新任务。未能成功更新分数的进程将继续。这个实现细节意味着周期性任务永远不会离开redis。另一个细节是,当解码并重新编码json时,可能不会产生与原始字符串相同的值。例如:{"a":"b","c":"d"}可能变为{"c":"d","a":b"}。为了保持json表示的一致性,在redis中更新周期性任务及其新分数时,将再次使用原始json字符串以保持一致性。

服务器中间件

sidekiq的一个伟大功能是其中间件模式。此库在rust中重新实现了sidekiq服务器中间件模式。以下示例假设您有一个仅针对付费客户执行工作的应用程序。下面的中间件将阻止已过期的客户的作业执行。关于实现的一个有趣的地方是我们可以依赖serde来条件性地检查工作进程的类型。例如,假设我只关心以用户为中心的工作进程,并且我通过其user_guid作为参数来识别这些工作进程。使用serde可以轻松验证您的参数。

use tracing::info;

struct FilterExpiredUsersMiddleware {}

impl FilterExpiredUsersMiddleware {
    fn new() -> Self {
        Self { }
    }
}

#[derive(Deserialize)]
struct FiltereExpiredUsersArgs {
    user_guid: String,
}

impl FiltereExpiredUsersArgs {
    fn is_expired(&self) -> bool {
        self.user_guid == "USR-123-EXPIRED"
    }
}

#[async_trait]
impl ServerMiddleware for FilterExpiredUsersMiddleware {
    async fn call(
        &self,
        chain: ChainIter,
        job: &Job,
        worker: Arc<WorkerRef>,
        redis: RedisPool,
    ) -> ServerResult {
        // Use serde to check if a user_guid is part of the job args.
        let args: Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
            serde_json::from_value(job.args.clone());

        // If we can safely deserialize then attempt to filter based on user guid.
        if let Ok((filter,)) = args {
            if filter.is_expired() {
                error!({
                    "class" = job.class,
                    "jid" = job.jid,
                    "user_guid" = filter.user_guid },
                    "Detected an expired user, skipping this job"
                );
                return Ok(());
            }
        }

        // This customer is not expired, so we may continue.
        chain.next(job, worker, redis).await
    }
}

最佳实践

分离入队与获取连接池

虽然不是必需的,但建议使用单独的Redis连接池来将作业推送到Redis与获取作业。这有以下好处

  • 这些池可以有不同的大小,每个池都根据应用程序的资源使用/限制进行了优化。
  • 如果将sidekiq::Processor配置为拥有比连接池最大尺寸更多的工作任务,那么从队列中获取连接可能会有延迟。这对入队作业来说是个问题,因为通常希望入队尽可能快,以避免延迟其他操作的临界路径(例如,API请求)。使用单独的池进行入队,入队作业就不会受sidekiq::Processor对池的使用影响。
#[tokio::main]
async fn main() -> Result<()> {
    let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/").unwrap();
    let redis_enqueue = bb8::Pool::builder().build(manager).await.unwrap();
    let redis_fetch = bb8::Pool::builder().build(manager).await.unwrap();

    let p = Processor::new(
        redis_fetch,
        vec!["default".to_string()],
    );
    p.run().await;

    // ...

    ExampleWorker::perform_async(&redis_enqueue, ExampleArgs { foo: "bar".to_string() }).await?;

    Ok(())
}

自定义详情

为工作进程命名空间

使用redis-namespace gem与ruby sidekiq工作进程一起使用仍然非常常见。这个库通过在构建连接池时使用连接自定义器来支持对redis命令进行命名空间。

let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/")?;
let redis = bb8::Pool::builder()
    .connection_customizer(sidekiq::with_custom_namespace("my_cool_app".to_string()))
    .build(manager)
    .await?;

现在,这个库使用的所有命令都将使用前缀my_cool_app:,例如:ZDEL my_cool_app:scheduled {...}

将数据库连接传递给工作进程

工作进程经常需要访问其他软件组件,如数据库连接、http客户端等。只要它们实现了Clone,就可以在您的worker结构体上定义这些组件。示例

use tracing::debug;
use sidekiq::Result;

#[derive(Clone)]
struct ExampleWorker {
    redis: RedisPool,
}


#[async_trait]
impl Worker<()> for ExampleWorker {
    async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
        use redis::AsyncCommands;

        // And then they are available here...
        let times_called: usize = self
            .redis
            .get()
            .await?
            .unnamespaced_borrow_mut()
            .incr("example_of_accessing_the_raw_redis_connection", 1)
            .await?;

        debug!({"times_called" = times_called}, "Called this worker");
    }
}

#[tokio::main]
async fn main() -> Result<()> {
// ...
    let mut p = Processor::new(
        redis.clone(),
        vec!["low_priority".to_string()],
    );

    p.register(ExampleWorker{ redis: redis.clone() });
}

为嵌套ruby模块下的工作进程自定义工作进程名称

您可能会发现模块下的工作进程与模块下的ruby工作进程不匹配。嵌套的rusty-sidekiq工作进程workers::MyWorker在注册为某些"类名"时,只会保留最终的类型名MyWorker。这意味着,如果使用类Workers::MyWorker将ruby工作进程入队,workers::MyWorker类型将不会处理该项工作。这是因为默认情况下,类名是在编译时基于工作进程结构体名称生成的。要覆盖此行为,可以重新定义默认特质方法之一

pub struct MyWorker;
use sidekiq::Result;

#[async_trait]
impl Worker<()> for MyWorker {
    async fn perform(&self, _args: ()) -> Result<()> {
        Ok(())
    }

    fn class_name() -> String
    where
        Self: Sized,
    {
        "Workers::MyWorker".to_string()
    }
}

现在,当ruby将Workers::MyWorker作业入队时,它将被rust-sidekiq处理。

自定义由sidekiq::Processor产生的worker任务数量

如果一个应用的工作负载主要是IO密集型(查询数据库、发送网络请求并等待响应等),其工作进程将花费大量时间空闲await等待未来完成。这反过来意味着CPU会大量闲置(如果主机上没有其他任务运行),导致未充分利用可用的CPU资源。

默认情况下,sidekiq::Processor产生的worker任务数量是主机的CPU数量,但可以根据应用程序的需求进行配置,从而更有效地使用CPU资源。

#[tokio::main]
async fn main() -> Result<()> {
    // ...
    let num_workers = usize::from_str(&env::var("NUM_WORKERS").unwrap()).unwrap();
    let config: ProcessorConfig = Default::default();
    let config = config.num_workers(num_workers);
    let processor = Processor::new(redis_fetch, queues.clone())
        .with_config(config);
    // ...
}

许可证

MIT

依赖项

~14–26MB
~396K SLoC