6 个版本
使用旧的 Rust 2015
0.1.5 | 2017年7月24日 |
---|---|
0.1.4 | 2017年7月24日 |
#6 in #sqs
130KB
3K SLoC
sqs-service-handler
一个用于编写基于 SQS 消费者的服务的库。
功能
- 自动处理可见性超时
- 自动消费者节流/扩展
- 自动使用 AWS 批量 API
- 并行消息处理
可见性超时处理
当从其队列接收到 SQS 消息时,它在设定的时间内保持不可见,该时间由 SQS 队列或消息定义。默认为 30 秒。
当超时到期时,消息将带有新的收据重新出现在队列中。
如果消息的处理时间超过其可见性超时,那么,如果没有额外措施,消息可能会被处理多次。
为了防止这个问题,框架将自动启动后台服务来更新超时。目标是尽可能降低超时,以便在处理过程中出现错误时,它将快速出现在队列上,同时确保无论处理消息所需的时间如何,它都将保持不可见。
当前的算法大致如下
- 消费者接收一个消息
- 消息在 MessageStateManager 中注册
- 消息收据被发送到 VisibilityExtenderBuffer,当它达到十个收据或超时时会刷新。
- 现在分组在一起的收据达到实际的 VisibilityTimeoutExtender,它使用批量 API 调用来更新消息的可见性。
在这个过程中,跟踪消息从消费者到扩展器的所需时间,并将其发送到节流器。节流器使用这个时间来确定在快速处理消息以保持其不可见状态的同时,服务可以处理的最多飞行中的消息数量。
值得注意的是,在网络故障的情况下,如果消息可见性更新或删除在超时期间失败,将会发生多次消息处理。这是 SQS 所选择的至少一次处理模型的一个限制。
消费者节流/扩展
如前所述,服务跟踪消息收据从消费者到可见性扩展器所需的时间。
这是服务的瓶颈。
基于这个中位数时间,服务将扩展消费者,以及实施节流 - 每个消费者在 API 调用之间需要等待的一段时间(这考虑了调用延迟,因此将这个等待期保持在最小)。
使用专门的 StreamingMedian 结构来有效地计算最后 63 条消息的中位数。
这种节流/缩放应该意味着你始终在处理框架可以处理的最多消息量,但不超过这个量。
自动批量API
SQS支持使用批量API来减少I/O和成本。这可以使每个使用的批量API的成本降低10倍。
默认情况下,消费者将长轮询20秒,最多获取10条消息。
这个库将自动缓冲删除和可见性更新请求,从而增加批量API的使用频率。
并行消息处理
一旦您向框架提供了MessageHandler,它将分散处理器到线程上(目前由用户定义)。
处理器使用基于MPMC队列的工作窃取算法。
依赖关系
~24–34MB
~644K SLoC