#amqp #message #worker

apalis-amqp

使用apalis和Amqp的Rust消息队列工具

10个不稳定版本 (3个破坏性版本)

0.4.0-rc.12024年7月25日
0.3.0 2024年3月1日
0.2.2 2023年5月25日
0.2.0-alpha.22023年4月12日
0.1.1 2023年4月9日

#543数据库接口

Download history 33/week @ 2024-07-06 1/week @ 2024-07-13 75/week @ 2024-07-20 34/week @ 2024-07-27

每月143次下载

Apache-2.0

19KB
310

apalis-amqp

使用apalis和AMQP的消息队列


概述

apalis-amqp 是一个Rust crate,它提供了集成 apalis 与AMQP消息队列系统的实用工具。它包括用于推送和弹出消息的 AmqpBackend 实现以及用于从AMQP队列消费消息的 MessageQueue<M> 实现。

特性

  • apalis和AMQP消息队列系统之间的集成。
  • 轻松创建基于AMQP的消息队列。
  • 简单地将AMQP消息作为apalis消息消费。
  • 通过 tower 层支持消息确认和拒绝。
  • 支持所有apalis中间件,如速率限制、超时、过滤、sentry、prometheus等。
  • 支持ack消息,并允许将自定义结果保存到其他后端。

入门

在尝试连接之前,您需要一个可工作的amqp后端。我们可以轻松地使用Docker进行设置

设置RabbitMq

docker run -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=apalis -e RABBITMQ_DEFAULT_PASS=apalis  rabbitmq:3.8.4-management

设置rust代码

将apalis-amqp添加到您的Cargo.toml中

[dependencies]
apalis = "0.6"
apalis-amqp = "0.4"
serde = "1"

然后在main.rs中添加

 use apalis::prelude::*;
 use apalis_amqp::AmqpBackend;
 use serde::{Deserialize, Serialize};

 #[derive(Debug, Serialize, Deserialize)]
 struct TestMessage(usize);

 async fn test_message(message: TestMessage) {
     dbg!(message);
 }

 #[tokio::main]
 async fn main() {
     let env = std::env::var("AMQP_ADDR").unwrap();
     let mq = AmqpBackend::<TestMessage>::new_from_addr(&env).await.unwrap();
     // This can be in another place in the program
     mq.enqueue(TestMessage(42)).await.unwrap();
     Monitor::<TokioExecutor>::new()
         .register(
             WorkerBuilder::new("rango-amigo")
                 .backend(mq)
                 .build_fn(test_message),
         )
         .run()
         .await
         .unwrap();
 }

运行您的代码并获利!

许可

apalis-amqp采用Apache许可证。有关详细信息,请参阅LICENSE文件。

依赖关系

~11–23MB
~359K SLoC