#mqtt #iot #coap #tls-connection #pub-sub #rustls #http

spotflow-rumqttc-fork

rumqtt的临时分支,允许从发送的消息中获取数据包ID

1个不稳定版本

0.12.0 2024年6月11日

#1534网络编程


用于 spotflow

Apache-2.0

165KB
3.5K SLoC

注意:此包是从 分支 rumqtt 制作而成,增加了以下功能

  • 可以使用 native-tls 而不是 rustls 来进行TLS连接(后来已添加到原始包中)。
  • 将消息主题传播到 Outgoing::Publish,以便更容易地将确认与原始消息配对(将在 https://github.com/bytebeamio/rumqtt/issues/349 中解决)。

一旦原始包同时拥有这两个功能,此包将不再需要。我们很可能会将其移除。

以下是原始的README

rumqttc

一个纯Rust MQTT客户端,力求强大、高效且易于使用。此库由异步(tokio)事件循环支持,处理MQTT的所有强大和高效部分,但自然适合同步和异步世界,正如我们将看到的那样

让我们直接来看示例

一个简单的同步发布和订阅

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
   client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
   thread::sleep(Duration::from_millis(100));
});

// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
    println!("Notification = {:?}", notification);
}

一个简单的异步发布和订阅

use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;

let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();

task::spawn(async move {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
        time::sleep(Duration::from_millis(100)).await;
    }
});

loop {
    let notification = eventloop.poll().await.unwrap();
    println!("Received = {:?}", notification);
}

功能快速概述

  • 事件循环并发地协调进出数据包,并处理状态
  • 在必要时ping代理,并检测客户端半开连接
  • 输出数据包的节流(待办事项)
  • 基于输出数据包队列大小的流量控制
  • 通过继续 eventloop.poll()/connection.iter() 循环来自动重连
  • 在不良网络情况下对客户端API的自然背压
  • 使用 client.cancel() 立即取消

简而言之,维护强大连接所需的一切

由于事件循环在外部轮询(使用iter()/poll()循环)且Eventloop可访问,用户可以进行以下操作:

  • 根据主题分发传入的消息
  • 在需要时停止它
  • 访问内部状态,用于优雅关闭或修改重连前的选项

重要提示

  • connection.iter()/eventloop.poll()上循环是运行事件循环和取得进展的必要条件。它会产生传入和传出活动的通知,允许您按需进行自定义。

  • connection.iter()/eventloop.poll()循环内阻塞将阻止连接进展。

常见问题解答

Connecting to a broker using raw ip doesn't work

您不能使用自签名证书创建到裸IP地址的TLS连接。这是rustls的限制。一种解决方案是在*nix/BSD-like系统下,将裸IP地址添加到DNS解析器查找的位置(例如/etc/hosts),然后在代码中使用该名称。

许可证:Apache-2.0

依赖关系

~3–17MB
~234K SLoC