#mqtt #iot #coap #mqtt-client #http #pub-sub

rumqttc-async-std

适用于您连接设备的有效且稳健的 MQTT 客户端

1 个不稳定版本

0.5.0 2021年2月7日

#37#mqtt-client

Apache-2.0

84KB
1.5K SLoC

rumqttc-async-std

rumqttc 库的一个分支,可在 此处 找到,改为使用 async-std 而不是 tokio 异步运行时。

一个纯 Rust MQTT 客户端,力求稳健、高效且易于使用。此库由一个异步(async-std)事件循环支持,处理 MQTT 的所有稳健性和效率部分,但自然地适合同步和异步世界,正如我们将看到的那样

让我们直接进入示例

一个简单的同步发布和订阅

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
   client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
   thread::sleep(Duration::from_millis(100));
});

// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
    println!("Notification = {:?}", notification);
}

一个简单的异步发布和订阅

use rumqttc::{MqttOptions, AsyncClient, QoS};
use async_std::task;
use std::time::Duration;
use std::error::Error;

let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();

task::spawn(async move {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
        task::sleep(Duration::from_millis(100)).await;
    }
});

loop {
    let notification = eventloop.poll().await.unwrap();
    println!("Received = {:?}", notification);
}

功能快速概述

  • 事件循环并发地编排出/入包,并处理状态
  • 在必要时ping代理,并检测客户端半开连接
  • 出包节流(待定)
  • 基于队列大小的出包流控制
  • 通过仅继续 eventloop.poll()/connection.iter() 循环来自动重新连接
  • 在网络状况不佳时对客户端 API 的自然背压
  • 使用 client.cancel() 立即取消

简而言之,维护稳健连接所需的一切

由于事件循环在外部轮询(使用 iter()/poll() 在循环中)并在库和 Eventloop 外部可访问,因此用户可以

  • 根据主题分发传入的消息
  • 在需要时停止它
  • 访问内部状态,用于诸如优雅关闭或重新连接前修改选项之类的用例

重要注意事项

  • connection.iter()/eventloop.poll() 上循环是运行事件循环并取得进展所必需的。它产生传入和传出活动通知,允许您进行适当的自定义。

  • connection.iter()/eventloop.poll() 循环中阻塞将阻止连接进程。

常见问题解答

Connecting to a broker using raw ip doesn't work

您无法使用自签名证书创建到裸IP地址的TLS连接。这是 rustls 的限制。一种解决方案是在 *nix/BSD-like 系统下,将裸IP地址添加到您的DNS解析器查找的地方(例如,/etc/hosts),并在代码中使用该名称。

许可证:Apache-2.0

依赖项

~13–28MB
~493K SLoC