23 个不稳定版本 (9 个破坏性更改)
0.10.5 | 2024年4月26日 |
---|---|
0.10.0 | 2024年3月22日 |
0.8.2 | 2023年11月22日 |
0.7.3 | 2023年2月12日 |
0.3.0 | 2022年3月31日 |
#136 在 数据库接口 中
20,844 每月下载量
在 6 个 Crates 中使用 (直接使用 3 个)
73KB
1.5K SLoC
Sidekiq.rs (又名 rusty-sidekiq
)
这是用 Rust 重新实现的 sidekiq。它可以与 sidekiq.rb 兼容,用于提交和处理作业。显然,sidekiq.rb 比这个仓库更成熟,但我希望您会喜欢使用它。这个库是用 tokio 构建的,因此默认是异步的。
工作进程
这个库使用 serde 来将工作进程参数强制为所需的强类型。下面是一个具有强类型参数的工作进程示例。它还有一个自定义选项,将在提交作业时使用。这些选项可以在入队时覆盖,这使得更改队列名称变得很容易,例如,如果您需要这样做。
use tracing::info;
use sidekiq::Result;
#[derive(Clone)]
struct PaymentReportWorker {}
impl PaymentReportWorker {
fn new() -> Self {
Self { }
}
async fn send_report(&self, user_guid: String) -> Result<()> {
// TODO: Some actual work goes here...
info!({"user_guid" = user_guid}, "Sending payment report to user");
Ok(())
}
}
#[derive(Deserialize, Debug, Serialize)]
struct PaymentReportArgs {
user_guid: String,
}
#[async_trait]
impl Worker<PaymentReportArgs> for PaymentReportWorker {
// Default worker options
fn opts() -> sidekiq::WorkerOpts<Self> {
sidekiq::WorkerOpts::new().queue("yolo")
}
// Worker implementation
async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
self.send_report(args.user_guid).await
}
}
创建作业
有几种插入作业的方法,但为了这个示例,我们将保持简单。给定一些工作进程,使用强类型参数进行插入。
PaymentReportWorker::perform_async(
&mut redis,
PaymentReportArgs {
user_guid: "USR-123".into(),
},
)
.await?;
您可以在入队时进行自定义覆盖。
PaymentReportWorker::opts()
.queue("brolo")
.perform_async(
&mut redis,
PaymentReportArgs {
user_guid: "USR-123".into(),
},
)
.await?;
或者,您可以使用 crate 级别的方法来获得更多控制。
sidekiq::perform_async(
&mut redis,
"PaymentReportWorker".into(),
"yolo".into(),
PaymentReportArgs {
user_guid: "USR-123".to_string(),
},
)
.await?;
在 examples/demo.rs
中查看更多示例。
唯一作业
唯一作业是通过 unique_for
选项支持的,该选项可以在工作进程中默认定义,也可以通过 SomeWorker::opts().unique_for(duration)
定义。请参阅 examples/unique.rs
示例,仅入队唯一作业,通过 (worker_name, queue_name, sha256_hash_of_job_args) 为某些定义的 ttl
。注意:这是在底层使用 SET key value NX EX duration
作为“足够好”的作业锁。
启动服务器
以下是如何创建 Processor
、注册工作进程、包含任何自定义中间件以及启动服务器的示例。
// Redis
let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/").unwrap();
let mut redis = bb8::Pool::builder().build(manager).await.unwrap();
// Sidekiq server
let mut p = Processor::new(
redis,
vec!["yolo".to_string(), "brolo".to_string()],
);
// Add known workers
p.register(PaymentReportWorker::new());
// Custom Middlewares
p.using(FilterExpiredUsersMiddleware::new())
.await;
// Start the server
p.run().await;
周期性作业
系统默认支持周期性cron任务。您只需要指定一个有效的cron字符串和一个工作实例。您可以可选地提供参数、队列、重试标志以及当工作被提交时将记录的名称。
示例
// Clear out all periodic jobs and their schedules
periodic::destroy_all(redis).await?;
// Add a new periodic job
periodic::builder("0 0 8 * * *")?
.name("Email clients with an oustanding balance daily at 8am UTC")
.queue("reminders")
.args(EmailReminderArgs {
report_type: "outstanding_balance",
})?
.register(&mut p, EmailReminderWorker)
.await?;
周期性任务不会自动删除。如果您的项目添加了周期性任务,然后后来又删除了periodic::builder
调用,周期性任务仍然存在于redis中。您可以在程序开始时调用periodic::destroy_all(redis).await?
以确保只执行由最新版本的程序添加的周期性任务。
实现依赖于redis中的有序集合。它存储周期性任务的json有效负载,其分数等于cron字符串的下一个计划的UTC时间。所有进程将定期轮询更改,并原子性地更新cron字符串的新下一个计划的UTC时间。成功原子性地更改分数的工作进程将入队一个新任务。未能成功更新分数的进程将继续。这个实现细节意味着周期性任务永远不会离开redis。另一个细节是,当解码并重新编码json时,可能不会产生与原始字符串相同的值。例如:{"a":"b","c":"d"}
可能变为{"c":"d","a":b"}
。为了保持json表示的一致性,在redis中更新周期性任务及其新分数时,将再次使用原始json字符串以保持一致性。
服务器中间件
sidekiq的一个伟大功能是其中间件模式。此库在rust中重新实现了sidekiq服务器中间件模式。以下示例假设您有一个仅针对付费客户执行工作的应用程序。下面的中间件将阻止已过期的客户的作业执行。关于实现的一个有趣的地方是我们可以依赖serde来条件性地检查工作进程的类型。例如,假设我只关心以用户为中心的工作进程,并且我通过其user_guid
作为参数来识别这些工作进程。使用serde可以轻松验证您的参数。
use tracing::info;
struct FilterExpiredUsersMiddleware {}
impl FilterExpiredUsersMiddleware {
fn new() -> Self {
Self { }
}
}
#[derive(Deserialize)]
struct FiltereExpiredUsersArgs {
user_guid: String,
}
impl FiltereExpiredUsersArgs {
fn is_expired(&self) -> bool {
self.user_guid == "USR-123-EXPIRED"
}
}
#[async_trait]
impl ServerMiddleware for FilterExpiredUsersMiddleware {
async fn call(
&self,
chain: ChainIter,
job: &Job,
worker: Arc<WorkerRef>,
redis: RedisPool,
) -> ServerResult {
// Use serde to check if a user_guid is part of the job args.
let args: Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
serde_json::from_value(job.args.clone());
// If we can safely deserialize then attempt to filter based on user guid.
if let Ok((filter,)) = args {
if filter.is_expired() {
error!({
"class" = job.class,
"jid" = job.jid,
"user_guid" = filter.user_guid },
"Detected an expired user, skipping this job"
);
return Ok(());
}
}
// This customer is not expired, so we may continue.
chain.next(job, worker, redis).await
}
}
最佳实践
分离入队与获取连接池
虽然不是必需的,但建议使用单独的Redis连接池来将作业推送到Redis与获取作业。这有以下好处
- 这些池可以有不同的大小,每个池都根据应用程序的资源使用/限制进行了优化。
- 如果将
sidekiq::Processor
配置为拥有比连接池最大尺寸更多的工作任务,那么从队列中获取连接可能会有延迟。这对入队作业来说是个问题,因为通常希望入队尽可能快,以避免延迟其他操作的临界路径(例如,API请求)。使用单独的池进行入队,入队作业就不会受sidekiq::Processor
对池的使用影响。
#[tokio::main]
async fn main() -> Result<()> {
let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/").unwrap();
let redis_enqueue = bb8::Pool::builder().build(manager).await.unwrap();
let redis_fetch = bb8::Pool::builder().build(manager).await.unwrap();
let p = Processor::new(
redis_fetch,
vec!["default".to_string()],
);
p.run().await;
// ...
ExampleWorker::perform_async(&redis_enqueue, ExampleArgs { foo: "bar".to_string() }).await?;
Ok(())
}
自定义详情
为工作进程命名空间
使用redis-namespace
gem与ruby sidekiq工作进程一起使用仍然非常常见。这个库通过在构建连接池时使用连接自定义器来支持对redis命令进行命名空间。
let manager = sidekiq::RedisConnectionManager::new("redis://127.0.0.1/")?;
let redis = bb8::Pool::builder()
.connection_customizer(sidekiq::with_custom_namespace("my_cool_app".to_string()))
.build(manager)
.await?;
现在,这个库使用的所有命令都将使用前缀my_cool_app:
,例如:ZDEL my_cool_app:scheduled {...}
。
将数据库连接传递给工作进程
工作进程经常需要访问其他软件组件,如数据库连接、http客户端等。只要它们实现了Clone
,就可以在您的worker结构体上定义这些组件。示例
use tracing::debug;
use sidekiq::Result;
#[derive(Clone)]
struct ExampleWorker {
redis: RedisPool,
}
#[async_trait]
impl Worker<()> for ExampleWorker {
async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
use redis::AsyncCommands;
// And then they are available here...
let times_called: usize = self
.redis
.get()
.await?
.unnamespaced_borrow_mut()
.incr("example_of_accessing_the_raw_redis_connection", 1)
.await?;
debug!({"times_called" = times_called}, "Called this worker");
}
}
#[tokio::main]
async fn main() -> Result<()> {
// ...
let mut p = Processor::new(
redis.clone(),
vec!["low_priority".to_string()],
);
p.register(ExampleWorker{ redis: redis.clone() });
}
为嵌套ruby模块下的工作进程自定义工作进程名称
您可能会发现模块下的工作进程与模块下的ruby工作进程不匹配。嵌套的rusty-sidekiq工作进程workers::MyWorker
在注册为某些"类名"时,只会保留最终的类型名MyWorker
。这意味着,如果使用类Workers::MyWorker
将ruby工作进程入队,workers::MyWorker
类型将不会处理该项工作。这是因为默认情况下,类名是在编译时基于工作进程结构体名称生成的。要覆盖此行为,可以重新定义默认特质方法之一
pub struct MyWorker;
use sidekiq::Result;
#[async_trait]
impl Worker<()> for MyWorker {
async fn perform(&self, _args: ()) -> Result<()> {
Ok(())
}
fn class_name() -> String
where
Self: Sized,
{
"Workers::MyWorker".to_string()
}
}
现在,当ruby将Workers::MyWorker
作业入队时,它将被rust-sidekiq处理。
自定义由sidekiq::Processor
产生的worker任务数量
如果一个应用的工作负载主要是IO密集型(查询数据库、发送网络请求并等待响应等),其工作进程将花费大量时间空闲await
等待未来完成。这反过来意味着CPU会大量闲置(如果主机上没有其他任务运行),导致未充分利用可用的CPU资源。
默认情况下,sidekiq::Processor
产生的worker任务数量是主机的CPU数量,但可以根据应用程序的需求进行配置,从而更有效地使用CPU资源。
#[tokio::main]
async fn main() -> Result<()> {
// ...
let num_workers = usize::from_str(&env::var("NUM_WORKERS").unwrap()).unwrap();
let config: ProcessorConfig = Default::default();
let config = config.num_workers(num_workers);
let processor = Processor::new(redis_fetch, queues.clone())
.with_config(config);
// ...
}
许可证
MIT
依赖项
~14–26MB
~396K SLoC