21个版本 (8个重大变更)
使用旧的Rust 2015
0.11.0 | 2015年8月24日 |
---|---|
0.10.2 | 2015年7月29日 |
0.10.1 | 2015年5月29日 |
0.8.3 | 2015年3月29日 |
0.3.3 | 2014年11月23日 |
#1360 in 异步
66KB
1.5K SLoC
stomp-rs
stomp-rs
为 Rust编程语言 提供了完整的 STOMP 1.2客户端实现。这允许使用Rust编写的程序与ActiveMQ、RabbitMQ、HornetQ和OpenMQ等消息队列服务交互。
- 连接
- 订阅
- 发送
- 确认(自动/客户端/客户端个体)
- 事务
- 收据
- 断开连接
- 心跳
stomp-rs
的API尚不稳定,可能在v1.0之前有所变动。
示例
连接 / 订阅 / 发送
extern crate stomp;
use stomp::frame::Frame;
use stomp::subscription::AckOrNack::Ack;
fn main() {
let destination = "/topic/messages";
let mut message_count: u64 = 0;
let mut session = match stomp::session("127.0.0.1", 61613).start() {
Ok(session) => session,
Err(error) => panic!("Could not connect to the server: {}", error)
};
session.subscription(destination, |frame: &Frame| {
message_count += 1;
println!("Received message #{}:\n{}", message_count, frame);
Ack
}).start();
session.message(destination, "Animal").send();
session.message(destination, "Vegetable").send();
session.message(destination, "Mineral").send();
session.listen(); // Loops infinitely, awaiting messages
session.disconnect();
}
会话配置
use stomp::header::header::Header;
use stomp::connection::{HeartBeat, Credentials};
// ...
let mut session = match stomp::session("127.0.0.1", 61613)
.with(Credentials("sullivan", "m1k4d0"))
.with(HeartBeat(5000, 2000))
.with(Header::new("custom-client-id", "hmspna4"))
.start() {
Ok(session) => session,
Err(error) => panic!("Could not connect to the server: {}", error)
};
消息配置
use stomp::header::{Header, SuppressedHeader, ContentType};
// ...
session.message(destination, "Hypoteneuse".as_bytes())
.with(ContentType("text/plain"))
.with(Header::new("persistent", "true"))
.with(SuppressedHeader("content-length")
.send();
订阅配置
use stomp::subscription::AckMode;
use stomp::header::Header;
use stomp::frame::Frame;
// ...
let id = session.subscription(destination, |frame: &Frame| {
message_count += 1;
println!("Received message #{}:\n{}", message_count, frame);
Ack
})
.with(AckMode::Client)
.with(Header::new("custom-subscription-header", "lozenge"))
.start();
事务
match session.begin_transaction() {
Ok(mut transaction) => {
transaction.message(destination, "Animal").send();
transaction.message(destination, "Vegetable").send();
transaction.message(destination, "Mineral").send();
transaction.commit();
},
Err(error) => panic!("Could not connect to the server: {}", error)
};
处理 RECEIPT 框架
如果您在消息中包含一个ReceiptHandler,客户端将请求服务器在成功处理框架后发送收据。
session.message(destination, "text/plain", "Hypoteneuse".as_bytes())
.with(ReceiptHandler::new(|frame: &Frame| println!("Got a receipt for 'Hypoteneuse'.")))
.send();
处理 ERROR 框架
要处理错误,您可以注册一个错误处理程序
session.on_error(|frame: &Frame| {
panic!("ERROR frame received:\n{}", frame);
});
操作入站和出站框架
在某些情况下,经纪人可能施加规则或限制,这可能会使您需要以API未方便公开的方式直接修改框架。在这种情况下,您可以使用 on_before_send
和 on_before_receive
方法来指定在发送或接收每个框架之前执行此自定义逻辑的回调。
例如
// Require that all NACKs include a header specifying an optional requeue policy
session.on_before_send(|frame: &mut Frame| {
if frame.command == "NACK" {
frame.headers.push(Header::new("requeue", "false"));
}
});
session.on_before_receive(|frame: &mut Frame| {
if frame.command == "MESSAGE" {
// Modify the frame
}
});
Cargo.toml
[package]
name = "stomp_test"
version = "0.0.1"
authors = ["your_name_here"]
[[bin]]
name = "stomp_test"
[dependencies.stomp]
stomp = "*"
关键词: Stomp
、Rust
、rust-lang
、rustlang
、cargo
、ActiveMQ
、RabbitMQ
、HornetQ
、OpenMQ
、消息队列
、MQ
依赖项
~4MB
~73K SLoC