#cloud-events #mqtt #router #cerk #transport-layer #send-event

cerk_port_mqtt

这是一个为 CERK 开发的包。CERK 是一个用 Rust 编写的开源 CloudEvents 路由器,采用微内核架构。

7 个版本

0.2.11 2021 年 1 月 9 日
0.2.10 2021 年 1 月 9 日
0.2.6 2020 年 12 月 7 日
0.2.1 2020 年 11 月 25 日

#1 in #cerk

Apache-2.0

71KB
1K SLoC

cerk_port_mqtt

Build status

这是一个为 CERK 开发的包。CERK 是一个用 Rust 编写的开源 CloudEvents 路由器,采用微内核架构。

简介

CERK 允许您在不同端口之间路由您的 CloudEvents。端口是 CloudEvents 可以交换的传输层绑定。它注重模块化和可移植性。

组件

CERK 配备了一些预制组件,但实现自定义组件很容易。

GitHub 上提供了良好的概述 GitHub.

此组件:MQTT 端口

此端口将 CloudEvents 发布到或从 MQTT v3.1 主题订阅。

该端口使用 Eclipse Paho MQTT Rust 客户端 实现,并按照 CloudEvents v1.0 的 MQTT 协议绑定规范 发送和接收消息。

配置

配置应为类型 cerk::kernel::Config::HashMap 并至少包含以下条目

必填字段

host

值必须为类型 Config::String,并包含带有协议和端口的主机名。

例如:Config::String(String::from("tcp://mqtt-broker:1883"))

可选字段

send_topic

值必须为类型 Config::String,并包含消息将被发送到的 MQTT 主题名称。

例如:Config::String(String::from("test"))

以下配置是可选的。

持久化

值必须为类型 Config::U8,并包含以下值之一。

这些值是根据 Eclipse Paho MQTT Rust 客户端 PersistenceType 定义的。

  • 0: 文件(默认)- 数据和消息将保存在本地文件中(默认)
  • 1: 无 - 不使用持久化。

例如:Config::U8(0)

send_qos

消息发送的服务质量。服务质量仅适用于 MQTT 代理,不会改变路由器或端口的任何行为。路由器目前仅支持尽力而为。

  • 0: 至多一次交付(默认)
  • 1: 至少一次交付
  • 2: 精确一次交付

例如:Config::U8(0)

subscribe_topics

值必须为类型 Config::Vec([Config::String]),并且必须与 subscribe_qos 具有相同的长度。向量中的值包含路由器应订阅的 MQTT 主题。

如果在一个 MQTT 端口上订阅了多个主题,目前还没有方法让路由器或输出端口知道事件是从哪个主题接收到的。

subscribe_qos

值必须为类型 Config::Vec([Config::U8]),并且必须与 subscribe_topics 具有相同的长度。

主题订阅的服务质量。服务质量仅适用于 MQTT 代理,不会改变路由器或端口的任何行为。路由器目前仅支持尽力而为。

  • 0: 至多一次交付
  • 1: 至少一次交付
  • 2: 精确一次交付

配置示例

发送事件的最小配置

此配置将连接到代理,但不会发送或接收任何事件。

use std::collections::HashMap;
use cerk::kernel::Config;

let map: HashMap<String, Config> = [
    ("host".to_string(), Config::String("tcp://mqtt-broker:1883".to_string())),
]
.iter()
.cloned()
.collect();

let config = Config::HashMap(map);

发送事件的全配置

use std::collections::HashMap;
use cerk::kernel::Config;

let map: HashMap<String, Config> = [
    ("host".to_string(), Config::String("tcp://mqtt-broker:1883".to_string())),
    ("persistence".to_string(), Config::U8(0)),
    ("send_topic".to_string(), Config::String("test".to_string())),
    ("send_qos".to_string(), Config::U8(2)),
]
.iter()
.cloned()
.collect();

let config = Config::HashMap(map);

接收事件的全配置

use std::collections::HashMap;
use cerk::kernel::Config;

let map: HashMap<String, Config> = [
    ("host".to_string(), Config::String("tcp://mqtt-broker:1883".to_string())),
    ("persistence".to_string(), Config::U8(0)),
    (
      "subscribe_topics".to_string(),
      Config::Vec(vec![Config::String("test".to_string())]),
    ),
    (
      "subscribe_qos".to_string(),
      Config::Vec(vec![Config::U8(2)]),
    ),
]
.iter()
.cloned()
.collect();

let config = Config::HashMap(map);

接收事件的全配置

use std::collections::HashMap;
use cerk::kernel::Config;

let map: HashMap<String, Config> = [
    ("host".to_string(), Config::String("tcp://mqtt-broker:1883".to_string())),
    ("persistence".to_string(), Config::U8(0)),
    ("send_topic".to_string(), Config::String("test".to_string())),
    ("send_qos".to_string(), Config::U8(2)),
    (
      "subscribe_topics".to_string(),
      Config::Vec(vec![Config::String("test".to_string())]),
    ),
    (
      "subscribe_qos".to_string(),
      Config::Vec(vec![Config::U8(2)]),
    ),
]
.iter()
.cloned()
.collect();

let config = Config::HashMap(map);

示例

限制

  • 可靠性 此端口不支持任何除 DeliveryGuarantee 之外的 BestEffort,因此从不发送 OutgoingCloudEventProcessedIncomingCloudEventProcessed 消息。

更新 Readme

原始 readme 文本是位于 lib.rs 文件中的 Rust 文档注释。

  1. cargo安装 cargo-readme
  2. cargo readme > README.md

许可证

Apache-2.0

依赖关系

~19–32MB
~596K SLoC