4个版本 (2个重大更改)
0.3.0 | 2023年12月21日 |
---|---|
0.2.0 | 2022年6月20日 |
0.1.1 | 2022年5月30日 |
0.1.0 | 2022年5月29日 |
#648 in 异步
每月下载量 25次
47KB
839 行
Bonsaimq
基于 bonsaidb 的简单数据库消息队列。
该项目深受 sqlxmq 的影响。
警告:该项目处于早期alpha阶段,不应在生产环境中使用!
用法
使用以下命令导入项目
cargo add bonsaimq
或
# adjust the version to the latest version:
bonsaimq = "0.3.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }
然后您可以按如下方式使用消息/任务队列
- 您需要任务处理器,它们是异步函数,接收一个类型为
CurrentJob
的参数并返回空。CurrentJob
允许与任务交互以检索任务输入或完成任务等。 - 需要使用宏
job_regristy!
创建任务注册表,该注册表将消息名称/类型映射到任务处理器,并允许启动新任务。 - 需要创建一个任务运行器并在bonsai数据库上运行。它在作用域内后台运行,根据传入的消息执行任务。它对任务注册表进行操作。
示例
除了以下简单示例外,请参阅 示例文件夹 中的示例并查看测试用例。
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;
/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
// Load the JSON payload and make sure it is there.
let name: String = job.payload_json().expect("input should be given")?;
println!("Hello {name}!");
job.complete().await?;
Ok(())
}
// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
Greetings: "greet" => greet,
});
#[tokio::main]
async fn main() -> Result<()> {
// Open a local database for this example.
let db_path = "simple-doc-example.bonsaidb";
let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;
// Start the job runner to execute jobs from the messages in the queue in the
// database.
let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();
// Spawn new jobs via a message on the database queue.
let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;
// Wait for job to finish execution, polling every 100 ms.
bonsaimq::await_job(job_id, 100, &db).await?;
// Clean up.
job_runner.abort(); // Is done automatically on drop.
tokio::fs::remove_dir_all(db_path).await?;
Ok(())
}
代码检查
该项目使用大量clippy代码检查以提高代码质量和风格。
使用 cargo-lints
安装 cargo-lints,命令为 cargo install --git https://github.com/FlixCoder/cargo-lints
。代码检查定义在 lints.toml
中,可以通过运行 cargo lints clippy --all-targets --workspace
来检查。
依赖项
~11-22MB
~307K SLoC