#mqtt-client #actix #框架 #消息 #actor #发布 #停止

actix-mqtt-client

基于actix框架的MQTT客户端

15个版本

0.5.1 2024年4月13日
0.5.0 2021年6月13日
0.4.4 2020年9月29日
0.4.2 2020年3月11日
0.1.1 2019年7月31日

#349 in 异步

每月38次下载
用于 mqttp

MIT 许可证

125KB
3K SLoC

基于actix框架的MQTT v3.1.1客户端

actix-mqtt-client crate 是基于 actix 框架的 MQTT v3.1.1 客户端。

Build Status crates.io docs.rs

基本用法和示例

首先，创建 2 个 actix actor：一个用于接收发布消息，另一个用于接收客户端的错误消息。你还可以创建一个可选的 actix actor 来接收停止消息。

pub struct ErrorActor;

impl actix::Actor for ErrorActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<ErrorMessage> for ErrorActor {
    type Result = ();
    fn handle(&mut self, error: ErrorMessage, _: &mut Self::Context) -> Self::Result {
        log::error!("{}", error.0);
    }
}

pub struct MessageActor;

impl actix::Actor for MessageActor {
    type Context = actix::Context<Self>;
}

impl actix::Handler<PublishMessage> for MessageActor {
    type Result = ();
    fn handle(
        &mut self,
        msg: PublishMessage,
        _: &mut Self::Context,
    ) -> Self::Result {
        log::info!(
            "Got message: id:{}, topic: {}, payload: {:?}",
            msg.id,
            msg.topic_name,
            msg.payload
        );
    }
}

然后，连接到服务器（使用 tokio），并使用流的读取端和写入端以及上述 actor 创建一个 MqttClient：

use std::io::Error as IoError;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use actix::{Actor, Arbiter, System};
use env_logger;
use tokio::io::split;
use tokio::net::TcpStream;
use tokio::time::{sleep_until, Instant};
use actix_mqtt_client::client::{MqttClient, MqttOptions};

// End-to-end example: connect over TCP, subscribe to a topic, publish one
// message at each QoS level with a pause in between, then disconnect.
let sys = System::new();
let socket_addr = SocketAddr::from_str("127.0.0.1:1883").unwrap();
sys.block_on(async move {
    // The session runs in an inner async block so that `?` can propagate
    // I/O errors; the collected result is unwrapped once at the end.
    let result = async move {
        let stream = TcpStream::connect(socket_addr).await?;
        let (r, w) = split(stream);
        log::info!("TCP connected");
        let mut client = MqttClient::new(
            r,
            w,
            String::from("test"),
            MqttOptions::default(),
            MessageActor.start().recipient(),
            ErrorActor.start().recipient(),
            // NOTE(review): presumably the optional stop-message recipient;
            // confirm against `MqttClient::new`.
            None,
        );
        client.connect().await?;

        // Poll once per second until the client reports that it is connected.
        while !client.is_connected().await? {
            sleep_until(Instant::now() + Duration::from_secs(1)).await;
        }

        log::info!("MQTT connected");
        log::info!("Subscribe");
        client
            .subscribe(String::from("test"), mqtt::QualityOfService::Level2)
            .await?;

        log::info!("Publish");
        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level0,
                b"test".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        sleep_until(Instant::now() + Duration::from_secs(10)).await;

        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level1,
                b"test2".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        sleep_until(Instant::now() + Duration::from_secs(10)).await;

        client
            .publish(
                String::from("test"),
                mqtt::QualityOfService::Level2,
                b"test3".to_vec(),
            )
            .await?;
        log::info!("Wait for 10s");
        sleep_until(Instant::now() + Duration::from_secs(10)).await;

        log::info!("Disconnect");
        client.disconnect(false).await?;
        log::info!("Check if disconnect is successful");
        assert!(
            client.is_disconnected(),
            "client should report disconnected after disconnect()"
        );
        // The turbofish pins the error type so the `?` uses above can infer it.
        Ok::<(), IoError>(())
    }
    .await;
    result.unwrap()
});
sys.run().unwrap();

依赖关系

~7–18MB
~220K SLoC