14个版本
0.3.0-alpha.5 | 2024年1月2日 |
---|---|
0.3.0-alpha.2 | 2024年1月1日 |
0.2.2 | 2023年10月15日 |
0.2.1 | 2023年5月16日 |
0.1.0 | 2022年12月12日 |
#1367 in 网络编程
410KB
8K SLoC
📟MQRSTT
MQRSTT
是一个提供同步和异步(smol和tokio)实现的 MQTTv5 客户端。
由于这个crate旨在与运行时无关,因此用户需要提供自己的数据流。对于异步方法,流必须实现 smol 或 tokio 的 AsyncReadExt
和 AsyncWriteExt
特性。对于同步方法,流必须实现 std::io::Read
和 std::io::Write
特性。
特性
- MQTT v5
- 运行时无关(Smol,Tokio)
- 同步
- TLS/TCP
- 精简
- 心跳依赖于实际通信
待办事项
- 无标准库(需要大量工作以使用无堆分配并依赖于栈)
- 更多的测试
- 更多的文档
MSRV
从0.3开始,tokio和smol变体将需要MSRV:1.75,因为异步函数特性。
TCP & TLS 示例
注意
- 您的处理程序不应等待时间过长
- 在遇到错误或断开连接时创建新连接
- 处理程序仅获得传入的数据包
TLS
TLS示例在README中太大。 TLS示例。
Smol示例
use mqrstt::{
MqttClient,
ConnectOptions,
new_smol,
packets::{self, Packet},
AsyncEventHandler,
smol::NetworkStatus,
};
use bytes::Bytes;
pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
Bytes::from_static(b"pong"),
)
.await
.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
_ => (),
}
}
}
smol::block_on(async {
let options = ConnectOptions::new("mqrsttSmolExample");
let (mut network, client) = new_smol(options);
let stream = smol::net::TcpStream::connect(("broker.emqx.io", 1883))
.await
.unwrap();
let mut pingpong = PingPong {
client: client.clone(),
};
network.connect(stream, &mut pingpong).await.unwrap();
// This subscribe is only processed when we run the network
client.subscribe("mqrstt").await.unwrap();
let (n, t) = futures::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
smol::Timer::after(std::time::Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
});
Tokio示例
use mqrstt::{
MqttClient,
ConnectOptions,
new_tokio,
packets::{self, Packet},
AsyncEventHandler,
tokio::NetworkStatus,
};
use tokio::time::Duration;
use bytes::Bytes;
pub struct PingPong {
pub client: MqttClient,
}
impl AsyncEventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
async fn handle(&mut self, event: packets::Packet) -> () {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish(
p.topic.clone(),
p.qos,
p.retain,
Bytes::from_static(b"pong"),
)
.await
.unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
_ => (),
}
}
}
#[tokio::main]
async fn main() {
let options = ConnectOptions::new("TokioTcpPingPongExample");
let (mut network, client) = new_tokio(options);
let stream = tokio::net::TcpStream::connect(("broker.emqx.io", 1883))
.await
.unwrap();
let mut pingpong = PingPong {
client: client.clone(),
};
network.connect(stream, &mut pingpong).await.unwrap();
client.subscribe("mqrstt").await.unwrap();
let (n, _) = tokio::join!(
async {
loop {
return match network.poll(&mut pingpong).await {
Ok(NetworkStatus::Active) => continue,
otherwise => otherwise,
};
}
},
async {
tokio::time::sleep(Duration::from_secs(30)).await;
client.disconnect().await.unwrap();
}
);
assert!(n.is_ok());
}
同步示例
use mqrstt::{
MqttClient,
ConnectOptions,
new_sync,
packets::{self, Packet},
EventHandler,
sync::NetworkStatus,
};
use std::net::TcpStream;
use bytes::Bytes;
pub struct PingPong {
pub client: MqttClient,
}
impl EventHandler for PingPong {
// Handlers only get INCOMING packets. This can change later.
fn handle(&mut self, event: packets::Packet) -> () {
match event {
Packet::Publish(p) => {
if let Ok(payload) = String::from_utf8(p.payload.to_vec()) {
if payload.to_lowercase().contains("ping") {
self.client
.publish_blocking(
p.topic.clone(),
p.qos,
p.retain,
Bytes::from_static(b"pong"),
).unwrap();
println!("Received Ping, Send pong!");
}
}
},
Packet::ConnAck(_) => { println!("Connected!") },
_ => (),
}
}
}
let mut client_id: String = "SyncTcpPingReqTestExample".to_string();
let options = ConnectOptions::new(client_id);
let address = "broker.emqx.io";
let port = 1883;
let (mut network, client) = new_sync(options);
// IMPORTANT: Set nonblocking to true! No progression will be made when stream reads block!
let stream = TcpStream::connect((address, port)).unwrap();
stream.set_nonblocking(true).unwrap();
let mut pingpong = PingPong {
client: client.clone(),
};
network.connect(stream, &mut pingpong).unwrap();
let res_join_handle = std::thread::spawn(move ||
loop {
match network.poll(&mut pingpong) {
Ok(NetworkStatus::ActivePending) => {
std::thread::sleep(std::time::Duration::from_millis(100));
},
Ok(NetworkStatus::ActiveReady) => {
std::thread::sleep(std::time::Duration::from_millis(100));
},
otherwise => return otherwise,
}
}
);
std::thread::sleep(std::time::Duration::from_secs(30));
client.disconnect_blocking().unwrap();
let join_res = res_join_handle.join();
assert!(join_res.is_ok());
let res = join_res.unwrap();
assert!(res.is_ok());
常见问题解答
- 没有太多经常被问到的问题,所以请提问! :)
- 欢迎功能请求
许可证
许可下
- Mozilla公共许可证,版本2.0, (MPL-2.0)
贡献
除非您明确声明,否则您有意提交给作品以包含在内的任何贡献,均应按照MPL-2.0许可,不附加任何其他条款或条件。
依赖关系
~7–17MB
~224K SLoC