#kafka #cli #kafka-connect

应用 kofr

一款现代可配置的 Kafka Connect 集群管理 CLI

2 个版本

0.1.1 2023 年 12 月 3 日
0.1.0 2023 年 11 月 15 日

#48 in #kafka

MIT 协议

62KB
1.5K SLoC

kofr

Kofr 是受 kaf 和 kubectl 启发的 Kafka connect CLI。Kofr 封装了所有 Kafka connect REST API 操作。

目录

安装

用法

添加新的连接集群

$ kofr config add-cluster dev --hosts https://127.0.0.1:8083

更改上下文到集群

$ kofr config use-cluster dev

列出可用的集群

$ kofr config get-clusters

连接器操作

列出正在运行的连接器

$ kofr ls
 NAME                STATE     TASKS   TYPE     WORKER_ID
 load-kafka-config   RUNNING   1       SOURCE   127.0.1.1:8083
 test-connector      RUNNING   1       SINK     127.0.1.1:8083

列出当前连接集群状态

$ kofr cluster status
 Current Cluster: dev
 id : "GnW0xXSmqeO-t6CQVTJg"
 ...........................................
 HOST                      STATE
 https://127.0.0.1:8083     Online
 https://127.0.0.1:8080     Offline

描述连接器

$ kofr cn describe <connector-name>
{
  "name": "test-connector",
  "config": {
    "name": "test-connector",
    "tasks.max": "1",
    "topics": "test-topic",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector"
  },
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}

从配置文件创建连接器,类似于 kubectl。

$ echo '{"name":"loadd-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' \
| kofr cn create -f -

编辑正在运行的连接器配置,这将打开 $EDITOR,类似于 kubectl。

$ kofr cn edit <connector-name>

重启、暂停和恢复连接器。

$ kofr cn restart <connector-name>
# alternatively, you can specify tasks options
$ kofr cn restart <connector-name> --include-tasks
$ kofr cn restart <connector-name> --only-failed

$ kofr cn pause <connector-name>
$ kofr cn resume <connector-name>

使用新配置修补正在运行的连接器

$ kofr cn patch test-connector -d '{"file": "config/server.properties","name": "load-kafka-config","connector.class": "FileStreamSource","topic": "kafka-config-topic"'

删除正在运行的连接器

$ kofr cn delete <connector-name>

任务操作

列出运行连接器的任务

$ kofr tasks ls test-connector
Active tasks of connector: 'test-connector'
 ID   STATE     WORKER_ID        TRACE
 0    RUNNING   127.0.1.1:8083   -
 1    PAUSED    127.0.1.1:8083   -

重启任务

$ kofr task restart sink-connector 0

获取任务状态

$ kofr task status test-connector 0
{
  "config": {
    "task.class": "org.apache.kafka.connect.file.FileStreamSinkTask",
    "topics": "test-topic"
  },
  "status": {
    "id": 0,
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  }
}

连接插件

列出集群上安装的插件

$ kofr plugin ls

使用连接器插件验证给定的连接器配置。

$ echo '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic"
}' | kofr plugin validate-config -f -

配置

默认情况下,kofr 从 ~/.kofr/config 读取配置。参见 示例 了解基本配置文件。

贡献

我欢迎对错误或更好的做事方式的修复,或者更重要的是,代码审查。Kofr 是在处理多个 Kafka connect 集群时产生的,当时觉得日常工作很枯燥,更重要的是,学习 rust wink-wink。我像使用 kubectl 或 kaf 一样使用它。参见 问题 了解我希望改进的事情。如果您对代码库有疑问或想讨论某些内容,请随时创建问题。另请参阅我用于测试 kofr 的 kcmockserver。它尚不完整,需要一些贡献。

相关项目

kcmockserver

依赖

~8–17MB
~267K SLoC