#streaming #messaging #http-transport #access-token #cli #iggy

app iggy-cli

Iggy 是用 Rust 编写的持久消息流平台,支持 QUIC、TCP 和 HTTP 传输协议,每秒可处理数百万条消息

2 个版本

新版本 0.5.12 2024年8月18日
0.5.11 2024年8月16日

#753 in 网络编程

每月下载 39

MIT 许可证

1MB
22K SLoC

Iggy

crates.io crates.io docs workflow coverage dependency x discord-badge


iggy


Iggy 是用 Rust 编写的持久消息流平台,支持 QUIC、TCP(自定义二进制规范)和 HTTP(常规 REST API)传输协议。目前作为单个服务器运行,允许创建流、主题、分区和段,并向它们发送/接收消息。消息以追加日志的形式存储在磁盘上,并在重启之间持久化。

项目的目标是创建一个分布式流平台(作为集群运行),能够水平扩展并处理每秒 数百万条消息(实际上,它已经非常快了,请参见下面的基准测试)。

Iggy 在利用最少的计算资源的同时提供了 异常高的吞吐量和性能

这不是在现有基础设施(如 Kafka 或 SQL 数据库)之上运行的另一个扩展。

Iggy 是一个从底层开始的持久消息流日志,使用低级别 I/O 以实现速度和效率。

名字是意大利灰犬的缩写——小型但极其快速的狗,它们是同类中的佼佼者。就像我的可爱的 Fabio & Cookie ❤️


功能

  • 高性能,用于消息流的持久化只增日志
  • 极高的吞吐量,读写均如此
  • 低延迟和可预测的资源使用,得益于Rust编译语言(无GC)
  • 用户身份验证和授权,具有细粒度权限和PAT(个人访问令牌)
  • 支持多个流、主题和分区
  • 支持多种传输协议(QUIC、TCP、HTTP)
  • 完全可操作的RESTful API,可选择性启用
  • 提供多种语言的客户端SDK
  • 直接处理二进制数据(无强制模式和要求序列化/反序列化)
  • 可配置的服务器功能(例如缓存、段大小、数据刷新间隔、传输协议等)
  • 可以在服务器上存储消费者偏移量
  • 多种轮询消息的方式
    • 通过偏移量(使用索引)
    • 通过时间戳(使用时间索引)
    • 前N/后N条消息
    • 为特定消费者获取下N条消息
  • 支持自动提交偏移量(例如,以实现最多一次投递)
  • 消费者组提供消息排序和连接客户端的水平扩展
  • 消息过期,根据可配置的保留策略自动删除
  • 其他功能,如服务器端消息去重
  • 多租户支持,得益于的概念,将主题分组
  • TLS支持所有传输协议(TCP、QUIC、HTTPS)
  • 可选的客户端和服务器端数据加密,使用AES-256-GCM
  • 可选支持元数据,以消息头的形式
  • 内置的CLI用于管理流服务器
  • 内置的基准测试应用以测试性能
  • 单二进制部署(无外部依赖)
  • 作为单个节点运行(尚不支持集群)

路线图

  • 底层优化(零拷贝等)
  • 无共享设计和支持io_uring
  • 集群和数据复制
  • 高级Web UI
  • 支持多种语言的开发者友好SDK
  • 插件和扩展支持

有关当前进度的详细信息,请参阅项目板


支持的语言SDK(工作中)


CLI

全新的、丰富的、交互式的CLI正在cli项目下实现,以提供最佳的开发者体验。这将是对Web UI的重大补充,尤其是对于所有偏好使用控制台工具的开发者。

CLI

Web UI

正在进行构建服务器管理Web UI的工作,这将允许管理流、主题、分区、消息等。请查看Web UI存储库

Web UI


Docker

在存储库的根目录中可以找到Dockerfiledocker-compose。要构建和启动服务器,请运行:docker compose up

此外,您可以通过执行以下命令在运行容器中运行CLIdocker exec -it iggy-server /iggy

请注意,如果在除Linux以外的操作系统上运行容器,其中Docker在VM中运行,可能会引起显著的性能下降。

官方镜像可以在这里找到,只需输入:docker pull iggyrs/iggy


配置

默认配置可在 server.toml 文件中找到(默认文件)或在 configs 目录下的 server.json 文件中。

配置文件是从当前工作目录加载的,但您可以通过设置 IGGY_CONFIG_PATH 环境变量来指定配置文件的路径,例如 export IGGY_CONFIG_PATH=configs/server.json(或其他命令,取决于操作系统)。

有关配置文件的详细文档,请参阅配置部分。


快速入门

构建项目(较长的编译时间是由于在发布LTO配置中启用了profile

cargobuild

运行测试

cargotest

启动服务器

cargor --biniggy-server

请注意,以下所有命令都使用了iggy二进制文件,它是发布(cli子组件)的一部分。

使用默认凭据和tcp传输(可用传输:quictcphttp,默认tcp)创建名为dev的流(服务器将自动分配数字ID)。

cargor --biniggy ----transport tcp --username iggy --password iggy stream create dev

列出可用流

cargor --biniggy ----username iggy --password iggy stream list

获取dev流详情

cargor --biniggy ---u iggy -p iggy stream get dev

为流dev创建名为sample的主题(服务器将自动分配数字ID),有2个分区(ID 1和2),禁用压缩(none)和禁用消息过期(省略可选参数)

cargor --biniggy ---u iggy -p iggy topic create dev sample 2 none

列出流dev的可用主题

cargor --biniggy ---u iggy -p iggy topic list dev

获取流dev中主题sample的详细信息

cargor --biniggy ---u iggy -p iggy topic get dev sample

将消息“hello world”(消息ID 1)发送到流dev的主题sample和分区1

cargor --biniggy ---u iggy -p iggy message send --partition-id 1 dev sample"hello world"

将另一条消息“lorem ipsum”(消息ID 2)发送到相同的流、主题和分区

cargor --biniggy ---u iggy -p iggy message send --partition-id 1 dev sample"lorem ipsum"

使用ID为1的常规消费者从流dev的主题sample和ID为1的分区中轮询消息,起始偏移量为0,消息数量为2,不自动提交(在服务器上存储消费者偏移量)

cargor --biniggy ---u iggy -p iggy message poll --consumer 1 --offset 0 --message-count 2 --auto-commit dev sample 1

最后,重启服务器以查看它是否能够加载持久化数据。

HTTP API端点可以在server.http文件中找到,可以使用VS Code的REST Client扩展使用。

要查看CLI/服务器的详细日志,请使用RUST_LOG=trace环境变量运行它。请参阅下面的图片

files structure文件结构

服务器服务器


示例

您可以在 examples 目录下找到示例消费者和生成器应用程序。这些应用程序的目的是展示客户端 SDK 的使用方法。要了解更多关于构建应用程序的信息,请参阅入门指南

要运行示例,首先使用以下命令启动服务器:cargo r --bin iggy-server,然后分别使用以下命令运行生成器和消费者应用程序:cargo r --example message-envelope-producercargo r --example message-envelope-consumer

您可能同时启动多个生成器和消费者,以查看消息如何在多个客户端之间被处理。检查 Args 结构以查看可用的选项,例如传输协议、流、主题、分区、消费者 ID、消息大小等。

默认情况下,消费者将使用 next 可用偏移量轮询消息,并启用自动提交,以将偏移量存储在服务器上。使用这种方法,您可以轻松实现 最多一次 的交付。

sample


SDK

Iggy 附带 Rust SDK,可在 crates.io 上找到。

SDK 提供了特定传输的低级别客户端,包括消息发送和轮询以及所有管理操作,如管理流、主题、用户等,以及高级客户端,该客户端抽象了低级别细节,并为消息生成器和消费者提供了易于使用的 API。

您可以在 examples 目录下找到更多示例,包括多租户示例。

// Create the Iggy client
let client = IggyClient::from_connection_string("iggy://user:secret@localhost:8090")?;

// Create a producer for the given stream and one of its topics
let mut producer = client
    .producer("dev01", "events")?
    .batch_size(1000)
    .send_interval(IggyDuration::from_str("1ms")?)
    .partitioning(Partitioning::balanced())
    .build();

producer.init().await?;

// Send some messages to the topic
let messages = vec![Message::from_str("Hello Iggy.rs")?];
producer.send(messages).await?;

// Create a consumer for the given stream and one of its topics
let mut consumer = client
	.consumer_group("my_app", "dev01", "events")?
	.auto_commit(AutoCommit::IntervalOrWhen(
		IggyDuration::from_str("1s")?,
		AutoCommitWhen::ConsumingAllMessages,
	))
	.create_consumer_group_if_not_exists()
	.auto_join_consumer_group()
	.polling_strategy(PollingStrategy::next())
	.poll_interval(IggyDuration::from_str("1ms")?)
	.batch_size(1000)
	.build();

consumer.init().await?;

// Start consuming the messages
while let Some(message) = consumer.next().await {
    // Handle the message
}

基准测试

要基准测试项目,首先以发布模式构建项目

cargo build --release

然后,使用所需选项运行基准测试应用程序

  1. 轮询(读取)基准测试

    cargo r --bin iggy-bench -r -- -c -v send tcp
    
  2. 发送(写入)基准测试

    cargo r --bin iggy-bench -r -- -c -v poll tcp
    
  3. 并行发送和轮询基准测试

    cargo r --bin iggy-bench -r -- -c -v send-and-poll tcp
    
  4. 使用消费者组的轮询

    cargo r --bin iggy-bench -r -- -c -v send --streams 1 --partitions 10 --disable-parallel-producers tcp
    
    cargo r --bin iggy-bench -r -- -c -v consumer-group-poll tcp
    

这些基准测试将启动具有默认配置的服务器,创建流、主题和分区,然后发送或轮询消息。默认配置针对最佳性能进行了优化,因此您可能需要根据您的需求对其进行调整。如果您需要更多选项,请参阅 iggy-bench 子命令的 helpexamples。例如,要为已启动的服务器运行基准测试,请提供附加参数 --server-address 0.0.0.0:8090

根据硬件、传输协议(quictcphttp)和有效负载大小(messages-per-batch * message-size),您可能期望写入吞吐量超过 3000 MB/s(例如每秒 3M 个 1 KB 消息),读取吞吐量为 10000 MB/s。这些结果是在 Ryzen 9 7950X(64 GB RAM 和 gen 4 NVMe SSD)上实现的。


依赖关系

~30–49MB
~1M SLoC