1个不稳定版本
0.12.0 | 2024年6月11日 |
---|
#1534 在 网络编程
用于 spotflow
165KB
3.5K SLoC
注意:此包是从 分支 rumqtt
制作而成,增加了以下功能
- 可以使用
native-tls
而不是rustls
来进行TLS连接(后来已添加到原始包中)。 - 将消息主题传播到
Outgoing::Publish
,以便更容易地将确认与原始消息配对(将在 https://github.com/bytebeamio/rumqtt/issues/349 中解决)。
一旦原始包同时拥有这两个功能,此包将不再需要。我们很可能会将其移除。
以下是原始的README
rumqttc
一个纯Rust MQTT客户端,力求强大、高效且易于使用。此库由异步(tokio)事件循环支持,处理MQTT的所有强大和高效部分,但自然适合同步和异步世界,正如我们将看到的那样
让我们直接来看示例
一个简单的同步发布和订阅
use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;
let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
thread::sleep(Duration::from_millis(100));
});
// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
println!("Notification = {:?}", notification);
}
一个简单的异步发布和订阅
use rumqttc::{MqttOptions, AsyncClient, QoS};
use tokio::{task, time};
use std::time::Duration;
use std::error::Error;
let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();
task::spawn(async move {
for i in 0..10 {
client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
time::sleep(Duration::from_millis(100)).await;
}
});
loop {
let notification = eventloop.poll().await.unwrap();
println!("Received = {:?}", notification);
}
功能快速概述
- 事件循环并发地协调进出数据包,并处理状态
- 在必要时ping代理,并检测客户端半开连接
- 输出数据包的节流(待办事项)
- 基于输出数据包队列大小的流量控制
- 通过继续
eventloop.poll()/connection.iter()
循环来自动重连 - 在不良网络情况下对客户端API的自然背压
- 使用
client.cancel()
立即取消
简而言之,维护强大连接所需的一切
由于事件循环在外部轮询(使用iter()/poll()
循环)且Eventloop
可访问,用户可以进行以下操作:
- 根据主题分发传入的消息
- 在需要时停止它
- 访问内部状态,用于优雅关闭或修改重连前的选项
重要提示
-
在
connection.iter()
/eventloop.poll()
上循环是运行事件循环和取得进展的必要条件。它会产生传入和传出活动的通知,允许您按需进行自定义。 -
在
connection.iter()
/eventloop.poll()
循环内阻塞将阻止连接进展。
常见问题解答
Connecting to a broker using raw ip doesn't work
您不能使用自签名证书创建到裸IP地址的TLS连接。这是rustls的限制。一种解决方案是在*nix/BSD-like系统下,将裸IP地址添加到DNS解析器查找的位置(例如/etc/hosts
),然后在代码中使用该名称。
许可证:Apache-2.0
依赖关系
~3–17MB
~234K SLoC