14个版本

0.3.0-alpha.52024年1月2日
0.3.0-alpha.22024年1月1日
0.2.2 2023年10月15日
0.2.1 2023年5月16日
0.1.0 2022年12月12日

#1367 in 网络编程

MPL-2.0 许可证

410KB
8K SLoC

📟MQRSTT

Crates.io Docs dependency status codecov

MQRSTT 是一个提供同步和异步(smol和tokio)实现的 MQTTv5 客户端。

由于这个crate旨在与运行时无关,因此用户需要提供自己的数据流。对于异步方法,流必须实现 smol 或 tokio 的 AsyncReadExtAsyncWriteExt 特性。对于同步方法,流必须实现 std::io::Readstd::io::Write 特性。

特性

  • MQTT v5
  • 运行时无关(Smol,Tokio)
  • 同步
  • TLS/TCP
  • 精简
  • 心跳依赖于实际通信

待办事项

  • 无标准库(需要大量工作以使用无堆分配并依赖于栈)
  • 更多的测试
  • 更多的文档

MSRV

从0.3开始,tokio和smol变体将需要MSRV:1.75,因为异步函数特性。

TCP & TLS 示例

注意

  • 您的处理程序不应等待时间过长
  • 在遇到错误或断开连接时创建新连接
  • 处理程序仅获得传入的数据包

TLS

TLS示例在README中太大。 TLS示例

Smol示例

use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_smol,
    packets::{self, Packet},
    AsyncEventHandler,
    smol::NetworkStatus,
};
use bytes::Bytes;
pub struct PingPong {
    pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish(
                                p.topic.clone(),
                                p.qos,
                                p.retain,
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}
smol::block_on(async {
    let options = ConnectOptions::new("mqrsttSmolExample");
    let (mut network, client) = new_smol(options);
    let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();
    
    let mut pingpong = PingPong {
        client: client.clone(),
    };

    network.connect(stream, &mut pingpong).await.unwrap();

    // This subscribe is only processed when we run the network
    client.subscribe("mqrstt").await.unwrap();

    let (n, t) = futures::join!(
        async {
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            smol::Timer::after(std::time::Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
    assert!(n.is_ok());
});

Tokio示例

use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_tokio,
    packets::{self, Packet},
    AsyncEventHandler,
    tokio::NetworkStatus,
};
use tokio::time::Duration;
use bytes::Bytes;

pub struct PingPong {
    pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    async fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish(
                                p.topic.clone(),
                                p.qos,
                                p.retain,
                                Bytes::from_static(b"pong"),
                            )
                            .await
                            .unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}

#[tokio::main]
async fn main() {
    let options = ConnectOptions::new("TokioTcpPingPongExample");
    
    let (mut network, client) = new_tokio(options);
    
    let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
        .await
        .unwrap();

    let mut pingpong = PingPong {
        client: client.clone(),
    };
    
    network.connect(stream, &mut pingpong).await.unwrap();
    
    client.subscribe("mqrstt").await.unwrap();
    

    let (n, _) = tokio::join!(
        async {
            loop {
                return match network.poll(&mut pingpong).await {
                    Ok(NetworkStatus::Active) => continue,
                    otherwise => otherwise,
                };
            }
        },
        async {
            tokio::time::sleep(Duration::from_secs(30)).await;
            client.disconnect().await.unwrap();
        }
    );
    assert!(n.is_ok());
}

同步示例

use mqrstt::{
    MqttClient,
    ConnectOptions,
    new_sync,
    packets::{self, Packet},
    EventHandler,
    sync::NetworkStatus,
};
use std::net::TcpStream;
use bytes::Bytes;

pub struct PingPong {
    pub client: MqttClient,
}

impl EventHandler for PingPong {
    // Handlers only get INCOMING packets. This can change later.
    fn handle(&mut self, event: packets::Packet) -> () {
        match event {
            Packet::Publish(p) => {
                if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
                    if payload.to_lowercase().contains("ping") {
                        self.client
                            .publish_blocking(
                                p.topic.clone(),
                                p.qos,
                                p.retain,
                                Bytes::from_static(b"pong"),
                            ).unwrap();
                        println!("Received Ping, Send pong!");
                    }
                }
            },
            Packet::ConnAck(_) => { println!("Connected!") },
            _ => (),
        }
    }
}


let mut client_id: String = "SyncTcpPingReqTestExample".to_string();
let options = ConnectOptions::new(client_id);

let address = "broker.emqx.io";
let port = 1883;

let (mut network, client) = new_sync(options);

// IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
let stream = TcpStream::connect((address, port)).unwrap();
stream.set_nonblocking(true).unwrap();

let mut pingpong = PingPong {
    client: client.clone(),
};

network.connect(stream, &mut pingpong).unwrap();

let res_join_handle = std::thread::spawn(move ||
    loop {
        match network.poll(&mut pingpong) {
            Ok(NetworkStatus::ActivePending) => {
                std::thread::sleep(std::time::Duration::from_millis(100));
            },
            Ok(NetworkStatus::ActiveReady) => {
                std::thread::sleep(std::time::Duration::from_millis(100));
            },
            otherwise => return otherwise,
        }
    }
);

std::thread::sleep(std::time::Duration::from_secs(30));
client.disconnect_blocking().unwrap();
let join_res = res_join_handle.join();
assert!(join_res.is_ok());
let res = join_res.unwrap();
assert!(res.is_ok());

常见问题解答

  • 没有太多经常被问到的问题,所以请提问! :)
  • 欢迎功能请求

许可证

许可下

  • Mozilla公共许可证,版本2.0, (MPL-2.0)

贡献

除非您明确声明,否则您有意提交给作品以包含在内的任何贡献,均应按照MPL-2.0许可,不附加任何其他条款或条件。

依赖关系

~7–17MB
~224K SLoC