#rabbitmq #queue #mq #service #message-queue #heartbeat #activemq

stomp

一个完整的STOMP 1.2客户端实现。允许程序与ActiveMQ和RabbitMQ等消息队列服务交互。

21个版本 (8个重大变更)

使用旧的Rust 2015

0.11.0 2015年8月24日
0.10.2 2015年7月29日
0.10.1 2015年5月29日
0.8.3 2015年3月29日
0.3.3 2014年11月23日

#1360 in 异步

MIT 许可证

66KB
1.5K SLoC

stomp-rs

stomp-rsRust编程语言 提供了完整的 STOMP 1.2客户端实现。这允许使用Rust编写的程序与ActiveMQ、RabbitMQ、HornetQ和OpenMQ等消息队列服务交互。

  • 连接
  • 订阅
  • 发送
  • 确认(自动/客户端/客户端个体)
  • 事务
  • 收据
  • 断开连接
  • 心跳

stomp-rs 的API尚不稳定,可能在v1.0之前有所变动。

示例

连接 / 订阅 / 发送

extern crate stomp;
use stomp::frame::Frame;
use stomp::subscription::AckOrNack::Ack;

fn main() {
  
  let destination = "/topic/messages";
  let mut message_count: u64 = 0;

  let mut session = match stomp::session("127.0.0.1", 61613).start() {
      Ok(session) => session,
      Err(error)  => panic!("Could not connect to the server: {}", error)
   };
  
  session.subscription(destination, |frame: &Frame| {
    message_count += 1;
    println!("Received message #{}:\n{}", message_count, frame);
    Ack
  }).start();
  
  session.message(destination, "Animal").send();
  session.message(destination, "Vegetable").send();
  session.message(destination, "Mineral").send();
  
  session.listen(); // Loops infinitely, awaiting messages

  session.disconnect();
}

会话配置

use stomp::header::header::Header;
use stomp::connection::{HeartBeat, Credentials};
// ...
let mut session = match stomp::session("127.0.0.1", 61613)
  .with(Credentials("sullivan", "m1k4d0"))
  .with(HeartBeat(5000, 2000))
  .with(Header::new("custom-client-id", "hmspna4"))
  .start() {
      Ok(session) => session,
      Err(error)  => panic!("Could not connect to the server: {}", error)
   };

消息配置

use stomp::header::{Header, SuppressedHeader, ContentType};
// ...
session.message(destination, "Hypoteneuse".as_bytes())
  .with(ContentType("text/plain"))
  .with(Header::new("persistent", "true"))
  .with(SuppressedHeader("content-length")
  .send();

订阅配置

use stomp::subscription::AckMode;
use stomp::header::Header;
use stomp::frame::Frame;
// ...
  let id = session.subscription(destination, |frame: &Frame| {
    message_count += 1;
    println!("Received message #{}:\n{}", message_count, frame);
    Ack
  })
  .with(AckMode::Client)
  .with(Header::new("custom-subscription-header", "lozenge"))
  .start();

事务

match session.begin_transaction() {
  Ok(mut transaction) => {
    transaction.message(destination, "Animal").send();
    transaction.message(destination, "Vegetable").send();
    transaction.message(destination, "Mineral").send();
    transaction.commit();
},
  Err(error)  => panic!("Could not connect to the server: {}", error)
};

处理 RECEIPT 框架

如果您在消息中包含一个ReceiptHandler,客户端将请求服务器在成功处理框架后发送收据。

session.message(destination, "text/plain", "Hypoteneuse".as_bytes())
  .with(ReceiptHandler::new(|frame: &Frame| println!("Got a receipt for 'Hypoteneuse'.")))
  .send();

处理 ERROR 框架

要处理错误,您可以注册一个错误处理程序

session.on_error(|frame: &Frame| {
  panic!("ERROR frame received:\n{}", frame);
});

操作入站和出站框架

在某些情况下,经纪人可能施加规则或限制,这可能会使您需要以API未方便公开的方式直接修改框架。在这种情况下,您可以使用 on_before_sendon_before_receive 方法来指定在发送或接收每个框架之前执行此自定义逻辑的回调。

例如

// Require that all NACKs include a header specifying an optional requeue policy
session.on_before_send(|frame: &mut Frame| {
  if frame.command == "NACK" {
    frame.headers.push(Header::new("requeue", "false"));
  }
});

session.on_before_receive(|frame: &mut Frame| {
  if frame.command == "MESSAGE" {
    // Modify the frame
  }
});

Cargo.toml

[package]

name = "stomp_test"
version = "0.0.1"
authors = ["your_name_here"]

[[bin]]

name = "stomp_test"

[dependencies.stomp]

stomp = "*"

关键词: StompRustrust-langrustlangcargoActiveMQRabbitMQHornetQOpenMQ消息队列MQ

依赖项

~4MB
~73K SLoC