38 个版本 (21 个重大更新)
0.22.1 | 2020 年 3 月 11 日 |
---|---|
0.22.0 | 2019 年 6 月 20 日 |
0.21.3 | 2019 年 6 月 18 日 |
0.18.0 | 2019 年 3 月 3 日 |
0.6.0 | 2017 年 3 月 30 日 |
#847 在 数据库接口
每月 99 次下载
在 10 个 Crates 中使用 (通过 批量)
105KB
2.5K SLoC
已废弃 - 请使用 lapin 代替
lib.rs
:
lapin-async
此库旨在在事件循环中使用。该库通过 Connection 结构体 提供一个状态机,您可以通过您管理的 IO 来驱动它。
通常,您的代码将拥有套接字和缓冲区,并定期将输入和输出缓冲区传递给状态机,以便它接收消息并将新消息序列化以发送。然后您可以查询当前状态,以查看是否收到了新的消息供消费者使用。
示例
use env_logger;
use lapin_async as lapin;
use log::info;
use crate::lapin::{
BasicProperties, Channel, Connection, ConnectionProperties, ConsumerSubscriber,
message::Delivery,
options::*,
types::FieldTable,
};
#[derive(Clone,Debug)]
struct Subscriber {
channel: Channel,
}
impl ConsumerSubscriber for Subscriber {
fn new_delivery(&self, delivery: Delivery) {
self.channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).as_error().expect("basic_ack");
}
fn drop_prefetched_messages(&self) {}
fn cancel(&self) {}
}
fn main() {
env_logger::init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let conn = Connection::connect(&addr, ConnectionProperties::default()).wait().expect("connection error");
info!("CONNECTED");
let channel_a = conn.create_channel().wait().expect("create_channel");
let channel_b = conn.create_channel().wait().expect("create_channel");
channel_a.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
let queue = channel_b.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
info!("will consume");
channel_b.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber { channel: channel_b.clone() })).wait().expect("basic_consume");
let payload = b"Hello world!";
loop {
channel_a.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).wait().expect("basic_publish");
}
}
依赖项
~5–17MB
~275K SLoC