#kafka #json #error #processor #stream #topic #definition

kafka_json_processor_core

你的Kafka处理器核心逻辑,kafka-json-processor项目的一部分

3个版本

0.1.2 2023年2月6日
0.1.1 2023年2月2日
0.1.0 2023年1月3日

#1512 in 编码

GPL-3.0-or-later

63KB
1.5K SLoC

Kafka-json-processor-core

这是multicatchmulticatch的一个核心依赖项。它是multicatchmulticatchmulticatch项目的一部分。它包含kafka-json-processor项目中用multicatchmulticatch生成的核心功能。

特性

  • 用于管理从Kafka主题中读取和写入的内置函数
  • 配置
  • 用于读取/序列化JSON的函数
  • 错误,减少在生成的项目中的模板代码的类型定义
  • 美化XML和JSON格式化
  • 流模拟器。

如何使用?

大多数情况下,它可能只用于生成项目。感谢这个核心依赖项,您可以multicatch使用multicatch生成单个文件的项目,例如main.rs,它仍然可以阅读(您可以在编译之前调整一些函数)。

但是,没有阻止您手动实现自己的kafka-json-processor!请参阅multicatchmulticatch示例。

模拟

要在一个“干燥”环境中测试流,您可以使用模拟。这是一个测试工具,允许您测试JSON消息是否被正确处理(在运行编译后的kafka-json-processor之前)。

对于模拟,请准备以下目录结构

<project_directory>
 > simulations
 | > stream_name

对于生成的项目,stream_name 将会是 ${input_topic}_${output_topic} (例如 in_out),但在您的自定义 kafka-json-processor 中,您可以拥有任何名称的流。准备一个 HashMap<String, Stream> 流的哈希表,并使用 kafka_json_processor_core::simulation::simulate_streams_from_default_folder 运行模拟。

在模拟开始时,模拟器将查找目录中的所有文件,并尝试

  • 反序列化 [Input] JSON,
  • 使用给定输入消息在流中运行所有处理器,
  • 断言输出消息等于 [Expected] 消息(通过比较 JSON,而不是原始序列化字符串)。

示例

配置 kafka-json-processor

默认情况下,kafka-json-processor 将查找 ./processor.properties。您可以通过设置环境变量 KAFKA_PROCESSOR_CONFIG_PATH 来更改默认位置。

此文件包含 Kafka 客户端(rdkafka)和 kafka-json-processor 特定选项的配置。

有关 rdkafka 配置,请参阅 文档。使用 consumer.producer. 前缀的 rdkafka 属性将仅应用于消费者或生产者。未前缀的属性将应用于两个客户端。

kafka-json-processor 特定选项

# Worker threads - how many threads to use for processing.
# Default: 4
processor.worker.threads=4

# Received messages are passed by a channel to worker threads. If the processors are too slow, the channel fill up.
# Default: 50
processor.channel.capacity=50

# The producer queue size. Processed messages are queued to be sent to Kafka. Producing will slow down if the queue fills up.
# You should set this option to the same value as producer.queue.buffering.max.messages.
# Default: 100000
processor.queue.size=100000

# Slow down time. When the producer queue is filled up above 95%, then the message production will be paused for the following time.
# This does not mean that processing will be paused too!
# Default: 10000 (10s)
processor.queue.slowdown.ms=10000

依赖关系

~17–28MB
~356K SLoC