4个版本
0.1.3 | 2023年10月22日 |
---|---|
0.1.2 | 2022年4月16日 |
0.1.1 | 2021年3月19日 |
0.1.0 | 2021年2月21日 |
#697 在 解析实现
51KB
1K SLoC
kafcat
Kafcat是Rust语言完全异步重写的kafkacat。
我试图将远程服务器上的kafka数据复制到本地的localhost进行测试。Kafkacat是唯一可用的工具来完成这项任务。然而,它是用C语言编写的,并且完全不支持二进制数据。我正在处理一个pull request,但尚未合并,代码难以维护且易于出错。因此,我编写了自己的kafcat。
特性
它模仿了kafkacat的命令行使用方式,但具有额外的功能。它可以
- 从stdin读取并发送到kafka主题
- 从kafka读取到stdin
- 从一个主题复制到另一个主题(甚至在不同服务器上)
- 以json格式导入/导出数据
- 可更换的后端(目前仅支持librdkafka,kafka-rust即将推出)
缺点
Kafcat仍处于早期开发阶段,因此
-
尚不支持TLS
-
尚不支持Apache AVRO
-
尚不支持复杂的输入/输出格式
基本用法
-
编译kafcat。你可以在target/release/kafcat找到二进制文件
cargo build --release
-
确保Kafka正在运行(我们假设在localhost:9092)。你可以使用
docker-compose -f tests/plaintext-server.yml up
(注意,在MacOS上,docker中的kafka存在问题,我甚至无法通过docker设置适当的测试环境) -
设置监听器,假设kafcat位于当前目录
./kafcat -C --topic test
-
在另一个终端中设置生产者
./kafcat -P --topic test
-
复制一个主题。格式为
./kafcat copy <from> - <to>
,from
和to
与消费者和生产者使用的完全相同./kafcat copy --topic test -- --topic test2
-
使用默认分隔符
:
输入任何键和值。例如,hello:world
-
你应该在消费者终端中看到
hello:world
-
详细帮助信息可以在
./kafcat --help
,./kafcat -C --help
,./kafcat -P --help
等命令中找到。您还可以查看 kafkacat 以获取参考。kafcat-consume USAGE: kafcat {consume, -C} [FLAGS] [OPTIONS] --topic <topic> FLAGS: -e, --exit Exit successfully when last message received -h, --help Prints help information -V, --version Prints version information OPTIONS: -b, --brokers <brokers> Broker list in kafka format [default: localhost:9092] -s, --format <format> Serialize/Deserialize format [default: text] -G, --group-id <group-id> Consumer group id. (Kafka >=0.9 balanced consumer groups) [default: kafcat] -K <key-delimiter> Delimiter to split input key and message [default: :] -D <msg-delimiter> Delimiter to split input into messages(currently only supports '\n') [default: ] -o <offset> Offset to start consuming from: beginning | end | stored | <value> (absolute offset) | -<value> (relative offset from end) s@<value> (timestamp in ms to start at) e@<value> (timestamp in ms to stop at (not included))[default: beginning] -p, --partition <partition> Partition -t, --topic <topic> Topic kafcat-produce USAGE: kafcat {produce, -P} [OPTIONS] --topic <topic> FLAGS: -h, --help Prints help information -V, --version Prints version information OPTIONS: -b, --brokers <brokers> Broker list in kafka format [default: localhost:9092] -s, --format <format> Serialize/Deserialize format [default: text] -G, --group-id <group-id> Consumer group id. (Kafka >=0.9 balanced consumer groups) [default: kafcat] -K <key-delimiter> Delimiter to split input key and message [default: :] -D <msg-delimiter> Delimiter to split input into messages(currently only supports '\n') [default: ] -p, --partition <partition> Partition -t, --topic <topic> Topic kafcat-copy Copy mode accepts two parts of arguments <from> and <to>, the two parts are separated by [--]. <from> is the exact as Consumer mode, and <to> is the exact as Producer mode. USAGE: kafcat copy <from>... [--] <to>... ARGS: <from>... <to>... FLAGS: -h, --help Prints help information -V, --version Prints version information
编程风格
- 尽可能将
git rebase
合并到主分支 - 推送前合并提交
依赖
~22–34MB
~490K SLoC