38 个版本 (21 个重大更新)

0.22.1 2020 年 3 月 11 日
0.22.0 2019 年 6 月 20 日
0.21.3 2019 年 6 月 18 日
0.18.0 2019 年 3 月 3 日
0.6.0 2017 年 3 月 30 日

#847数据库接口

Download history 43/week @ 2024-03-11 23/week @ 2024-03-18 14/week @ 2024-03-25 147/week @ 2024-04-01 11/week @ 2024-04-08 23/week @ 2024-04-15 24/week @ 2024-04-22 23/week @ 2024-04-29 25/week @ 2024-05-06 20/week @ 2024-05-13 19/week @ 2024-05-20 22/week @ 2024-05-27 28/week @ 2024-06-03 21/week @ 2024-06-10 29/week @ 2024-06-17 19/week @ 2024-06-24

每月 99 次下载
10 个 Crates 中使用 (通过 批量)

MIT 许可

105KB
2.5K SLoC

已废弃 - 请使用 lapin 代替


lib.rs:

lapin-async

此库旨在在事件循环中使用。该库通过 Connection 结构体 提供一个状态机,您可以通过您管理的 IO 来驱动它。

通常,您的代码将拥有套接字和缓冲区,并定期将输入和输出缓冲区传递给状态机,以便它接收消息并将新消息序列化以发送。然后您可以查询当前状态,以查看是否收到了新的消息供消费者使用。

示例

use env_logger;
use lapin_async as lapin;
use log::info;

use crate::lapin::{
  BasicProperties, Channel, Connection, ConnectionProperties, ConsumerSubscriber,
  message::Delivery,
  options::*,
  types::FieldTable,
};

#[derive(Clone,Debug)]
struct Subscriber {
  channel: Channel,
}

impl ConsumerSubscriber for Subscriber {
  fn new_delivery(&self, delivery: Delivery) {
    self.channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).as_error().expect("basic_ack");
  }
  fn drop_prefetched_messages(&self) {}
  fn cancel(&self) {}
}

fn main() {
  env_logger::init();

  let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
  let conn = Connection::connect(&addr, ConnectionProperties::default()).wait().expect("connection error");

  info!("CONNECTED");

  let channel_a = conn.create_channel().wait().expect("create_channel");
  let channel_b = conn.create_channel().wait().expect("create_channel");

  channel_a.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");
  let queue = channel_b.queue_declare("hello", QueueDeclareOptions::default(), FieldTable::default()).wait().expect("queue_declare");

  info!("will consume");
  channel_b.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), Box::new(Subscriber { channel: channel_b.clone() })).wait().expect("basic_consume");

  let payload = b"Hello world!";

  loop {
    channel_a.basic_publish("", "hello", BasicPublishOptions::default(), payload.to_vec(), BasicProperties::default()).wait().expect("basic_publish");
  }
}

依赖项

~5–17MB
~275K SLoC