3 个不稳定版本

0.4.1 2024 年 3 月 13 日
0.4.0 2023 年 5 月 9 日
0.3.0 2023 年 3 月 24 日

#180异步

每月 39 次下载

MIT 许可证

92KB
1.5K SLoC

Rabbit auto 使用 https://github.com/CleverCloud/lapin.
该库提供消费者和发布者,在第一次成功连接后运行,直到手动关闭。如果在连接到 RabbitMQ 时断开连接,它们将等待重新建立连接而不会失败。

当前使用的异步运行时是 tokio,但它可以很容易地扩展(在库代码中)以使用其他运行时。

版本 0.3.0 有一个错误,阻止了自动重连。存在一些死锁,我无法追踪,因此我对整个过程进行了重构,并保持了大部分接口不变。但 configure 函数已更改,它不再是异步函数。目前最新版本仅支持 tokio。配置需要 FullExecutor 特性和 Reactor 特性。使用 tokio-reactor-trait 和 tokio-executor-trait 可以轻松实现。


use rabbit_auto::publisher::{Serialise, PublishWrapper, simple_confirm_options};
use rabbit_auto::comms::Comms;
use rabbit_auto::config::Config;

/// If the configure is not called, the library will kill the app using `std::process::exit()`
/// The configure function is no longer async!
Comms::configure(Config::new(
    "app-tag", // the tag of the application for the rabbitmq
    "rabbit-host-ip",
    "rabbit user",
    "rabbit password",
    // reconnection interval when the connection is lost
    Duration::from_secs(3),
    tokio_executor_trait::Tokio::current(),
    tokio_reactor_trait::Tokio,
);

// Publisher Sink which expects the MsgOut and routing key String
// the created publisher of type impl Sink<(MsgOut, String)> + Unpin,
let publisher = PublishWrapper::<(MsgOut, String), _>::with_dynamic_key(
    "the exchange",
    Some(simple_confirm_options()),
    None, // basic publish options callback
    None, // basic publish properties callback 
).await;

// Publisher Sink which expects the MsgOut
// the created publisher of type impl Sink<MsgOut> + Unpin,
let publisher = PublishWrapper::<MsgOut, _>::with_static_key(
    "the exchange",
    "the routing key",
    Some(simple_confirm_options()),
    None, // basic publish options callback
    None, // basic publish properties callback 
).await;

// Creates a consumer which returns a Stream of items and auto ack objects.
// The item has to implement Deserialise trait so it is automatically deserialised.
// The auto ack object will automatically acks the delivery when the object drops.
let consumer = StreamBuilder {
        tag: "the app id",
        routing_key: "the routing key",
        exchange: "the exchange",
        ..StreamBuilder::default()}
    .create_auto_ack().await;

依赖项

~9–21MB
~328K SLoC