28个版本 (18个破坏性更新)
0.19.0 | 2024年7月9日 |
---|---|
0.18.0 | 2024年2月5日 |
0.17.0 | 2024年1月18日 |
0.15.0 | 2023年4月28日 |
0.3.1 | 2018年11月20日 |
#197 在 数据库接口
每月117 次下载
在 7 个Crate中使用(4个直接使用)
110KB
1.5K SLoC
后台任务
此Crate提供从通常同步的应用程序异步运行一些过程的工具。标准示例是Web服务,其中某些事情需要被处理,但在用户等待浏览器响应时处理它们可能不是最好的体验。
用法
将后台任务添加到您的项目
[dependencies]
actix-rt = "2.2.0"
background-jobs = "0.15.0"
serde = { version = "1.0", features = ["derive"] }
要开始使用后台任务,首先您应该定义一个任务。
任务是一组执行操作所需的数据和该操作的逻辑的组合。它们实现了Job
,serde::Serialize
和serde::DeserializeOwned
。
use background_jobs::{Job, BoxError};
use std::future::{ready, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
some_usize: usize,
other_usize: usize,
}
impl MyJob {
pub fn new(some_usize: usize, other_usize: usize) -> Self {
MyJob {
some_usize,
other_usize,
}
}
}
impl Job for MyJob {
type State = ();
type Error = BoxError;
type Future = Ready<Result<(), BoxError>>;
const NAME: &'static str = "MyJob";
fn run(self, _: Self::State) -> Self::Future {
info!("args: {:?}", self);
ready(Ok(()))
}
}
任务的run方法接受一个额外的参数,即任务期望使用的状态。应用程序中定义的所有任务的状态必须相同。默认情况下,状态是一个空的元组,但您可能希望传入一些Actix地址或其他内容。
让我们重新定义任务,使其关注一些应用程序状态。
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl Job for MyJob {
type State = MyState;
type Error = BoxError;
type Future = Ready<Result<(), BoxError>>;
// The name of the job. It is super important that each job has a unique name,
// because otherwise one job will overwrite another job when they're being
// registered.
const NAME: &'static str = "MyJob";
// The queue that this processor belongs to
//
// Workers have the option to subscribe to specific queues, so this is important to
// determine which worker will call the processor
//
// Jobs can optionally override the queue they're spawned on
const QUEUE: &'static str = DEFAULT_QUEUE;
// The number of times background-jobs should try to retry a job before giving up
//
// Jobs can optionally override this value
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
// The logic to determine how often to retry this job if it fails
//
// Jobs can optionally override this value
const BACKOFF: Backoff = Backoff::Exponential(2);
fn run(self, state: Self::State) -> Self::Future {
info!("{}: args, {:?}", state.app_name, self);
ready(Ok(()))
}
}
运行任务
默认情况下,此Crate随background-jobs-actix
功能一起提供。这使用background-jobs-actix
crate启动一个服务器和工作者,并提供创建新任务的机制。
background-jobs-actix
本身没有存储工作者状态的机制。这可以通过实现来自background-jobs-core
的Storage
trait手动实现,或者可以使用提供的内存存储。
这样,我们就可以回到示例中去了
主程序
use background_jobs::{create_server, actix::WorkerConfig, BoxError};
#[actix_rt::main]
async fn main() -> Result<(), BoxError> {
// Set up our Storage
// For this example, we use the default in-memory storage mechanism
use background_jobs::memory_storage::{ActixTimer, Storage};
let storage = Storage::new(ActixTimer);
// Configure and start our workers
let queue_handle = WorkerConfig::new(storage, move || MyState::new("My App"))
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start();
// Queue our jobs
queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
// Block on Actix
actix_rt::signal::ctrl_c().await?;
Ok(())
}
完整示例
有关完整示例项目,请参阅示例文件夹
使用自己的服务器/工作者实现
如果您想基于这个想法创建自己的作业处理器,可以依赖 background-jobs-core
包,它提供了 Job 特性以及一些其他有用的类型,用于实现作业处理器和作业存储。
贡献
欢迎您为任何发现的问题提交问题。请注意,任何贡献的代码都将根据 AGPLv3 许可协议授权。
许可证
版权所有 © 2022 Riley Trautman
background-jobs 是免费软件:您可以按照自由软件基金会发布的 GNU 通用公共许可证(第 3 版或您选择的任何较新版本)的条款重新分发和/或修改它。
background-jobs 以希望它将是有用的方式分发,但没有任何保证;甚至没有关于其适销性或适用于特定目的的暗示保证。有关详细信息,请参阅 GNU 通用公共许可证。此文件是 background-jobs 的一部分。
您应该已收到与 background-jobs 一起的 GNU 通用公共许可证副本。如果没有,请参阅 http://www.gnu.org/licenses/。
依赖项
~3–15MB
~176K SLoC