#message #hub #astarte #protocols #buffer #api #client

astarte-message-hub-proto

Astarte消息中心协议缓冲区,用于Rust

4个版本

0.6.2 2024年4月23日
0.6.1 2023年12月21日
0.6.0 2023年12月13日
0.5.1 2023年10月11日
0.5.1-rc.1 2023年10月10日

在“网络编程”类别中排名第 1147

Download history 215/week @ 2024-04-14 210/week @ 2024-04-21 258/week @ 2024-04-28 649/week @ 2024-05-05 188/week @ 2024-05-12 22/week @ 2024-05-19 265/week @ 2024-05-26 124/week @ 2024-06-02 48/week @ 2024-06-09 28/week @ 2024-06-16 18/week @ 2024-06-23 63/week @ 2024-06-30 32/week @ 2024-07-07 38/week @ 2024-07-14 89/week @ 2024-07-21 76/week @ 2024-07-28

235 每月下载量
被 2 个 crate 使用

Apache-2.0

81KB
1.5K SLoC

Astarte消息中心协议缓冲区,用于Rust

本模块通过Rust API提供对Astarte消息中心协议缓冲区的访问。

文档

环境要求

  • protobuf >= 3.15
  • Rust版本 >= 1.72.0

客户端示例

use std::time;

use clap::Parser;

use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use log::info;

// Command-line arguments of the example client, parsed with clap's derive API.
// NOTE(review): with clap's derive API the `///` doc comments below are also
// used as the `--help`/about text, so treat them as user-facing strings.
/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
    // Passed to `Node::new` when attaching to the message hub and echoed in
    // every published uptime string.
    /// UUID to be used when registering the client as an Astarte message hub node.
    uuid: String,

    // `None` (flag omitted) means the send loop never stops on its own.
    /// Stop after sending COUNT messages.
    #[clap(short, long)]
    count: Option<u64>,

    // Used as the period of the send loop's timer; defaults to 3000 ms.
    /// Milliseconds to wait between messages.
    #[clap(short, long, default_value = "3000")]
    time: u64,
}

/// Runs the example message hub client.
///
/// Parses the command line, connects to the message hub at `http://[::1]:50051`,
/// attaches as a node exposing a single device-owned datastream interface and
/// then runs two concurrent tasks:
///
/// * a receiver that prints every `AstarteMessage` delivered by the hub;
/// * a sender that publishes the node uptime on `/uptime` every `--time`
///   milliseconds, stopping after `--count` messages when a count is given,
///   and finally detaches the node.
///
/// # Panics
///
/// Panics if the connection, the attach call, a send, the detach call or the
/// incoming stream fails, or if either spawned task cannot be joined.
async fn run_example_client() {
    env_logger::init();
    let args = Cli::parse();

    let mut client = MessageHubClient::connect("http://[::1]:50051")
        .await
        .unwrap();

    let device_datastream_interface: &str = r#"{
        "interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
        "version_major": 0,
        "version_minor": 1,
        "type": "datastream",
        "ownership": "device",
        "mappings": [
            {
                "endpoint": "/uptime",
                "type": "string",
                "explicit_timestamp": true
            }
        ]
    }"#;

    let interface_jsons = [device_datastream_interface];

    let node = Node::new(&args.uuid, &interface_jsons);

    // The node is cloned because the sender task needs it again to detach.
    let mut stream = client.attach(node.clone()).await.unwrap().into_inner();

    // Start a separate task to handle incoming data
    let reply_handle = tokio::spawn(async move {
        info!("Waiting for messages from the message hub.");

        while let Some(astarte_message) = stream.message().await.unwrap() {
            println!("Received AstarteMessage = {:?}", astarte_message);
        }

        info!("Done receiving messages, closing the connection.");
    });

    // Start a separate task to publish data
    let send_handle = tokio::spawn(async move {
        // Use the monotonic clock for the uptime: `Instant::elapsed` cannot fail,
        // whereas `SystemTime::elapsed` returns an error (and the former `unwrap`
        // panicked) whenever the wall clock is moved backwards.
        let start = time::Instant::now();
        let mut count: u64 = 0;
        // Fixed publishing period taken from `--time`. `tokio::time::interval`
        // panics on a zero period, so `--time 0` is clamped to 1 ms.
        let period = time::Duration::from_millis(args.time.max(1));
        let mut interval = tokio::time::interval(period);

        // Without `--count` the loop runs until the task is aborted or a send fails.
        while args.count.map_or(true, |limit| count < limit) {
            // Wait a little
            interval.tick().await;

            println!("Publishing the uptime through the message hub.");

            let elapsed = start.elapsed().as_secs();

            let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
            let msg = AstarteMessage {
                interface_name: "org.astarte-platform.rust.examples.datastream.DeviceDatastream"
                    .to_string(),
                path: "/uptime".to_string(),
                timestamp: None,
                payload: Some(Payload::AstarteData(elapsed_str.into())),
            };
            client.send(msg).await.unwrap();

            count += 1;
        }

        info!("Done sending messages, closing the connection.");
        client.detach(node).await.expect("Detach failed");
    });

    let res = tokio::join!(reply_handle, send_handle);

    // A `JoinError` means the corresponding task panicked or was cancelled.
    match res {
        (Ok(_), Ok(_)) => (),
        (Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
        (Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
    }
}

依赖项

~5.5–9.5MB
~150K SLoC