33个版本 (破坏性更新)
0.24.0 | 2024年2月27日 |
---|---|
0.23.0 | 2023年10月10日 |
0.22.0 | 2023年6月7日 |
0.20.0 | 2023年1月17日 |
0.0.5 | 2020年7月26日 |
#19 in 网络编程
每月下载量 87,908
在79个crate中使用(57直接使用)
450KB
10K SLoC
rumqttc
一个力求稳健、高效且易于使用的纯Rust MQTT客户端。该库由异步(使用tokio)事件循环支持,使用户能够根据代理发送和接收MQTT消息。
示例
简单的同步发布和订阅
use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
thread::sleep(Duration::from_millis(100));
});
// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
println!("Notification = {:?}", notification);
}
简单的异步发布和订阅
use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;
let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();
task::spawn(async move {
for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
time::sleep(Duration::from_millis(100)).await;
}
});
while let Ok(notification) = eventloop.poll().await {
println!("Received = {:?}", notification);
}
功能快速概览
- 事件循环并发地协调出入包,并处理状态
- 在必要时ping代理,并检测客户端半开连接
- 出包节流(待办事项)
- 基于队列大小的出包流控
- 通过继续
eventloop.poll()/connection.iter()
循环实现自动重连 - 在网络状况不佳时,对客户端API提供自然背压
- 支持WebSocket
- 使用TLS进行安全传输
简而言之,维护稳健连接所需的一切
由于事件循环在外部轮询(在库和Eventloop
外部循环中)并且Eventloop
可访问,用户可以
- 根据主题分发传入消息
- 在需要时停止它
- 访问内部状态,用于优雅关闭或修改重连前的选项等用例
重要注意事项
-
在
connection.iter()
/eventloop.poll()
上循环是运行事件循环和取得进展的必要条件。它产生进来的和出去的活动通知,允许您按需定制。 -
在
connection.iter()
/eventloop.poll()
循环内阻塞将会阻止连接进展。
依赖项
~3–16MB
~219K SLoC