117个版本 (12个重大更新)
0.12.0 | 2024年8月7日 |
---|---|
0.11.1 | 2024年7月11日 |
0.11.0 | 2024年6月13日 |
0.10.0 | 2024年2月24日 |
0.0.6 |
|
#73 in 网络编程
9,409 每月下载量
用于 5 crates
1.5MB
36K SLoC
fe2o3-amqp
基于serde和tokio的AMQP 1.0协议的Rust实现。
功能标志
default = []
功能 | 描述 |
---|---|
"rustls" |
启用与tokio-rustls 和rustls 的TLS集成 |
"native-tls" |
启用与tokio-native-tls 和native-tls 的TLS集成 |
"acceptor" |
启用ConnectionAcceptor 、SessionAcceptor 和LinkAcceptor |
"transaction" |
启用Controller 、Transaction 、OwnedTransaction 和control_link_acceptor |
"scram" |
启用SCRAM认证 |
"tracing" |
启用使用tracing 进行日志记录 |
"log" |
启用使用log 进行日志记录 |
快速入门
更多示例,包括如何与Azure Service Bus一起使用,可以在GitHub仓库上找到。
客户端
以下是一个使用本地代理(TestAmqpBroker
)监听本地的示例。代理通过以下命令执行
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
以下代码需要在依赖关系中添加tokio
异步运行时。
use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;
#[tokio::main]
async fn main() {
let mut connection = Connection::open(
"connection-1", // container id
"amqp://guest:guest@localhost:5672" // url
).await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
// Create a sender
let mut sender = Sender::attach(
&mut session, // Session
"rust-sender-link-1", // link name
"q1" // target address
).await.unwrap();
// Create a receiver
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1", // link name
"q1" // source address
).await.unwrap();
// Send a message to the broker and wait for outcome (Disposition)
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Send a message with batchable field set to true
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Receive the message from the broker
let delivery = receiver.recv::<String>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
sender.close().await.unwrap(); // Detach sender with closing Detach performatives
receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
session.end().await.unwrap(); // End the session
connection.close().await.unwrap(); // Close the connection
}
监听器
use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
#[tokio::main]
async fn main() {
let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
let connection_acceptor = ConnectionAcceptor::new("example-listener");
while let Ok((stream, addr)) = tcp_listener.accept().await {
let mut connection = connection_acceptor.accept(stream).await.unwrap();
let handle = tokio::spawn(async move {
let session_acceptor = SessionAcceptor::new();
while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
let handle = tokio::spawn(async move {
let link_acceptor = LinkAcceptor::new();
match link_acceptor.accept(&mut session).await.unwrap() {
LinkEndpoint::Sender(sender) => { },
LinkEndpoint::Receiver(recver) => { },
}
});
}
});
}
}
WebSocket
需要fe2o3-amqp-ws
进行WebSocket绑定
use fe2o3_amqp::{
types::{messaging::Outcome, primitives::Value},
Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;
#[tokio::main]
async fn main() {
let (ws_stream, _response) = WebSocketStream::connect("ws://127.0.0.1:5673")
.await
.unwrap();
let mut connection = Connection::builder()
.container_id("connection-1")
.open_with_stream(ws_stream)
.await
.unwrap();
connection.close().await.unwrap();
}
更多示例
更多发送和接收的示例可以在GitHub仓库中找到。请注意,大多数示例都需要运行本地代理。Windows上可以使用的一个代理是TestAmqpBroker。
WebAssembly支持
从“0.8.11”版本开始添加了对wasm32-unknown-unknown
目标的实验性支持,并需要使用fe2o3-amqp-ws
来与代理建立WebSocket连接。浏览器标签中发送和接收消息的示例可以在examples/wasm32-in-browser中找到。
组件
名称 | 描述 |
---|---|
serde_amqp_derive |
AMQP1.0协议中定义的类型描述的自定义派生宏 |
serde_amqp |
AMQP1.0序列化和反序列化程序以及原始类型 |
fe2o3-amqp-types |
AMQP1.0数据类型 |
fe2o3-amqp |
AMQP1.0的Connection 、Session 和Link 的实现 |
fe2o3-amqp-ext |
扩展类型和实现 |
fe2o3-amqp-ws |
为fe2o3-amqp 传输提供WebSocket绑定 |
fe2o3-amqp-management |
AMQP1.0管理的实验性实现 |
fe2o3-amqp-cbs |
AMQP1.0 CBS的实验性实现 |
支持的最低rust版本
1.75.0
许可证:MIT/Apache-2.0
依赖项
~7-22MB
~365K SLoC