3 个不稳定版本
0.4.1 | 2024 年 3 月 13 日 |
---|---|
0.4.0 | 2023 年 5 月 9 日 |
0.3.0 | 2023 年 3 月 24 日 |
#180 在 异步
每月 39 次下载
92KB
1.5K SLoC
Rabbit auto 使用 https://github.com/CleverCloud/lapin.
该库提供消费者和发布者,在第一次成功连接后运行,直到手动关闭。如果在连接到 RabbitMQ 时断开连接,它们将等待重新建立连接而不会失败。
当前使用的异步运行时是 tokio,但它可以很容易地扩展(在库代码中)以使用其他运行时。
版本 0.3.0 有一个错误,阻止了自动重连。存在一些死锁,我无法追踪,因此我对整个过程进行了重构,并保持了大部分接口不变。但 configure
函数已更改,它不再是异步函数。目前最新版本仅支持 tokio。配置需要 FullExecutor 特性和 Reactor 特性。使用 tokio-reactor-trait 和 tokio-executor-trait 可以轻松实现。
use rabbit_auto::publisher::{Serialise, PublishWrapper, simple_confirm_options};
use rabbit_auto::comms::Comms;
use rabbit_auto::config::Config;
/// If the configure is not called, the library will kill the app using `std::process::exit()`
/// The configure function is no longer async!
Comms::configure(Config::new(
"app-tag", // the tag of the application for the rabbitmq
"rabbit-host-ip",
"rabbit user",
"rabbit password",
// reconnection interval when the connection is lost
Duration::from_secs(3),
tokio_executor_trait::Tokio::current(),
tokio_reactor_trait::Tokio,
);
// Publisher Sink which expects the MsgOut and routing key String
// the created publisher of type impl Sink<(MsgOut, String)> + Unpin,
let publisher = PublishWrapper::<(MsgOut, String), _>::with_dynamic_key(
"the exchange",
Some(simple_confirm_options()),
None, // basic publish options callback
None, // basic publish properties callback
).await;
// Publisher Sink which expects the MsgOut
// the created publisher of type impl Sink<MsgOut> + Unpin,
let publisher = PublishWrapper::<MsgOut, _>::with_static_key(
"the exchange",
"the routing key",
Some(simple_confirm_options()),
None, // basic publish options callback
None, // basic publish properties callback
).await;
// Creates a consumer which returns a Stream of items and auto ack objects.
// The item has to implement Deserialise trait so it is automatically deserialised.
// The auto ack object will automatically acks the delivery when the object drops.
let consumer = StreamBuilder {
tag: "the app id",
routing_key: "the routing key",
exchange: "the exchange",
..StreamBuilder::default()}
.create_auto_ack().await;
依赖项
~9–21MB
~328K SLoC