#事件循环 #MQTT 客户端 # #鲁棒 #tokio #请求 #状态

svc-rumq-client

为您的连接设备提供的有效且鲁棒的 MQTT 客户端

1 个不稳定版本

0.1.0-alpha.112020 年 7 月 25 日

#21 in #事件循环

Download history 9/week @ 2024-04-07 23/week @ 2024-04-14 15/week @ 2024-04-21 9/week @ 2024-04-28 14/week @ 2024-05-05 45/week @ 2024-05-12 67/week @ 2024-05-19 65/week @ 2024-05-26 49/week @ 2024-06-02 104/week @ 2024-06-09 67/week @ 2024-06-16 20/week @ 2024-06-23 30/week @ 2024-06-30 51/week @ 2024-07-14 24/week @ 2024-07-21

每月 106 次下载

MIT 许可证

155KB
3K SLoC

一个纯 Rust MQTT 客户端,力求鲁棒、高效且易于使用。

  • 事件循环就是一个异步 Stream,可以被 tokio 轮询。
  • 对事件循环的请求也是一个 Stream。解决了有界和无界用例。
  • 鲁棒性只需一个循环即可实现。
  • 灵活访问事件循环的状态以控制其行为。

接受任何请求流。

构建有界、无界、可中断或其他任何满足您需求的流(用于向事件循环提供数据)。

我们的一些实际用例

  • 一个流,通过检测背压来协调磁盘和内存之间的数据,并且(实际上)不会丢失数据。
  • 一个流,根据数据的优先级在多个通道之间处理数据。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}

鲁棒性只需一个循环即可实现。

网络是不可靠的。但鲁棒性是容易实现的。

  • 只需从现有的事件循环创建一个新的流。
  • 从上次停止的地方继续。
  • 访问事件循环的状态以自定义下一次连接的行为。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(item) = stream.next().await {
            println!("Received = {:?}", item);
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}

事件循环就是一个可以被 tokio 轮询的流。

  • 将其连接到 select!join! 以在同一个线程上与其他流交错。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // plug it into tokio ecosystem
    let mut stream = eventloop.connect().await.unwrap();
}

强大的通知系统以控制运行时

事件循环流产生所有有趣的事件,从网络上的数据到断开连接和重新连接。按您认为合适的方式使用它。

  • 重新连接后重新订阅
  • 接收第 N 个 puback 后停止
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let (requests_tx, requests_rx) = channel(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(notification) = stream.next().await {
            println!("Received = {:?}", item);
            match notification {
                Notification::Connect => requests_tx.send(subscribe).unwrap(),
            }
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}

依赖关系

~15MB
~351K SLoC