#amqp #lapin #wrapper #object #channel #creation #manager

amqp-manager

Lapin 包装器,封装了连接/通道的使用以及 amqp 对象的创建

17 个版本 (6 个稳定版)

1.3.1 2021 年 11 月 15 日
1.2.0 2021 年 6 月 28 日
1.0.0 2021 年 3 月 11 日
0.7.0 2021 年 5 月 21 日
0.2.1 2020 年 11 月 20 日

#1260 in 异步

Download history 7/week @ 2024-04-02 11/week @ 2024-04-09 2/week @ 2024-06-11

每月 67 次下载

MIT 许可证

20KB
201 代码行

docs crates.io-version tests audit crates.io-license

Lapin 包装器,封装了连接/通道的使用以及 amqp 对象的创建。

用法

use amqp_manager::prelude::*;
use deadpool_lapin::{Config, Runtime};
use futures::FutureExt;

#[tokio::main]
async fn main() {
    let pool = Config {
        url: Some("amqp://guest:[email protected]:5672//".to_string()),
        ..Default::default()
    }
        .create_pool(Some(Runtime::Tokio1))
        .expect("Should create DeadPool instance");
    let manager = AmqpManager::new(pool);
    let session = manager
        .create_session_with_confirm_select()
        .await
        .expect("Should create AmqpSession instance");

    let create_queue_op = CreateQueue {
        options: QueueDeclareOptions {
            auto_delete: true,
            ..Default::default()
        },
        ..Default::default()
    };
    let queue = session.create_queue(create_queue_op.clone()).await.expect("create_queue");

    let confirmation = session
        .publish_to_routing_key(PublishToRoutingKey {
            routing_key: queue.name().as_str(),
            payload: "Hello World".as_bytes(),
            ..Default::default()
        })
        .await
        .expect("publish_to_queue");
    assert!(confirmation.is_ack());

    session
        .create_consumer_with_delegate(
            CreateConsumer {
                queue_name: queue.name().as_str(),
                ..Default::default()
            },
            move |delivery: DeliveryResult| async {
                if let Ok(Some((channel, delivery))) = delivery {
                    let payload = std::str::from_utf8(&delivery.data).unwrap();
                    assert_eq!(payload, "Hello World");
                    channel
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                        .await;
                }
            },
        )
        .await
        .expect("create_consumer");

    let queue = session.create_queue(create_queue_op.clone()).await.expect("create_queue");
    assert_eq!(queue.message_count(), 0, "Messages has been consumed");
}

构建时要求

请参阅 lapin 和 deadpool 仓库中关于其要求的详细信息。

依赖项

~7–19MB
~286K SLoC