#mqtt-client #mqtt #iot #coap #pub-sub #iot-devices #send-receive

rumqttc

为您的连接设备提供高效且稳健的MQTT客户端

33个版本 (破坏性更新)

0.24.0 2024年2月27日
0.23.0 2023年10月10日
0.22.0 2023年6月7日
0.20.0 2023年1月17日
0.0.5 2020年7月26日

#19 in 网络编程

Download history 28786/week @ 2024-04-23 23825/week @ 2024-04-30 20753/week @ 2024-05-07 24783/week @ 2024-05-14 22280/week @ 2024-05-21 19487/week @ 2024-05-28 22918/week @ 2024-06-04 19316/week @ 2024-06-11 17666/week @ 2024-06-18 19157/week @ 2024-06-25 15790/week @ 2024-07-02 20274/week @ 2024-07-09 20955/week @ 2024-07-16 21687/week @ 2024-07-23 21994/week @ 2024-07-30 20153/week @ 2024-08-06

每月下载量 87,908
79个crate中使用(57直接使用)

Apache-2.0

450KB
10K SLoC

rumqttc

crates.io page docs.rs page

一个力求稳健、高效且易于使用的纯Rust MQTT客户端。该库由异步(使用tokio)事件循环支持,使用户能够根据代理发送和接收MQTT消息。

示例

简单的同步发布和订阅

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
   client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
   thread::sleep(Duration::from_millis(100));
});

// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
    println!("Notification = {:?}", notification);
}

简单的异步发布和订阅

use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;

let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();

task::spawn(async move {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
        time::sleep(Duration::from_millis(100)).await;
    }
});

while let Ok(notification) = eventloop.poll().await {
    println!("Received = {:?}", notification);
}

功能快速概览

  • 事件循环并发地协调出入包,并处理状态
  • 在必要时ping代理,并检测客户端半开连接
  • 出包节流(待办事项)
  • 基于队列大小的出包流控
  • 通过继续eventloop.poll()/connection.iter()循环实现自动重连
  • 在网络状况不佳时,对客户端API提供自然背压
  • 支持WebSocket
  • 使用TLS进行安全传输

简而言之,维护稳健连接所需的一切

由于事件循环在外部轮询(在库和Eventloop外部循环中)并且Eventloop可访问,用户可以

  • 根据主题分发传入消息
  • 在需要时停止它
  • 访问内部状态,用于优雅关闭或修改重连前的选项等用例

重要注意事项

  • connection.iter()/eventloop.poll()上循环是运行事件循环和取得进展的必要条件。它产生进来的和出去的活动通知,允许您按需定制。

  • connection.iter()/eventloop.poll()循环内阻塞将会阻止连接进展。

依赖项

~3–16MB
~219K SLoC