#rabbitmq #elasticsearch #messaging #amqp

bin+lib rmqfwd

一个用于持久化、导出和重新发布 RabbitMQ 消息的工具

3 个版本

使用旧的 Rust 2015

0.3.2 2019 年 5 月 3 日
0.3.1 2019 年 3 月 19 日
0.3.0 2019 年 3 月 1 日

#1780 in 数据库接口

MIT/Apache

76KB
2K SLoC

Rmqfwd

Rmqfwd 监听发布在 amq.rabbitmq.trace 交换中的消息,并将它们持久化存储在文档存储中(即 Elasticsearch)。

此工具的设计考虑了两个主要用例

  • 消息审计: 通过依赖 Rabbit 的 Firehose 跟踪功能rmqfwd 允许检查集群中流入和流出的所有消息,无论发布或消费它们的特定应用程序,以及其日志行为。

  • 事件源和回放: 在基于事件的系统中,回放 消息是执行各种数据操作(例如填充系统、强制重新计算特定实体、解决偶尔的虫子或临时中断引入的数据不一致性)的一种既定模式。通过利用 ElasticSearch 的搜索能力,rmqfwd 允许通过单个命令重新发布到任意交换/路由键,而无需编写专门的代码。

用法

USAGE:
    rmqfwd [SUBCOMMAND]

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

SUBCOMMANDS:
    trace     Bind a queue to 'amq.rabbitmq.trace' and persists received messages into the message store
    export    Query the message store and write the result to the file system
    replay    Publish a subset of the messages in the data store
    help      Prints this message or the help of the given subcommand(s)

配置

默认情况下,rmqfwd 将尝试读取 /etc/rmqfwd.toml 文件(这可以通过 -c 命令行开关来覆盖)。

以下是一个默认配置示例

[rabbitmq]
host = "localhost"
port = 5672
tracing_exchange = "amqp.rabbitmq.trace"

[rabbitmq.creds]
user = "guest"
password = "guest"

[elasticsearch]
index = "rabbit_messages"
message_type = "message"
base_url = "https://127.0.0.1:9200"

跟踪消息

trace 子命令将队列绑定到 Firehose 跟踪器 交换 amq.rabbitmq.trace。出队消息以规范格式写入 ElasticSearch,该格式捕获关键元数据(例如,发布的/交付的交换路由键绑定队列头部 等),以及实际的消息体作为纯文本。格式故意扁平,旨在与 Kibana 的内置过滤器良好配合。

开发设置

一旦安装了 Rust 稳定工具链,您可以使用以下命令从源代码构建

cargo build

编译后的可执行文件将在 ./target/debug/rmqfwd 中可用

运行和测试

您可以通过在项目目录中运行 docker-compose up 来设置开发环境。这将初始化以下过程,在两个独立的容器中

  • 一个启用了Firehose 跟踪器的 Rabbitmq 实例,管理控制台和访客用户访问。
  • 一个 Elasticsearch 2.5 实例

提供了一个smoketest,目前用于 CI 进行回归测试

依赖项

约 19–30MB
约 455K SLoC