#消息队列 #任务队列 #队列 #任务 #消息 #持久化

bonsaimq

基于bonsaidb的消息/任务队列,类似于sqlxmq

4个版本 (2个重大更改)

0.3.0 2023年12月21日
0.2.0 2022年6月20日
0.1.1 2022年5月30日
0.1.0 2022年5月29日

#648 in 异步

每月下载量 25次

MIT 许可证

47KB
839

Bonsaimq

crates.io page docs.rs page license: MIT

基于 bonsaidb 的简单数据库消息队列。

该项目深受 sqlxmq 的影响。

警告:该项目处于早期alpha阶段,不应在生产环境中使用!

用法

使用以下命令导入项目

cargo add bonsaimq

# adjust the version to the latest version:
bonsaimq = "0.3.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }

然后您可以按如下方式使用消息/任务队列

  • 您需要任务处理器,它们是异步函数,接收一个类型为 CurrentJob 的参数并返回空。 CurrentJob 允许与任务交互以检索任务输入或完成任务等。
  • 需要使用宏 job_regristy! 创建任务注册表,该注册表将消息名称/类型映射到任务处理器,并允许启动新任务。
  • 需要创建一个任务运行器并在bonsai数据库上运行。它在作用域内后台运行,根据传入的消息执行任务。它对任务注册表进行操作。

示例

除了以下简单示例外,请参阅 示例文件夹 中的示例并查看测试用例。

use bonsaidb::local::{
    config::{Builder, StorageConfiguration},
    AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;

/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
    // Load the JSON payload and make sure it is there.
    let name: String = job.payload_json().expect("input should be given")?;
    println!("Hello {name}!");
    job.complete().await?;
    Ok(())
}

// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
    Greetings: "greet" => greet,
});

#[tokio::main]
async fn main() -> Result<()> {
    // Open a local database for this example.
    let db_path = "simple-doc-example.bonsaidb";
    let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;

    // Start the job runner to execute jobs from the messages in the queue in the
    // database.
    let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();

    // Spawn new jobs via a message on the database queue.
    let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;

    // Wait for job to finish execution, polling every 100 ms.
    bonsaimq::await_job(job_id, 100, &db).await?;

    // Clean up.
    job_runner.abort(); // Is done automatically on drop.
    tokio::fs::remove_dir_all(db_path).await?;
    Ok(())
}

代码检查

该项目使用大量clippy代码检查以提高代码质量和风格。

使用 cargo-lints 安装 cargo-lints,命令为 cargo install --git https://github.com/FlixCoder/cargo-lints。代码检查定义在 lints.toml 中,可以通过运行 cargo lints clippy --all-targets --workspace 来检查。

依赖项

~11-22MB
~307K SLoC