#kafka #replication #apache-kafka #observer #event-streaming

bin+lib kafka-replicator

在 Kafka 集群之间复制数据的应用程序

5 个版本

0.5.1 2020 年 12 月 10 日
0.5.0 2020 年 7 月 30 日
0.4.2 2020 年 7 月 5 日
0.4.1 2020 年 7 月 4 日
0.4.0 2020 年 7 月 4 日

#1614 in 命令行工具

自定义许可协议

67KB
1.5K SLoC

Kafka replicator

Kafka Replicator 是一个易于使用的工具,用于在两个 Apache Kafka 集群之间复制数据,具有可配置的重新分区策略。

将根据配置规则从源集群的主题中读取数据,并将其写入目标集群的主题。

特性

让我们先概述一下 kafka-replicator 中存在的特性

  • 数据复制: Kafka 集群和数据中心之间的实时事件流;
  • 模式复制: 从源集群复制模式到目标;
  • 灵活的主题选择: 使用可配置的配置选择主题;
  • 自动创建主题: 对于 strict_p2p 策略,目标主题将自动创建;
  • 状态: 工具显示复制状态;
  • 监控: Kafka replicator 通过 prometheus 导出状态。
  • 循环检测

用例

  • 在 Kafka 集群之间复制数据;
  • 将多个主题的记录聚合到一个主题中;
  • 通过重新分区策略扩展现有主题的带宽。

安装

系统依赖

libsasl2-dev
libssl-dev

从 crates.io 安装

如果您的本地系统已经安装了 Rust 工具链。

rustup update stable
cargo install kafka-replicator

从源代码编译和运行它

克隆存储库并将其更改为您的工作目录。

git clone https://github.com/lispython/kafka-replicator.git
cd kafka-replicator

rustup override set stable
rustup update stable
cargo install

用法

RUST_LOG=info kafka-replicator /path/to/config.yml

使用 Docker 运行它

sudo docker run -it -v /replication/:/replication/ -e RUST_LOG=info lispython/kafka_replicator:latest kafka-replicator /replication/config.yml

示例配置

clusters:
  - name: cluster_1
    hosts:
      - replicator-kafka-1:9092
      - replicator-kafka-1:9092
  - name: cluster_2
    hosts:
      - replicator-kafka-2:9092

clients:
  - client: cl_1_client_1
    cluster: cluster_1
    config: # optional
       message.timeout.ms: 5000
       auto.offset.reset: earliest
  - client: cl_2_client_1
    cluster: cluster_2

routes:
  - upstream_client: cl_1_client_1
    downstream_client: cl_1_client_1
    upstream_topics:
      - 'topic1'
    downstream_topic: 'topic2'
    repartitioning_strategy: random # strict_p2p | random
    upstream_group_id: group_22
    show_progress_interval_secs: 10
    limits:
      messages_per_sec: 10000
      number_of_messages:

  - upstream_client: cl_1_client_1
    downstream_client: cl_2_client_1
    upstream_topics:
      - 'topic2'
    downstream_topic: 'topic2'
    repartitioning_strategy: strict_p2p
    upstream_group_id: group_22
    show_progress_interval_secs: 10

  - upstream_client: cl_2_client_1
    downstream_client: cl_1_client_1
    upstream_topics:
      - 'topic2'
    downstream_topic: 'topic3'
    repartitioning_strategy: strict_p2p # strict_p2p | random
    default_begin_offset: earliest # optional
    upstream_group_id: group_2
    show_progress_interval_secs: 10


observers:
  - client: cl_1_client_1
    name: "my name"
    group_id: group_name # used for remaining metrics
    topics: # filter by topics
      - 'topic1'
      - 'topic2'
    fetch_timeout_secs: 5 # default: 5
    fetch_interval_secs: 5 # default: 60
    show_progress_interval_secs: 10 # default: 60

  - client: cl_2_client_1
    topic: 'topic3'
    topics:
      - 'topic2'
    show_progress_interval_secs: 5


  - client: cl_1_client_1
    topic: 'topic1'
    topics: [] # fetch all topics

选项描述

根配置选项

  • clusters - 是 Kafka 集群列表
  • clients - 是消费者配置列表
  • routes - 是复制规则列表
  • observers - 是观察者列表

贡献

任何建议、反馈或贡献都备受赞赏。感谢您的支持!

依赖项

~51MB
~880K SLoC