#kafka #thread-pool #plain-text #mtls #ssl #publishing #messages

kafka-threadpool

一个使用SSL(mTLS)或PLAINTEXT协议发布消息到Kafka的异步Rust线程池

13个稳定版本

1.0.12 2022年9月22日
1.0.1 2022年9月21日

#273异步

Download history 3/week @ 2024-06-01 1/week @ 2024-06-08 5/week @ 2024-06-15 3/week @ 2024-06-22

62 每月下载量
用于 restapi

MIT 许可证

67KB
929

Kafka Threadpool for Rust支持mTLS

一个使用SSL(mTLS)或PLAINTEXT协议发布消息到Kafka的异步Rust线程池。

架构

这是一个正在进行中的项目。架构可能会随时间变化。目前这里是最新参考架构。

kafka-threadpool Reference Architecture

背景

有关此存储库的更多信息,请参阅博客文章

配置

支持的环境变量

环境变量名称 目的/值
KAFKA_ENABLED 切换kafka_threadpool的开关:使用true1启用,其他情况禁用threadpool
KAFKA_LOG_LABEL 跟踪标签,将在所有crate日志中显示
KAFKA_BROKERS 逗号分隔的brokers列表(host1:port,host2:port,host3:port
KAFKA_TOPICS 逗号分隔的支持的主题列表
KAFKA_PUBLISH_RETRY_INTERVAL_SEC 每次发布重试前等待的秒数
KAFKA_PUBLISH_IDLE_INTERVAL_SEC 没有消息处理时等待的秒数
KAFKA_NUM_THREADS threadpool的线程数量
KAFKA_TLS_CLIENT_KEY 可选 - kafka mTLS密钥的路径
KAFKA_TLS_CLIENT_CERT 可选 - kafka mTLS证书的路径
KAFKA_TLS_CLIENT_CA 可选 - kafka mTLS证书授权机构(CA)的路径
KAFKA_METADATA_COUNT_MSG_OFFSETS 可选 - 设置为任何非true以跳过偏移量计数

入门指南

在开始之前,请确保您的Kafka集群正在运行。如果您需要运行Kafka集群的帮助,请参阅rust-with-strimzi-kafka-tls仓库以获取更多详细信息。

设置环境变量

您可以创建一个存储环境变量的./env/kafka.env文件,以使您的生产者和消费者保持一致(并准备好使用podman/docker或kubernetes)

export KAFKA_ENABLED=1
export KAFKA_LOG_LABEL="ktp"
export KAFKA_BROKERS="host1:port,host2:port,host3:port"
export KAFKA_TOPICS="testing"
export KAFKA_PUBLISH_RETRY_INTERVAL_SEC="1.0"
export KAFKA_NUM_THREADS="5"
export KAFKA_TLS_CLIENT_CA="PATH_TO_TLS_CA_FILE"
export KAFKA_TLS_CLIENT_CERT="PATH_TO_TLS_CERT_FILE"
export KAFKA_TLS_CLIENT_KEY="PATH_TO_TLS_KEY_FILE"
export KAFKA_METADATA_COUNT_MSG_OFFSETS="true"

加载环境

source ./env/kafka.env

启动Kafka线程池并发布100条消息

包含的./examples/start-threadpool.rs示例将根据环境配置连接到kafka集群,并将100条消息发布到kafka的testing主题。

cargo build --example start-threadpool
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/start-threadpool

消费消息

要消费从testing主题发布的新测试消息,您可以使用自己的消费者或rust-with-strimzi-kafka-and-tls/examples/run-consumer.rs示例

# from the rust-with-strimzi-kafka-and-tls directory:
cargo build --example run-consumer
export RUST_BACKTRACE=1
export RUST_LOG=info,rdkafka=info
./target/debug/examples/run-consumer -g rust-consumer-testing -t testing

获取所有主题、分区、ISR和偏移量的Kafka集群元数据

运行./examples/get-all-metadata.rs示例

cargo build --example get-all-metadata
export RUST_BACKTRACE=1
export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
./target/debug/examples/get-all-metadata

获取单个主题(包括分区、ISR和偏移量)的Kafka集群元数据

  1. 将主题名称设置为环境变量

    export KAFKA_TOPIC=testing
    
  2. 运行./examples/get-metadata-for-topic.rs示例

    cargo build --example get-metadata-for-topic
    export RUST_BACKTRACE=1
    export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
    ./target/debug/examples/get-metadata-for-topic
    

依赖项

~17–31MB
~440K SLoC