5个不稳定版本

0.3.1 2024年5月10日
0.3.0 2023年3月29日
0.2.0 2023年1月29日
0.1.1 2023年1月24日
0.1.0 2023年1月22日

#107 in 异步

Download history 141/week @ 2024-05-07 11/week @ 2024-05-14 10/week @ 2024-05-21 22/week @ 2024-05-28

298 每月下载量

MIT 协议

325KB
8K SLoC

poster-rs 📬

build

MQTT5客户端库

Poster-rs是一个异步、运行时无关、零拷贝的MQTT 5库,设计时考虑了操作局部性。

特性

  • MQTTv5
  • 运行时无关
  • 零拷贝
  • 每个订阅的异步流
  • 无不安全代码

文档

在这里。

入门

首先,选择您的异步运行时。准备好了吗?让我们开始吧!

在下面的示例中,我们将使用Tokio。

use std::error::Error;
use poster::{prelude::*, ConnectOpts, Context};
use tokio::net::TcpStream;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
    let (mut ctx, mut handle) = Context::new();

    let ctx_task = tokio::spawn(async move {
        // Set up a connection using your async framework of choice. We will need a read end, which is
        // AsyncRead, and write end, which is AsyncWrite, so we split the TcpStream
        // into ReadHalf and WriteHalf pair.
        let (rx, tx) = TcpStream::connect("127.0.0.1:1883").await?.into_split();

        // Pass (ReadHalf, WriteHalf) pair into the context and connect with the broker on
        // the protocol level.
        ctx.set_up((rx.compat(), tx.compat_write())).connect(ConnectOpts::default()).await?;

        // Awaiting the Context::run invocation will block the current task.
        if let Err(err) = ctx.run().await {
            eprintln!("[context] Error occured: \"{}\", exiting...", err);
        } else {
            println!("[context] Context exited.");
        }

         Ok::<(), Box<dyn Error + Send + Sync>>(())
    });

    /* ... */

    ctx_task.await?;
    Ok(())
}

此时,我们的 上下文 已经启动并运行。

让我们分析上述示例。 poster-rs 是一个运行时无关的库,这意味着所有异步操作都是使用 futures-rs crate 的 traits 抽象的。这种方法的最终结果是连接到代理必须手动建立,库只关心在上下文创建期间接收 (AsyncRead, AsyncWrite) 对。这个对通常是通过网络库中流/套接字的某种 split 函数获得的。 (参见 tokio, smol)

new 工厂方法为我们提供 (Context, ContextHandle) 元组。 Context 负责处理客户端与服务器之间的流量。然而,ContextHandleContext 实例的克隆句柄,用于执行所有 MQTT 操作。

方法 run 会阻塞任务(在 .await 上)直到满足以下条件之一

  1. 执行优雅断开(使用 ContextHandle::disconnect 方法)。然后结果是 ()。
  2. 发生错误,导致 MqttError。这可能是由于套接字关闭、从服务器接收 DISCONNECT 等原因。

发布

发布是通过 ContextHandle::publish 方法实现的。

// ...
let opts = PublishOpts::default().topic_name("topic").payload("hello there".as_bytes());
handle.publish(opts).await?;

参见 PublishOpts

订阅

订阅表示为异步流,通过 SubscribeRsp::stream 方法获取。订阅的一般步骤是

注意,在底层,库使用订阅标识符来分组订阅。参见 SubscribeOpts

// ...
let opts = SubscribeOpts::default().subscription("topic", SubscriptionOpts::default());
let rsp = handle.subscribe(opts).await?;
let mut subscription = rsp.stream();

while let Some(msg) = subscription.next().await {
    println!("topic: {}; payload: {}", msg.topic_name(), str::from_utf8(msg.payload()).unwrap());
}

用户可以在一个订阅请求中订阅多个主题。

// ...
let opts = SubscribeOpts::default()
    .subscription("topic1", SubscriptionOpts::default())
    .subscription("topic2", SubscriptionOpts::default());

let mut subscription = handle.subscribe(opts).await?.stream();

while let Some(msg) = subscription.next().await {
    println!("topic: {}; payload: {}", msg.topic_name(), str::from_utf8(msg.payload()).unwrap());
}

可以使用 SubscriptionOpts 对每个订阅进行自定义。

let opts = SubscribeOpts::default()
    .subscription("topic", SubscriptionOpts::new().maximum_qos(QoS::AtLeastOnce));

SubscribeRsp 结构体表示订阅请求的结果。为了访问每个主题的原因码,使用 SubscribeRsp::payload 方法

// ...
let rsp = handle.subscribe(opts).await?;
let all_ok = rsp.payload().iter().copied().all(|reason| reason == SubackReason::GranteedQoS2);

取消订阅

取消订阅是通过 ContextHandle::unsubscribe 方法实现的。请注意,它不会关闭订阅流(可能会导致逻辑错误)。

// ...
let opts = UnsubscribeOpts::default().topic_name("topic");
let rsp = handle.unsubscribe(opts).await?;

与订阅一样,可以通过 UnsubscribeRsp::payload 方法获取每个主题的原因码。参见 UnsubscribeOpts

心跳和ping

如果在连接请求期间设置了 ConnectOpts::keep_alive 间隔,则用户必须定期使用 ContextHandle::ping 方法。

断开连接

断开连接可能由用户或代理程序发起。当由代理程序发起时,Context::run 方法返回 error::Disconnected 错误。

用户也可以通过使用 ContextHandle::disconnect 方法来执行优雅断开。当断开连接完成后,Context::run 方法返回 ()。

// ...
handle.disconnect(DisconnectOpts::default()).await?;

参见 DisconnectOpts

错误处理

主要库错误类型是位于error::MqttError枚举,在error模块中。

TLS/SSL

存在带有AsyncRead,AsyncWrite TLS/SSL流的TLS/SSL库。这些可以提供给Context::set_up方法。库本身不处理加密。

依赖

Poster-rs依赖于以下crate

  • futures - 启用与运行时无关的API
  • bytes - 原始数据和缓冲区管理
  • either - 处理两种不同类型“联合”的实用工具
  • derive_builder - 无代码膨胀地实现Builder模式

许可证

版权所有 © 2023 Borys Chyliński

以下条件下,任何获得此软件及其相关文档副本(“软件”)的人均可免费使用软件,包括但不限于使用、复制、修改、合并、发布、分发、再许可和/或销售软件副本,并允许向提供软件的人使用上述权利,前提如下

上述版权声明和本许可声明应包含在软件的所有副本或主要部分中。

软件按“原样”提供,不提供任何明示或暗示的保证,包括但不限于适销性、适用于特定目的和不侵犯专利权。在任何情况下,作者或版权所有者均不对任何索赔、损害或其他责任负责,无论是基于合同、侵权或其他原因,无论是因软件或其使用或其它方式导致的。

作者

Borys Chyliński

依赖

~1.6–2.3MB
~45K SLoC