5 个不稳定版本
0.5.0 | 2024 年 4 月 24 日 |
---|---|
0.3.2 | 2023 年 9 月 5 日 |
0.3.0 | 2023 年 7 月 11 日 |
0.2.1 | 2023 年 5 月 7 日 |
0.2.0 | 2023 年 3 月 25 日 |
1064 在 并发 中排名 #1064
1,911 每月下载量
用于 5 个 Crates (3 个直接使用)
170KB
4K SLoC
sea-streamer-redis
: Redis 后端
这是 SeaStreamer 的 Redis 后端实现。此 crate 提供了一个高层的异步 API,在 Redis Streams 上使用 Redis 时可以避免错误
- 实现了熟悉的 SeaStreamer 抽象接口
- 一个全面的类型系统,通过 API 引导/限制您
- 高级 API,因此您不再需要调用
XADD
、XREAD
或XACK
- 无锁实现:通过消息传递实现并发
- 流水线
XADD
和分页XREAD
,吞吐量可达每秒 100k 消息
虽然我们希望提供类似 Kafka 的客户端体验,但 Redis 和 Kafka 之间有一些根本的不同之处
- 在 Redis 中,序列号不是连续的
- 在 Kafka 中,序列号是连续的
- 在 Redis 中,消息按照“先到先得”的方式分配给消费者组中的成员,这导致下一个问题
- 在 Kafka 中,消费者组中的消费者与分片之间是一对一
- 在 Redis 中,每个消息都需要进行
ACK
- 在 Kafka 中,只需要一个 Ack (read-up-to) 就可以完成一系列读取
已实现的功能
- 实时模式,带有自动重置流
- 可恢复模式,带有自动 Ack 和/或自动提交
- 负载均衡模式,带有故障转移行为
- 时间点回溯/重放
- 基本的流分片:将流拆分为多个子流
最好通过查看 测试 来了解不同的流行为。
SeaStreamer 如何提供更好的并发性?
考虑以下简单的流处理器
loop {
let input = XREAD.await;
let output = process(input).await;
XADD(output).await;
}
当它正在读取或写入时,它不会处理。因此,它在空闲时浪费了时间,并且以更高的延迟读取消息,这反过来又限制了吞吐量。此外,读取的理想批大小可能不是写入的理想批大小。
SeaStreamer 将读写循环与您的进程循环分离,因此它们都可以并行发生(在 Rust 中,async 是多线程的,因此它实际上是并行的)!
如果您正在从消费者组读取,您还需要考虑何时确认以及在一次请求中批处理多少个确认。SeaStreamer 可以在常规间隔内后台提交,或者您可以在不阻塞您的进程循环的情况下异步提交。
在未来,我们希望支持 Redis 集群,因为不集群的分区并不是很有用。目前它还在开发中。这是一项相当困难的任务,因为客户端在使用集群时必须承担责任。在 Redis 中,分片和节点是 M-N 映射关系 - 分片可以在任何时间在节点之间移动。这使得测试变得更加困难。如果您愿意提供帮助,请告诉我们!
还有一个小型实用程序可以将 Redis Streams 消息转储到 SeaStreamer 文件中。
此crate基于redis
构建。
依赖项
~4–18MB
~271K SLoC