#kafka #generator #json #processor #config-file #topic #generate

bin+lib kjp_generator

自定义Kafka JSON处理器生成器,kafka-json-processor项目的一部分

3个版本

0.1.2 2023年2月6日
0.1.1 2023年2月2日
0.1.0 2023年1月3日

455配置

GPL-3.0-or-later

49KB
712

Kafka-json-processor项目生成器

此实用工具可用于生成一个Rust项目,该项目可以编译成可执行文件,包含所有预配置的JSON处理器。可执行文件将读取Kafka主题,处理消息并将它们写入另一个Kafka主题。

项目基于模板生成。模板是一个配置文件,指示生成器如何处理消息。

准备template.yml

请参阅以下"示例处理器"项目的示例

name: "Example processor"
streams:
- input_topic: in
  output_topic: out

  processors:
    - generator: static_field
      field: $.hello
      value: world
    - generator: copy_field
      source_field: $.abc[1]
      target_field: $.def

在此文件中, мы определяем один поток, который будет обрабатывать сообщения из темы "in" и помещать обработанные сообщения в "out"。

Поток — это единственная труба для обработки сообщений, исходящих из одной темы и идущих в другую тему. Один поток содержит список обработчиков. Обработчики — это функции, которые будут добавлять или изменять поля в выходящее сообщение.

В этом примере мы определяем два обработчика

  • Первый добавит static_field в выходящее сообщение. Желаеный field определяется JSONPath, а статическое значение определяется value.
  • Второй copy_field из входящего сообщения в выходящее сообщение. Он будет копировать из source_field (определяемый JSONPath) в target_field.

Обратите внимание, что в шаблоне мы не используем термин обработчик, тип обработчика или тип обработчика дляspecifying what function to use in a pipeline. Причина в том, что мы на самом деле генерируем функции для вашего целевого проекта. Таким образом, этот файл (template.yml) на самом деле определяет, как генерировать проект, и поэтому мы используем различные генераторы для желаемого поведения.

Что такое генераторы?

生成器是脚本或可执行文件,根据给定的参数输出函数源代码。您可以编写自己的脚本以创建自定义函数。您还可以编写自定义的可执行文件(或“插件” - 见kjp-generator-plugin)。

kafka-json-processor提供了一些现成的生成器 - 您可以使用kjp-generator-generators

默认情况下,kjp-generator使用"./generators"路径来查找可用的生成器。如果您希望指定自定义路径,请使用此参数选项

  -g, --generators-path <GENERATORS_PATH>
          Custom path to processor generators.
          
          Put all processor generators in this directory. This directory will be scanned for available files and those files will be used as executable plugins to generate any code requested by `generator` option in your `template.yaml`.

          [default: ./generators]

示例: ./kjp-generator -g ./kjp-generator-generators -t ./template.yml -o output-directory

如何生成项目?

您需要

运行kjp-generator时使用以下必需选项

  -t, --template <TEMPLATE>
          Path to template file (YAML).
          
          A template file is a configuration file that will be used to generate the final project. 'The final project' - Rust project with generated code to process messages from selected Kafka topics.

  -o, --output <OUTPUT>
          Output directory of generated project.
          
          This will be the directory where the project with message processors will be generated. This project will contain generated code to process messages basing on the supplied template. The code will need to be compiled afterwards.

这些选项是可选的,但请注意

  -g, --generators-path <GENERATORS_PATH>
          Custom path to processor generators.
          
          Put all processor generators in this directory. This directory will be scanned for available files and those files will be used as executable plugins to generate any code requested by `generator` option in your `template.yaml`.
                    
          [default: ./generators]
          
  -c, --core-path <CORE_PATH>
          Custom path to kafka_json_processor_core.
          
          kafka_json_processor_code is a dependency that contains code that will prevent boilerplate in the generated project. By default, it will use hardcoded version from `crates.io`. If it doesn't work (or you want to use custom core), supply a path to the source code of kafka_json_processor_code.

示例: ./kjp-generator -g ./kjp-generator-generators -t ./template.yml -o output-directory

另见

插件开发者指南

在代码生成阶段,kjp-generator将尝试用以下参数执行生成器

  • 生成的函数名,
  • 来自template.yml的生成器选项作为参数对("name" "value")。

例如,以下处理器配置将执行

- input_topic: in
  output_topic: out

  processors:
    - generator: static_field
      field: $.hello
      value: world

将执行以下脚本

./static_field.sh in_out_static_0_field 'generator' 'static_field' 'field' '$.hello' 'value' 'world'

Kjp-generator期望插件在stdout中以以下格式输出有效的UTF-8字符串

  • 第一行:1) OK或2) ERR
  • 其余stdout:1) 函数源代码或2) 错误信息。

行分隔符:\n。插件终止被视为函数生成的结束 - 在此事件之后收集stdout。无效输出被视为错误。

插件不需要解释JSONPath - 它只需将JSONPath用##JSONPATH(...)##包裹即可,以指示kjp-generator将其转换为&[ObjectKey]。例如:##JSONPATH($.hello[1].world)##将变为&[Key(\"hello\".to_string()), Index(1), Key(\"world\".to_string())]

输出示例

OK
fn in_out_static_0_field(_input: &Value, message: &mut OutputMessage) -> Result<(), ProcessingError> {
    message.insert_val(##JSONPATH($.hello)##, Value::String("world".to_string()))?;
    Ok(())
}

ERR
Generator required property that was missing in config: source_field

依赖关系

~5.5–8MB
~145K SLoC