3个版本
0.1.2 | 2023年2月6日 |
---|---|
0.1.1 | 2023年2月2日 |
0.1.0 | 2023年1月3日 |
455 在 配置
49KB
712 行
Kafka-json-processor项目生成器
此实用工具可用于生成一个Rust项目,该项目可以编译成可执行文件,包含所有预配置的JSON处理器。可执行文件将读取Kafka主题,处理消息并将它们写入另一个Kafka主题。
项目基于模板生成。模板是一个配置文件,指示生成器如何处理消息。
准备template.yml
请参阅以下"示例处理器"项目的示例
name: "Example processor"
streams:
- input_topic: in
output_topic: out
processors:
- generator: static_field
field: $.hello
value: world
- generator: copy_field
source_field: $.abc[1]
target_field: $.def
在此文件中, мы определяем один поток, который будет обрабатывать сообщения из темы "in" и помещать обработанные сообщения в "out"。
Поток — это единственная труба для обработки сообщений, исходящих из одной темы и идущих в другую тему. Один поток содержит список обработчиков. Обработчики — это функции, которые будут добавлять или изменять поля в выходящее сообщение.
В этом примере мы определяем два обработчика
- Первый добавит
static_field
в выходящее сообщение. Желаеныйfield
определяется JSONPath, а статическое значение определяетсяvalue
. - Второй
copy_field
из входящего сообщения в выходящее сообщение. Он будет копировать изsource_field
(определяемый JSONPath) вtarget_field
.
Обратите внимание, что в шаблоне мы не используем термин обработчик, тип обработчика или тип обработчика дляspecifying what function to use in a pipeline. Причина в том, что мы на самом деле генерируем функции для вашего целевого проекта. Таким образом, этот файл (template.yml
) на самом деле определяет, как генерировать проект, и поэтому мы используем различные генераторы для желаемого поведения.
Что такое генераторы?
生成器是脚本或可执行文件,根据给定的参数输出函数源代码。您可以编写自己的脚本以创建自定义函数。您还可以编写自定义的可执行文件(或“插件” - 见kjp-generator-plugin)。
kafka-json-processor提供了一些现成的生成器 - 您可以使用kjp-generator-generators。
默认情况下,kjp-generator使用"./generators"路径来查找可用的生成器。如果您希望指定自定义路径,请使用此参数选项
-g, --generators-path <GENERATORS_PATH>
Custom path to processor generators.
Put all processor generators in this directory. This directory will be scanned for available files and those files will be used as executable plugins to generate any code requested by `generator` option in your `template.yaml`.
[default: ./generators]
示例: ./kjp-generator -g ./kjp-generator-generators -t ./template.yml -o output-directory
如何生成项目?
您需要
- 一个模板(
template.yml
), kjp-generator
可执行文件,- 生成器(例如 kjp-generator-generators)。
运行kjp-generator
时使用以下必需选项
-t, --template <TEMPLATE>
Path to template file (YAML).
A template file is a configuration file that will be used to generate the final project. 'The final project' - Rust project with generated code to process messages from selected Kafka topics.
-o, --output <OUTPUT>
Output directory of generated project.
This will be the directory where the project with message processors will be generated. This project will contain generated code to process messages basing on the supplied template. The code will need to be compiled afterwards.
这些选项是可选的,但请注意
-g, --generators-path <GENERATORS_PATH>
Custom path to processor generators.
Put all processor generators in this directory. This directory will be scanned for available files and those files will be used as executable plugins to generate any code requested by `generator` option in your `template.yaml`.
[default: ./generators]
-c, --core-path <CORE_PATH>
Custom path to kafka_json_processor_core.
kafka_json_processor_code is a dependency that contains code that will prevent boilerplate in the generated project. By default, it will use hardcoded version from `crates.io`. If it doesn't work (or you want to use custom core), supply a path to the source code of kafka_json_processor_code.
示例: ./kjp-generator -g ./kjp-generator-generators -t ./template.yml -o output-directory
另见
插件开发者指南
在代码生成阶段,kjp-generator将尝试用以下参数执行生成器
- 生成的函数名,
- 来自
template.yml
的生成器选项作为参数对("name" "value")。
例如,以下处理器配置将执行
- input_topic: in
output_topic: out
processors:
- generator: static_field
field: $.hello
value: world
将执行以下脚本
./static_field.sh in_out_static_0_field 'generator' 'static_field' 'field' '$.hello' 'value' 'world'
Kjp-generator期望插件在stdout中以以下格式输出有效的UTF-8字符串
- 第一行:1)
OK
或2)ERR
, - 其余stdout:1) 函数源代码或2) 错误信息。
行分隔符:\n
。插件终止被视为函数生成的结束 - 在此事件之后收集stdout。无效输出被视为错误。
插件不需要解释JSONPath - 它只需将JSONPath用##JSONPATH(...)##
包裹即可,以指示kjp-generator将其转换为&[ObjectKey]
。例如:##JSONPATH($.hello[1].world)##
将变为&[Key(\"hello\".to_string()), Index(1), Key(\"world\".to_string())]
。
输出示例
OK
fn in_out_static_0_field(_input: &Value, message: &mut OutputMessage) -> Result<(), ProcessingError> {
message.insert_val(##JSONPATH($.hello)##, Value::String("world".to_string()))?;
Ok(())
}
ERR
Generator required property that was missing in config: source_field
依赖关系
~5.5–8MB
~145K SLoC