3个版本
0.1.2 | 2023年2月6日 |
---|---|
0.1.1 | 2023年2月2日 |
0.1.0 | 2023年1月3日 |
#1512 in 编码
63KB
1.5K SLoC
Kafka-json-processor-core
这是multicatch的multicatch的一个核心依赖项。它是multicatch的multicatch的multicatch项目的一部分。它包含kafka-json-processor项目中用multicatch的multicatch生成的核心功能。
特性
- 用于管理从Kafka主题中读取和写入的内置函数
- 配置
- 用于读取/序列化JSON的函数
- 错误,减少在生成的项目中的模板代码的类型定义
- 美化XML和JSON格式化
- 流模拟器。
如何使用?
大多数情况下,它可能只用于生成项目。感谢这个核心依赖项,您可以multicatch使用multicatch生成单个文件的项目,例如main.rs
,它仍然可以阅读(您可以在编译之前调整一些函数)。
但是,没有阻止您手动实现自己的kafka-json-processor!请参阅multicatch的multicatch示例。
模拟
要在一个“干燥”环境中测试流,您可以使用模拟。这是一个测试工具,允许您测试JSON消息是否被正确处理(在运行编译后的kafka-json-processor之前)。
对于模拟,请准备以下目录结构
<project_directory>
> simulations
| > stream_name
对于生成的项目,stream_name
将会是 ${input_topic}_${output_topic}
(例如 in_out
),但在您的自定义 kafka-json-processor 中,您可以拥有任何名称的流。准备一个 HashMap<String, Stream>
流的哈希表,并使用 kafka_json_processor_core::simulation::simulate_streams_from_default_folder
运行模拟。
在模拟开始时,模拟器将查找目录中的所有文件,并尝试
- 反序列化
[Input]
JSON, - 使用给定输入消息在流中运行所有处理器,
- 断言输出消息等于
[Expected]
消息(通过比较 JSON,而不是原始序列化字符串)。
示例
配置 kafka-json-processor
默认情况下,kafka-json-processor 将查找 ./processor.properties
。您可以通过设置环境变量 KAFKA_PROCESSOR_CONFIG_PATH
来更改默认位置。
此文件包含 Kafka 客户端(rdkafka)和 kafka-json-processor 特定选项的配置。
有关 rdkafka 配置,请参阅 文档。使用 consumer.
或 producer.
前缀的 rdkafka 属性将仅应用于消费者或生产者。未前缀的属性将应用于两个客户端。
kafka-json-processor 特定选项
# Worker threads - how many threads to use for processing.
# Default: 4
processor.worker.threads=4
# Received messages are passed by a channel to worker threads. If the processors are too slow, the channel fill up.
# Default: 50
processor.channel.capacity=50
# The producer queue size. Processed messages are queued to be sent to Kafka. Producing will slow down if the queue fills up.
# You should set this option to the same value as producer.queue.buffering.max.messages.
# Default: 100000
processor.queue.size=100000
# Slow down time. When the producer queue is filled up above 95%, then the message production will be paused for the following time.
# This does not mean that processing will be paused too!
# Default: 10000 (10s)
processor.queue.slowdown.ms=10000
依赖关系
~17–28MB
~356K SLoC