#mqtt-client #event-loop #robust #iot #async-stream #connected #tokio

rumq-client

为您的连接设备提供高效且健壮的MQTT客户端

10个版本

0.1.0-alpha.102020年4月15日
0.1.0-alpha.92020年3月29日
0.1.0-alpha.52020年2月16日
0.1.0-alpha.32020年1月6日
0.1.0-alpha.22019年12月26日

#14 in #connected

Download history 20/week @ 2024-03-30 2/week @ 2024-04-06 6/week @ 2024-05-25 3/week @ 2024-06-01

每月60次下载

MIT许可证

155KB
3K SLoC

一个追求健壮、高效且易于使用的纯Rust MQTT客户端。

  • 事件循环只是一个可以被tokio轮询的异步Stream
  • 对事件循环的请求也是一个Stream。解决了有界和无界两种用例。
  • 健壮性只需一个循环即可实现。
  • 灵活访问事件循环的状态以控制其行为。

接受任何请求流。

构建有界、无界、可中断或其他任何满足您需求的流(以喂养事件循环)。

一些我们的实际应用场景

  • 一个流,通过检测背压在磁盘和内存之间编排数据,永远不会(实际上)丢失数据。
  • 一个流,根据数据的优先级在多个通道之间切换数据。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}

健壮性只需一个循环即可实现。

网络是不可靠的。但健壮性是容易实现的。

  • 只需从现有的事件循环创建一个新的流即可。
  • 从上次离开的地方继续。
  • 访问事件循环的状态以自定义下一个连接的行为。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(item) = stream.next().await {
            println!("Received = {:?}", item);
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}

事件循环只是一个可以被tokio轮询的流。

  • 将其插入到select! join!中,与其他线程上的流交织。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let requests = Vec::new::<Request>();

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // plug it into tokio ecosystem
    let mut stream = eventloop.connect().await.unwrap();
}

强大的通知系统来控制运行时。

事件循环流产生所有有趣的事件,从网络上的数据到断开和重新连接。

  • 重新连接后重新订阅。
  • 在收到第N个puback后停止。
#[tokio::main(core_threads = 1)]
async fn main() {
    let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
    let (requests_tx, requests_rx) = channel(10);

    let mut eventloop = eventloop(mqttoptions, requests_rx);

    // loop to reconnect and resume
    loop {
        let mut stream = eventloop.connect().await.unwrap();
        while let Some(notification) = stream.next().await {
            println!("Received = {:?}", item);
            match notification {
                Notification::Connect => requests_tx.send(subscribe).unwrap(),
            }
        }

        time::delay_for(Duration::from_secs(1)).await;
    }
}

依赖项

~15MB
~351K SLoC