7个不稳定版本
0.9.1 | 2021年2月6日 |
---|---|
0.9.0 | 2021年2月4日 |
0.3.0 | 2019年10月30日 |
0.2.0 | 2019年9月22日 |
0.1.2 | 2019年5月25日 |
#164 在 数据库实现
每月26次下载
11KB
189 行
后台任务
这个crate提供了从通常同步的应用程序异步运行某些过程的工具。标准示例是Web服务,其中某些事情需要处理,但在用户等待浏览器响应时处理它们可能不是最佳体验。
用法
将后台任务添加到你的项目中
[dependencies]
actix-rt = "2.0.0"
background-jobs = "0.9.0"
anyhow = "1.0"
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
要开始使用后台任务,首先你应该定义一个任务。
任务是一系列执行操作所需的数据和操作逻辑的组合。它们实现Job
,serde::Serialize
和serde::DeserializeOwned
。
use background_jobs::Job;
use anyhow::Error;
use futures::future::{ok, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct MyJob {
some_usize: usize,
other_usize: usize,
}
impl MyJob {
pub fn new(some_usize: usize, other_usize: usize) -> Self {
MyJob {
some_usize,
other_usize,
}
}
}
impl Job for MyJob {
type State = ();
type Future = Ready<Result<(), Error>>;
const NAME: &'static str = "MyJob";
fn run(self, _: Self::State) -> Self::Future {
info!("args: {:?}", self);
ok(())
}
}
任务的run方法还接受一个额外的参数,即任务期望使用的状态。应用程序中定义的所有任务的状态必须相同。默认情况下,状态是一个空元组,但你可能希望传递一些Actix地址或其他内容。
让我们重新定义任务,使其关心一些应用程序状态。
#[derive(Clone, Debug)]
pub struct MyState {
pub app_name: String,
}
impl MyState {
pub fn new(app_name: &str) -> Self {
MyState {
app_name: app_name.to_owned(),
}
}
}
impl Job for MyJob {
type State = MyState;
type Future = Ready<Result<(), Error>>;
// The name of the job. It is super important that each job has a unique name,
// because otherwise one job will overwrite another job when they're being
// registered.
const NAME: &'static str = "MyJob";
// The queue that this processor belongs to
//
// Workers have the option to subscribe to specific queues, so this is important to
// determine which worker will call the processor
//
// Jobs can optionally override the queue they're spawned on
const QUEUE: &'static str = DEFAULT_QUEUE;
// The number of times background-jobs should try to retry a job before giving up
//
// Jobs can optionally override this value
const MAX_RETRIES: MaxRetries = MaxRetries::Count(1);
// The logic to determine how often to retry this job if it fails
//
// Jobs can optionally override this value
const BACKOFF: Backoff = Backoff::Exponential(2);
fn run(self, state: Self::State) -> Self::Future {
info!("{}: args, {:?}", state.app_name, self);
ok(())
}
}
运行任务
默认情况下,这个crate包含启用的background-jobs-actix
功能。这使用background-jobs-actix
crate来启动服务器和工作者,并提供创建新任务的方法。
background-jobs-actix
本身没有存储工作者状态的方法。这可以通过手动实现来自background-jobs-core
的Storage
trait来实现,或者可以使用提供的内存存储。
现在我们已经处理好了,回到示例
主函数
use background_jobs::{create_server, WorkerConfig};
use anyhow::Error;
#[actix_rt::main]
async fn main() -> Result<(), Error> {
env_logger::init();
// Set up our Storage
// For this example, we use the default in-memory storage mechanism
use background_jobs::memory_storage::Storage;
let storage = Storage::new();
// Start the application server. This guards access to to the jobs store
let queue_handle = create_server(storage);
// Configure and start our workers
WorkerConfig::new(move || MyState::new("My App"))
.register::<MyJob>()
.set_worker_count(DEFAULT_QUEUE, 16)
.start(queue_handle.clone());
// Queue our jobs
queue_handle.queue(MyJob::new(1, 2))?;
queue_handle.queue(MyJob::new(3, 4))?;
queue_handle.queue(MyJob::new(5, 6))?;
// Block on Actix
actix_rt::signal::ctrl_c().await?;
Ok(())
}
完整示例
有关完整示例项目的详细信息,请参阅示例文件夹
提供自己的服务器/工作者实现
如果您想根据这个想法创建自己的任务处理器,可以依赖于提供Job trait以及其他实现任务处理器和任务存储的有用类型的background-jobs-core
crate。
贡献
对于您发现的任何问题,请随时提交问题。请注意,任何贡献的代码都将根据GPLv3授权。
许可证
本作品根据合作软件许可证授权。这不是免费软件许可证,但可能被视为“源可用许可证”。对于大多数业余爱好者、自雇开发者、员工所有公司以及合作社,只要本软件根据CSL条款分发,就可以在大多数项目中使用此软件。有关更多信息,请参阅提供的LICENSE文件。如果不存在,许可证可以在网上找到,这里。如果您是免费软件项目,并希望在GNU Affero通用公共许可证的条款下使用此软件,请通过[email protected]联系我,我们可以解决这个问题。如果您希望在任何其他许可证下使用此项目,特别是在专有软件中,答案很可能是不行。
依赖项
~6–17MB
~200K SLoC