117个版本 (12个重大更新)

0.12.0 2024年8月7日
0.11.1 2024年7月11日
0.11.0 2024年6月13日
0.10.0 2024年2月24日
0.0.6 2022年3月16日

#73 in 网络编程

Download history 3057/week @ 2024-05-03 4166/week @ 2024-05-10 3471/week @ 2024-05-17 2580/week @ 2024-05-24 2357/week @ 2024-05-31 2430/week @ 2024-06-07 2410/week @ 2024-06-14 3456/week @ 2024-06-21 1903/week @ 2024-06-28 2194/week @ 2024-07-05 1222/week @ 2024-07-12 1225/week @ 2024-07-19 1992/week @ 2024-07-26 2501/week @ 2024-08-02 2121/week @ 2024-08-09 2551/week @ 2024-08-16

9,409 每月下载量
用于 5 crates

MIT/Apache

1.5MB
36K SLoC

fe2o3-amqp

基于serde和tokio的AMQP 1.0协议的Rust实现。

crate_version docs_version discord

功能标志

default = []
功能 描述
"rustls" 启用与tokio-rustlsrustls的TLS集成
"native-tls" 启用与tokio-native-tlsnative-tls的TLS集成
"acceptor" 启用ConnectionAcceptorSessionAcceptorLinkAcceptor
"transaction" 启用ControllerTransactionOwnedTransactioncontrol_link_acceptor
"scram" 启用SCRAM认证
"tracing" 启用使用tracing进行日志记录
"log" 启用使用log进行日志记录

快速入门

  1. 客户端
  2. 监听器
  3. WebSocket绑定

更多示例,包括如何与Azure Service Bus一起使用,可以在GitHub仓库上找到。

客户端

以下是一个使用本地代理(TestAmqpBroker)监听本地的示例。代理通过以下命令执行

./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1

以下代码需要在依赖关系中添加tokio异步运行时。

use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;

#[tokio::main]
async fn main() {
    let mut connection = Connection::open(
        "connection-1",                     // container id
        "amqp://guest:guest@localhost:5672" // url
    ).await.unwrap();

    let mut session = Session::begin(&mut connection).await.unwrap();

    // Create a sender
    let mut sender = Sender::attach(
        &mut session,           // Session
        "rust-sender-link-1",   // link name
        "q1"                    // target address
    ).await.unwrap();

    // Create a receiver
    let mut receiver = Receiver::attach(
        &mut session,
        "rust-receiver-link-1", // link name
        "q1"                    // source address
    ).await.unwrap();

    // Send a message to the broker and wait for outcome (Disposition)
    let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Send a message with batchable field set to true
    let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
    let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
    outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome

    // Receive the message from the broker
    let delivery = receiver.recv::<String>().await.unwrap();
    receiver.accept(&delivery).await.unwrap();

    sender.close().await.unwrap(); // Detach sender with closing Detach performatives
    receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
    session.end().await.unwrap(); // End the session
    connection.close().await.unwrap(); // Close the connection
}

监听器

use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
    let connection_acceptor = ConnectionAcceptor::new("example-listener");

    while let Ok((stream, addr)) = tcp_listener.accept().await {
        let mut connection = connection_acceptor.accept(stream).await.unwrap();
        let handle = tokio::spawn(async move {
            let session_acceptor = SessionAcceptor::new();
            while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
                let handle = tokio::spawn(async move {
                    let link_acceptor = LinkAcceptor::new();
                    match link_acceptor.accept(&mut session).await.unwrap() {
                        LinkEndpoint::Sender(sender) => { },
                        LinkEndpoint::Receiver(recver) => { },
                    }
                });
            }
        });
    }
}

WebSocket

需要fe2o3-amqp-ws进行WebSocket绑定

use fe2o3_amqp::{
    types::{messaging::Outcome, primitives::Value},
    Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;

#[tokio::main]
async fn main() {
    let (ws_stream, _response) = WebSocketStream::connect("ws://127.0.0.1:5673")
        .await
        .unwrap();
    let mut connection = Connection::builder()
        .container_id("connection-1")
        .open_with_stream(ws_stream)
        .await
        .unwrap();

    connection.close().await.unwrap();
}

更多示例

更多发送和接收的示例可以在GitHub仓库中找到。请注意,大多数示例都需要运行本地代理。Windows上可以使用的一个代理是TestAmqpBroker

WebAssembly支持

从“0.8.11”版本开始添加了对wasm32-unknown-unknown目标的实验性支持,并需要使用fe2o3-amqp-ws来与代理建立WebSocket连接。浏览器标签中发送和接收消息的示例可以在examples/wasm32-in-browser中找到。

组件

名称 描述
serde_amqp_derive AMQP1.0协议中定义的类型描述的自定义派生宏
serde_amqp AMQP1.0序列化和反序列化程序以及原始类型
fe2o3-amqp-types AMQP1.0数据类型
fe2o3-amqp AMQP1.0的ConnectionSessionLink的实现
fe2o3-amqp-ext 扩展类型和实现
fe2o3-amqp-ws fe2o3-amqp传输提供WebSocket绑定
fe2o3-amqp-management AMQP1.0管理的实验性实现
fe2o3-amqp-cbs AMQP1.0 CBS的实验性实现

支持的最低rust版本

1.75.0

许可证:MIT/Apache-2.0

依赖项

~7-22MB
~365K SLoC