13个稳定版本
1.0.12 | 2022年9月22日 |
---|---|
1.0.1 | 2022年9月21日 |
#273 在 异步
62 每月下载量
用于 restapi
67KB
929 行
Kafka Threadpool for Rust支持mTLS
一个使用SSL(mTLS)或PLAINTEXT协议发布消息到Kafka的异步Rust线程池。
架构
这是一个正在进行中的项目。架构可能会随时间变化。目前这里是最新参考架构。
背景
有关此存储库的更多信息,请参阅博客文章。
配置
支持的环境变量
环境变量名称 | 目的/值 |
---|---|
KAFKA_ENABLED | 切换kafka_threadpool的开关:使用true或 |
KAFKA_LOG_LABEL | 跟踪标签,将在所有crate日志中显示 |
KAFKA_BROKERS | 逗号分隔的brokers列表(host1:port,host2:port,host3:port ) |
KAFKA_TOPICS | 逗号分隔的支持的主题列表 |
KAFKA_PUBLISH_RETRY_INTERVAL_SEC | 每次发布重试前等待的秒数 |
KAFKA_PUBLISH_IDLE_INTERVAL_SEC | 没有消息处理时等待的秒数 |
KAFKA_NUM_THREADS | threadpool的线程数量 |
KAFKA_TLS_CLIENT_KEY | 可选 - kafka mTLS密钥的路径 |
KAFKA_TLS_CLIENT_CERT | 可选 - kafka mTLS证书的路径 |
KAFKA_TLS_CLIENT_CA | 可选 - kafka mTLS证书授权机构(CA)的路径 |
KAFKA_METADATA_COUNT_MSG_OFFSETS | 可选 - 设置为任何非true以跳过偏移量计数 |
入门指南
在开始之前,请确保您的Kafka集群正在运行。如果您需要运行Kafka集群的帮助,请参阅rust-with-strimzi-kafka-tls仓库以获取更多详细信息。
设置环境变量
您可以创建一个存储环境变量的./env/kafka.env
文件,以使您的生产者和消费者保持一致(并准备好使用podman/docker或kubernetes)
export KAFKA_ENABLED=1
export KAFKA_LOG_LABEL="ktp"
export KAFKA_BROKERS="host1:port,host2:port,host3:port"
export KAFKA_TOPICS="testing"
export KAFKA_PUBLISH_RETRY_INTERVAL_SEC="1.0"
export KAFKA_NUM_THREADS="5"
export KAFKA_TLS_CLIENT_CA="PATH_TO_TLS_CA_FILE"
export KAFKA_TLS_CLIENT_CERT="PATH_TO_TLS_CERT_FILE"
export KAFKA_TLS_CLIENT_KEY="PATH_TO_TLS_KEY_FILE"
export KAFKA_METADATA_COUNT_MSG_OFFSETS="true"
加载环境
source ./env/kafka.env
启动Kafka线程池并发布100条消息
包含的./examples/start-threadpool.rs示例将根据环境配置连接到kafka集群,并将100条消息发布到kafka的testing
主题。
cargo build --example start-threadpool
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/start-threadpool
消费消息
要消费从testing
主题发布的新测试消息,您可以使用自己的消费者或rust-with-strimzi-kafka-and-tls/examples/run-consumer.rs示例
# from the rust-with-strimzi-kafka-and-tls directory:
cargo build --example run-consumer
export RUST_BACKTRACE=1
export RUST_LOG=info,rdkafka=info
./target/debug/examples/run-consumer -g rust-consumer-testing -t testing
获取所有主题、分区、ISR和偏移量的Kafka集群元数据
运行./examples/get-all-metadata.rs示例
cargo build --example get-all-metadata
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/get-all-metadata
获取单个主题(包括分区、ISR和偏移量)的Kafka集群元数据
-
将主题名称设置为环境变量
export KAFKA_TOPIC=testing
-
运行./examples/get-metadata-for-topic.rs示例
cargo build --example get-metadata-for-topic export RUST_BACKTRACE=1 export RUST_LOG=info,kafka_threadpool=info,rdkafka=info ./target/debug/examples/get-metadata-for-topic
依赖项
~17–31MB
~440K SLoC