5个不稳定版本
0.3.1 | 2024年5月10日 |
---|---|
0.3.0 | 2023年3月29日 |
0.2.0 | 2023年1月29日 |
0.1.1 | 2023年1月24日 |
0.1.0 | 2023年1月22日 |
#107 in 异步
298 每月下载量
325KB
8K SLoC
poster-rs 📬
MQTT5客户端库
Poster-rs是一个异步、运行时无关、零拷贝的MQTT 5库,设计时考虑了操作局部性。
特性
- MQTTv5
- 运行时无关
- 零拷贝
- 每个订阅的异步流
- 无不安全代码
文档
入门
首先,选择您的异步运行时。准备好了吗?让我们开始吧!
在下面的示例中,我们将使用Tokio。
use std::error::Error;
use poster::{prelude::*, ConnectOpts, Context};
use tokio::net::TcpStream;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
let (mut ctx, mut handle) = Context::new();
let ctx_task = tokio::spawn(async move {
// Set up a connection using your async framework of choice. We will need a read end, which is
// AsyncRead, and write end, which is AsyncWrite, so we split the TcpStream
// into ReadHalf and WriteHalf pair.
let (rx, tx) = TcpStream::connect("127.0.0.1:1883").await?.into_split();
// Pass (ReadHalf, WriteHalf) pair into the context and connect with the broker on
// the protocol level.
ctx.set_up((rx.compat(), tx.compat_write())).connect(ConnectOpts::default()).await?;
// Awaiting the Context::run invocation will block the current task.
if let Err(err) = ctx.run().await {
eprintln!("[context] Error occured: \"{}\", exiting...", err);
} else {
println!("[context] Context exited.");
}
Ok::<(), Box<dyn Error + Send + Sync>>(())
});
/* ... */
ctx_task.await?;
Ok(())
}
此时,我们的 上下文 已经启动并运行。
让我们分析上述示例。 poster-rs
是一个运行时无关的库,这意味着所有异步操作都是使用 futures-rs
crate 的 traits 抽象的。这种方法的最终结果是连接到代理必须手动建立,库只关心在上下文创建期间接收 (AsyncRead, AsyncWrite) 对。这个对通常是通过网络库中流/套接字的某种 split
函数获得的。 (参见 tokio, smol)
new
工厂方法为我们提供 (Context, ContextHandle) 元组。 Context 负责处理客户端与服务器之间的流量。然而,ContextHandle 是 Context 实例的克隆句柄,用于执行所有 MQTT 操作。
方法 run 会阻塞任务(在 .await 上)直到满足以下条件之一
- 执行优雅断开(使用 ContextHandle::disconnect 方法)。然后结果是 ()。
- 发生错误,导致 MqttError。这可能是由于套接字关闭、从服务器接收 DISCONNECT 等原因。
发布
发布是通过 ContextHandle::publish 方法实现的。
// ...
let opts = PublishOpts::default().topic_name("topic").payload("hello there".as_bytes());
handle.publish(opts).await?;
参见 PublishOpts。
订阅
订阅表示为异步流,通过 SubscribeRsp::stream 方法获取。订阅的一般步骤是
- 等待调用 ContextHandle::subscribe 方法
- 验证结果(可选)
- 使用 stream 方法创建订阅的流。
注意,在底层,库使用订阅标识符来分组订阅。参见 SubscribeOpts。
// ...
let opts = SubscribeOpts::default().subscription("topic", SubscriptionOpts::default());
let rsp = handle.subscribe(opts).await?;
let mut subscription = rsp.stream();
while let Some(msg) = subscription.next().await {
println!("topic: {}; payload: {}", msg.topic_name(), str::from_utf8(msg.payload()).unwrap());
}
用户可以在一个订阅请求中订阅多个主题。
// ...
let opts = SubscribeOpts::default()
.subscription("topic1", SubscriptionOpts::default())
.subscription("topic2", SubscriptionOpts::default());
let mut subscription = handle.subscribe(opts).await?.stream();
while let Some(msg) = subscription.next().await {
println!("topic: {}; payload: {}", msg.topic_name(), str::from_utf8(msg.payload()).unwrap());
}
可以使用 SubscriptionOpts 对每个订阅进行自定义。
let opts = SubscribeOpts::default()
.subscription("topic", SubscriptionOpts::new().maximum_qos(QoS::AtLeastOnce));
SubscribeRsp 结构体表示订阅请求的结果。为了访问每个主题的原因码,使用 SubscribeRsp::payload 方法
// ...
let rsp = handle.subscribe(opts).await?;
let all_ok = rsp.payload().iter().copied().all(|reason| reason == SubackReason::GranteedQoS2);
取消订阅
取消订阅是通过 ContextHandle::unsubscribe 方法实现的。请注意,它不会关闭订阅流(可能会导致逻辑错误)。
// ...
let opts = UnsubscribeOpts::default().topic_name("topic");
let rsp = handle.unsubscribe(opts).await?;
与订阅一样,可以通过 UnsubscribeRsp::payload 方法获取每个主题的原因码。参见 UnsubscribeOpts。
心跳和ping
如果在连接请求期间设置了 ConnectOpts::keep_alive 间隔,则用户必须定期使用 ContextHandle::ping 方法。
断开连接
断开连接可能由用户或代理程序发起。当由代理程序发起时,Context::run 方法返回 error::Disconnected 错误。
用户也可以通过使用 ContextHandle::disconnect 方法来执行优雅断开。当断开连接完成后,Context::run 方法返回 ()。
// ...
handle.disconnect(DisconnectOpts::default()).await?;
参见 DisconnectOpts。
错误处理
主要库错误类型是位于error::MqttError枚举,在error模块中。
TLS/SSL
存在带有AsyncRead,AsyncWrite TLS/SSL流的TLS/SSL库。这些可以提供给Context::set_up方法。库本身不处理加密。
依赖
Poster-rs依赖于以下crate
- futures - 启用与运行时无关的API
- bytes - 原始数据和缓冲区管理
- either - 处理两种不同类型“联合”的实用工具
- derive_builder - 无代码膨胀地实现Builder模式
许可证
版权所有 © 2023 Borys Chyliński
以下条件下,任何获得此软件及其相关文档副本(“软件”)的人均可免费使用软件,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本,并允许向提供软件的人使用上述权利,前提如下
上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。
软件按“原样”提供,不提供任何明示或暗示的保证,包括但不限于适销性、适用于特定目的和不侵犯专利权。在任何情况下,作者或版权所有者均不对任何索赔、损害或其他责任负责,无论是基于合同、侵权或其他原因,无论是因软件或其使用或其它方式导致的。
作者
依赖
~1.6–2.3MB
~45K SLoC