#mqtt #mqtt-client #networking #async #message #edge #protocols

no-std edge-mqtt

基于rumqttc crate实现了embedded-svc crate的MQTT traits

3个版本 (破坏性更新)

0.2.0 2024年2月1日
0.1.0 2024年1月27日
0.0.0 2023年11月22日

#1804 in 嵌入式开发


被edge-net使用

MIT/Apache

11KB
133

edge-mqtt

CI crates.io Documentation

rumqttc crate的包装器,将其适配到embedded-svc crate的异步MQTT traits

注意:需要STD!

未来的计划是,一旦rust-mqtt获得MQTT 3.1兼容性并实现更直观的API(发送消息可以独立于接收MQTT消息进行),就废弃这个crate。

...或者实现一个真正的no_std无分配替代方案 - 就像所有其他edge-* crate一样 - 如果rust-mqtt没有进一步的开发。

示例

use async_compat::CompatExt;

use embedded_svc::mqtt::client::asynch::{Client, Connection, Publish, QoS};
use embedded_svc::mqtt::client::Event;

use embassy_futures::select::{select, Either};
use embassy_time::{Duration, Timer};

use edge_mqtt::io::{AsyncClient, MqttClient, MqttConnection, MqttOptions};

use log::*;

const MQTT_HOST: &str = "broker.emqx.io";
const MQTT_PORT: u16 = 1883;
const MQTT_CLIENT_ID: &str = "edge-mqtt-demo";
const MQTT_TOPIC: &str = "edge-mqtt-demo";

/// Demo entry point.
///
/// Initializes logging, creates the MQTT client/connection pair via
/// `mqtt_create`, and blocks the current thread until the `run` future
/// completes.
///
/// # Panics
///
/// Panics if `mqtt_create` fails or if `run` resolves to an error.
fn main() {
    // Logger configuration: "info" is passed as the fallback filter for the
    // default env_logger filter variable.
    let log_env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
    env_logger::init_from_env(log_env);

    let (client, conn) = mqtt_create(MQTT_CLIENT_ID, MQTT_HOST, MQTT_PORT).unwrap();

    // The `.compat()` wrapper is necessary for tokio.
    let demo = run(client, conn, MQTT_TOPIC).compat();

    futures_lite::future::block_on(demo).unwrap()
}

/// Drives the demo: subscribes to `topic`, then races a listener against a
/// periodic publisher.
///
/// * `client` - used to subscribe to `topic` and to publish the demo payload.
/// * `connection` - polled for incoming events, whose payloads are logged.
/// * `topic` - the topic that is both subscribed to and published on.
///
/// The listener ends (with `Ok(())`) as soon as `connection.next()` yields an
/// error; the publisher loops forever unless a publish fails. Whichever of
/// the two finishes first determines the returned result.
///
/// # Errors
///
/// Returns an error if subscribing or publishing fails.
async fn run<M, C>(mut client: M, mut connection: C, topic: &str) -> Result<(), anyhow::Error>
where
    M: Client + Publish + 'static,
    M::Error: std::error::Error + Send + Sync + 'static,
    C: Connection + 'static,
{
    info!("About to start the MQTT client");

    info!("MQTT client started");

    client.subscribe(topic, QoS::AtMostOnce).await?;

    info!("Subscribed to topic \"{topic}\"");

    // Receiving side: log every incoming event until the connection reports an error.
    let listener = async move {
        info!("MQTT Listening for messages");

        loop {
            match connection.next().await {
                Ok(event) => info!("[Queue] Event: {}", event.payload()),
                Err(_) => break,
            }
        }

        info!("Connection closed");

        Ok(())
    };

    // Sending side: publish the same payload over and over, pausing between rounds.
    let publisher = async move {
        // Just to give a chance of our connection to get even the first published message
        Timer::after(Duration::from_millis(500)).await;

        let payload = "Hello from edge-mqtt-demo!";

        loop {
            client
                .publish(topic, QoS::AtMostOnce, false, payload.as_bytes())
                .await?;

            info!("Published \"{payload}\" to topic \"{topic}\"");

            let sleep_secs = 2;

            info!("Now sleeping for {sleep_secs}s...");
            Timer::after(Duration::from_secs(sleep_secs)).await;
        }
    };

    // Whichever side completes first decides the overall outcome.
    match select(listener, publisher).await {
        Either::First(listen_outcome) => listen_outcome,
        Either::Second(publish_outcome) => publish_outcome,
    }
}

/// Builds the MQTT client and connection pair used by the demo.
///
/// * `client_id` - MQTT client identifier passed to `MqttOptions::new`.
/// * `host` - broker host name passed to `MqttOptions::new`.
/// * `port` - broker port passed to `MqttOptions::new`.
///
/// The options are configured with a 10-second keep-alive. The underlying
/// `AsyncClient` and its event loop are wrapped in `MqttClient` and
/// `MqttConnection` respectively.
///
/// # Errors
///
/// The signature allows an `anyhow::Error`, although the visible body always
/// returns `Ok`.
fn mqtt_create(
    client_id: &str,
    host: &str,
    port: u16,
) -> Result<(MqttClient, MqttConnection), anyhow::Error> {
    let keep_alive = core::time::Duration::from_secs(10);

    let mut options = MqttOptions::new(client_id, host, port);
    options.set_keep_alive(keep_alive);

    let (inner_client, event_loop) = AsyncClient::new(options, 10);

    Ok((
        MqttClient::new(inner_client),
        MqttConnection::new(event_loop),
    ))
}

依赖项

~4–16MB
~167K SLoC