4个版本

0.1.3 2023年10月22日
0.1.2 2022年4月16日
0.1.1 2021年3月19日
0.1.0 2021年2月21日

#697解析实现

MIT/Apache

51KB
1K SLoC

kafcat

Kafcat是Rust语言完全异步重写的kafkacat。

我试图将远程服务器上的kafka数据复制到本地的localhost进行测试。Kafkacat是唯一可用的工具来完成这项任务。然而,它是用C语言编写的,并且完全不支持二进制数据。我正在处理一个pull request,但尚未合并,代码难以维护且易于出错。因此,我编写了自己的kafcat。

特性

它模仿了kafkacat的命令行使用方式,但具有额外的功能。它可以

  • 从stdin读取并发送到kafka主题
  • 从kafka读取到stdin
  • 从一个主题复制到另一个主题(甚至在不同服务器上)
  • 以json格式导入/导出数据
  • 可更换的后端(目前仅支持librdkafka,kafka-rust即将推出)

缺点

Kafcat仍处于早期开发阶段,因此

  • 尚不支持TLS

  • 尚不支持Apache AVRO

  • 尚不支持复杂的输入/输出格式

基本用法

  1. 编译kafcat。你可以在target/release/kafcat找到二进制文件

    cargo build --release
    
  2. 确保Kafka正在运行(我们假设在localhost:9092)。你可以使用docker-compose -f tests/plaintext-server.yml up(注意,在MacOS上,docker中的kafka存在问题,我甚至无法通过docker设置适当的测试环境)

  3. 设置监听器,假设kafcat位于当前目录

    ./kafcat -C --topic test
    
  4. 在另一个终端中设置生产者

    ./kafcat -P --topic test
    
  5. 复制一个主题。格式为./kafcat copy <from> - <to>fromto与消费者和生产者使用的完全相同

    ./kafcat copy --topic test -- --topic test2
    
  6. 使用默认分隔符:输入任何键和值。例如,hello:world

  7. 你应该在消费者终端中看到hello:world

  8. 详细帮助信息可以在 ./kafcat --help./kafcat -C --help./kafcat -P --help 等命令中找到。您还可以查看 kafkacat 以获取参考。

    kafcat-consume 
    
    USAGE:
        kafcat {consume, -C} [FLAGS] [OPTIONS] --topic <topic>
    
    FLAGS:
        -e, --exit
                Exit successfully when last message received
    
        -h, --help
                Prints help information
    
        -V, --version
                Prints version information
    
    
    OPTIONS:
        -b, --brokers <brokers>
                Broker list in kafka format [default: localhost:9092]
    
        -s, --format <format>
                Serialize/Deserialize format [default: text]
    
        -G, --group-id <group-id>
                Consumer group id. (Kafka >=0.9 balanced consumer groups) [default: kafcat]
    
        -K <key-delimiter>
                Delimiter to split input key and message [default: :]
    
        -D <msg-delimiter>
                Delimiter to split input into messages(currently only supports '\n') [default: 
                ]
    
        -o <offset>
                Offset to start consuming from:
                                     beginning | end | stored |
                                     <value>  (absolute offset) |
                                     -<value> (relative offset from end)
                                     s@<value> (timestamp in ms to start at)
                                     e@<value> (timestamp in ms to stop at (not included))[default:
                beginning]
    
        -p, --partition <partition>
                Partition
    
        -t, --topic <topic>
                Topic
    kafcat-produce 
    
    USAGE:
        kafcat {produce, -P} [OPTIONS] --topic <topic>
    
    FLAGS:
        -h, --help       Prints help information
        -V, --version    Prints version information
    
    OPTIONS:
        -b, --brokers <brokers>        Broker list in kafka format [default: localhost:9092]
        -s, --format <format>          Serialize/Deserialize format [default: text]
        -G, --group-id <group-id>      Consumer group id. (Kafka >=0.9 balanced consumer groups)
                                       [default: kafcat]
        -K <key-delimiter>             Delimiter to split input key and message [default: :]
        -D <msg-delimiter>             Delimiter to split input into messages(currently only supports
                                       '\n') [default: 
                                       ]
        -p, --partition <partition>    Partition
        -t, --topic <topic>            Topic
    kafcat-copy 
    Copy mode accepts two parts of arguments <from> and <to>, the two parts are separated by [--].
    <from> is the exact as Consumer mode, and <to> is the exact as Producer mode.
    
    USAGE:
        kafcat copy <from>... [--] <to>...
    
    ARGS:
        <from>...    
        <to>...      
    
    FLAGS:
        -h, --help       Prints help information
        -V, --version    Prints version information
    
    
    

编程风格

  • 尽可能将 git rebase 合并到主分支
  • 推送前合并提交

依赖

~22–34MB
~490K SLoC