#redis-queue #queue #redis #nodejs #cross-platform #platform #work

redis-work-queue

基于 redis 数据库的工作队列,实现语言包括 Python、Rust、Go、Node.js (TypeScript) 和 Dotnet (C#)

6 个版本

0.1.6 2023 年 8 月 17 日
0.1.5 2023 年 7 月 25 日
0.1.3 2023 年 4 月 23 日
0.1.1 2023 年 3 月 29 日

696异步

Download history 30/week @ 2024-04-08 9/week @ 2024-05-13 17/week @ 2024-05-20 20/week @ 2024-05-27 7/week @ 2024-06-03 7/week @ 2024-06-17 68/week @ 2024-06-24 6/week @ 2024-07-01 63/week @ 2024-07-08 124/week @ 2024-07-15 41/week @ 2024-07-22

每月 235 次下载

MIT 许可证

29KB
238

Redis Work Queue

基于 redis 数据库的工作队列,实现语言包括 Python、Rust、Go、Node.js (TypeScript) 和 Dotnet (C#)。

此库不提供跟踪工作项结果的方法。这可以通过自己实现(只需将结果存储在 redis 数据库中,使用工作项 id 派生的键即可)来实现。如果您需要一个功能更全面的工作管理系统,请参阅我们的 集合管理器

欢迎其他语言实现,提交 PR 吧!

文档

除了下面的主要概述外,每个实现都有其自己的示例和 API 参考。

  • Python: PyPI Docs MIT License

  • Rust: Crates.io docs.rs MIT License

  • Go: Go Report Card GoDoc MIT License

  • DotNet (C#): Nuget Docs MIT License

  • Node.js (TypeScript): NPM MIT License

概述

所有实现都共享相同的操作,在相同的核心类型上,这些是

项目

工作队列中的项目由一个 id(ID)、一个字符串和一些 data(数据)组成,任意字节数据。

为了方便,ID 常常是随机生成的 UUID,但也可以自定义。具有与先前项目相同 ID 的另一个项目不应添加,直到先前项目已完成。

添加项目

Python: WorkQueue.add_item,Rust: WorkQueue::add_item,Node.js: WorkQueue::add_item,Go: WorkQueue.AddItem

添加项目就是其名字的含义!它将一个项目添加到工作队列中。然后该项目将在队列中或在处理中(如果处理失败则返回队列)直到任务完成。

租赁项目

Python: WorkQueue.lease,Rust: WorkQueue::lease,Node.js: WorkQueue::lease,Go: WorkQueue.Lease

希望接收工作并完成工作的工作者必须先获取一个租赁。

在请求租赁时,你用项目的过期时间来交换。工作者应该在该项目过期之前通过调用 complete 来完成该项目。如果在规定时间内没有调用 complete,则假定工作者已死亡,该项目将返回队列供其他工作者拾取。

这意味着一个工作进程可以接收到另一个工作进程已经部分或全部完成(然后在调用 complete 之前死亡)的工作,甚至如果租约过期时间太短,两个工作进程可能同时处理同一项工作(如果可能,尽量避免这种情况!)。因此,重要的是要确保工作进程的编写方式不会在另一个工作进程已经完全或部分完成任务,或同时处理它的情况下引起问题。这允许系统具有完全的弹性。

工作队列一旦添加了工作,就不会丢失跟踪,因此只要工作进程持续成功工作,工作就会一直运行到完成(即使在这个过程中多次运行)。

如果您对工作多次运行不满意,请参阅 但我永远不会想我的工作运行超过一次

存储工作项的结果

工作队列不提供跟踪工作项结果的方法。这自己实现起来相当简单(只需将结果存储在由工作项 ID 派生的 Redis 数据库中的键)。如果您需要一个功能更完善的工作管理系统,请参阅我们的 Collection Manager

处理错误

如果发生错误并且工作应在以后由相同或不同的工作进程重试,则工作进程不应该调用 complete,而应获取另一个租约并处理下一个项目,忽略它之前正在处理的项目。当之前的租约到期时,它将被返回到工作队列,并重新尝试。例如

while True:
    job = work_queue.lease(100)
    # ... do some work ...
    if should_try_again_later:
        # Don't call complete, just get another lease
        continue
    # ... finish the work ...
    work_queue.complete(job)

如果发生错误,意味着工作不应重试,您应将此错误发送到正确的位置(也许和您放置结果的地方相同),然后调用 complete。然后工作将不再运行。

但我永远不会想我的工作运行超过一次

在遵循以下说明之前,您应该非常认真思考标题声明。如果工作不能运行超过一次,那么如果工作进程在工作期间死亡,工作将永远不完整…… 永远…… 永远……(除非您有自己的错误恢复系统)

几乎可以编写所有工作,以便在工作节点死亡时可以重新启动。如果可以,这可能值得努力!

我仍然认为我希望我的工作只可能运行一次

如果这种情况成立,您应该在获得租约后立即调用 complete并检查返回值)。

例如,在 Python 中

job = queue.lease(1000)
if queue.complete(job):
    # This will only run once, per job, ever, even if the worker dies
    foo(job)

这之所以有效,是因为 complete 返回 true 当且仅当 它是完成工作的进程。因此,尽管 lease 可能多次返回相同的工作,但 complete(job) 将在每个工作项中只返回一次 true

完成项

Python: WorkQueue.complete,Node.js: WorkQueue.Complete,Rust: WorkQueue::complete,Go: WorkQueue.Complete

完成将工作标记为已完成,并将其从工作队列中删除。在调用 complete(并返回 true)之后,没有任何工作进程将再次收到此工作。

complete 返回一个布尔值,表示 工作是否已被删除当前工作进程是第一个调用 complete 的工作进程。因此,尽管租约可能会将相同的工作分配给多个工作进程,但 complete 仅对单个工作进程返回 true

存储结果

请参阅 存储工作项的结果

清理

轻量级清理

Python: WorkQueue.light_clean,Rust 实现计划中,没有 Go 或 C# 实现计划

当工作进程在处理工作过程中死亡或放弃工作时,该工作将保留在处理状态,直到其过期。轻量级清理的作用是将这些工作移回到主工作队列,以便其他工作进程可以取走。

应运行的 轻量级清理 间隔应大约等于您使用的最短租约时间。

深度清理

Python 和 Rust 实现计划中,没有 Go 或 C# 实现计划

此外,工作进程在调用 complete 过程中死亡,可能会留下不再与活动工作相关联的数据库项。深度清理的任务是遍历这些键,并确保数据库清洁。

深度清理很少需要,但如果非常不幸,则可能会发生。因此,它应该自动运行,但频率较低。

我们提供的清理过程默认每 6 小时运行一次。

清理过程

当存在许多不同类型的工作进程时,只运行一个专门的清理进程会更简单。我们提供了一种简单的清理器,在 Python 和 Rust 中都提供。

其他操作

获取队列长度

Python: WorkQueue.queue_len,Rust: WorkQueue::queue_len,Go: WorkQueue.QueueLen,Node.js: WorkQueue.queueLen

获取租用项的数量

Python: WorkQueue.processing,Rust: WorkQueue::processing,Node.js: WorkQueue.processing,Go: WorkQueue.QueueLen

这包括正在处理的项目和已放弃的项目(请参阅 处理错误),但尚未返回到主队列。

测试

每个客户端实现都有自己的(非常简单)单元测试。大部分测试都是通过集成测试完成的,位于 tests 目录中。

依赖关系

~12–23MB
~360K SLoC