4 个版本
0.2.0 | 2023 年 12 月 5 日 |
---|---|
0.1.2 | 2023 年 7 月 24 日 |
0.1.1 | 2023 年 7 月 24 日 |
0.1.0 | 2023 年 7 月 23 日 |
#824 in 并发
57KB
968 行
功能点
- 任务的发布和执行
- 多个 worker 同时运行任务
- 任务失败后重试
- worker ping 超时后其他 worker 可以抢占任务
核心模型
Task
- TaskState
- 整体任务状态参数
- Vec<TaskWorkerState>
- TaskOption
- <T>参数
TaskConsumer
- Arc<Func> 因为需要 spawn 到其他线程内执行
任务中 worker state 的状态转换
- INIT 刚发送
- RUNNING 被 worker 占领并执行中
- SUCCESS worker 执行成功
- FAIL worker 执行失败
功能设计
任务发送
不变
任务消费
如何发现任务
- 启动时主动查询一批任务
- 通过 change stream 订阅一批发生变化的任务
- 任务列表空时主动查询一批任务
查询什么任务
- INIT=》需要抢占
- RUNNING=》等待超时后抢占
- 其他 worker 执行失败,但是符合抢占条件=》走复杂判断逻辑
按什么优先级查询任务
目前没有特殊要求, 按任意 key 顺序查询
如何判断任务是否可抢占
- 任务参数限制不可执行
- specific_worker_ids 限制不可执行
- 其他 worker 状态限制
- ping_expire_time 没到, 不可抢占
- 其他 worker info 占据了并发执行数量
- max_unexpected_retries 超出限制, 不执行. 约束: max_unexpected_retries
- concurrent_worker_cnt 限制并行执行数量. 约束: concurrent_worker_cnt
- concurrent_worker_cnt 限制并行执行数量. 约束: concurrent_worker_cnt
- max_unexpected_retries 超出限制, 不执行. 约束: max_unexpected_retries
抢占的流程
- 启动前使用 change stream 监听. 启动后查询一批任务, 查询条件: 任务参数允许 worker 执行 && 任务没有完全成功或失败(根据 info cnt 数量确定).
- 查询后读取 worker info 状态, 用于判断抢占时间点
- 遍历这些任务, 计算任务可抢占的时间点. 设置定时任务在这些时间点进行抢占. PriorityQueue 实现, 有最大长度限制.
- 如果可抢占任务为空, 那么重新查询一批任务直到没有数据返回. 之后完全可以通过 change stream 监听
执行的流程
- 通过 tokio::spawn 隔离报错
- 按 option 里的设置持续 ping
- 任务失败/成功后更新任务状态
任务失败的报错 没有特殊需求, 保持原样
模块划分
- stream 监听. 始终存在
- 批量读取任务. 读取完成后可退出, 直到 stream 监听报错为止(存在时间 gap, 无法判断是否存在新更新的任务). 所以这个由 stream 监听线程启动
- 任务抢占. 监听和批量读取后通过该线程维护 PriorityQueue, 并执行任务抢占
- 任务执行. 任务抢占后 spawn 任务并执行. 执行过程中维护执行 handler 的状态. 任务完成后负责更新任务状态
测试用例
- 一个 worker, 持续运行,
功能 | Bull | Agenda |
---|---|---|
后端 | redis | mongo |
优先级 | ✓ | ✓ |
并发 | ✓ | ✓ |
延迟作业 | ✓ | ✓ |
全局事件 | ✓ | ✓ |
速率限制器 | ✓ | |
暂停/恢复 | ✓ | ✓ |
沙盒 worker | ✓ | ✓ |
可重复作业 | ✓ | ✓ |
原子操作 | ✓ | ~ |
持久性 | ✓ | ✓ |
用户界面 | ✓ | ✓ |
REST API | ✓ | |
中央(可扩展)队列 | ✓ | |
支持长时间运行作业 | ✓ | |
优化于 | 作业/消息 | 作业 |
优雅地停止 | ||
远程停止 | ||
多个 worker |
流程
- 发布任务
- 幂等
worker 状态
- 执行中 worker 占用任务成功后立即进入执行中状态, 并设置下次刷新时间 如果距离上次刷新时间超过 worker_timeout_ms, 那么认为是超时状态
- 失败 worker 主动设置执行状态为 fail. 如果任务失败时 worker 不在列表中, 状态设置会失败
- 成功 worker 主动设置执行状态为 success. 如果任务成功时 worker 不在列表中, 状态设置会失败
- 超时 根据 worker_timeout_ms 推导得到的状态
任务状态
任务状态由当前 worker 列表决定
- 未启动 列表为空
- 执行中 列表中存在执行中 worker
- 成功 列表中只有 success 的 worker
- 失败 列表中只有 fail 的 worker
提供功能
- 发布任务(幂等)查找key对应的没有运行中的任务 没有找到=>setOnInsert 找到了=>不设置任何东西,根据配置决定是否清除失败/成功的worker
- 占用任务(多worker)查找条件:判断是否被指定worker_id,任务状态不是成功,任务没有被当前worker处理过(不能在worker列表中),可以接受更多的worker 排序条件:优先级最高,优先被指定worker_id的 没有找到=>不进行操作 找到了=>增加自己的worker对象,并且过滤worker列表(清除超时的运行中worker)
- 维持任务 查找条件:key相同,worker id相同,找到了=>更新对应的超时时间 没找到=>结束当前任务
- 任务执行成功 尝试更新任务状态为成功
- 任务执行失败 主动返回任务失败,尝试更新任务状态为失败
- 任务执行异常 按option中设置重试,并更新重试次数,如果重试次数超过限制那么更新为任务失败。重试过程只在本地发生,所有重试次数失败后更新为失败状态
配置更新后的影响
- specific_worker_ids变动 如果更新后不允许当前worker执行,那么结束任务
- ping_interval_ms变动 下次ping时生效
- 其他参数变动 不影响正在执行中的任务,变动需要在重新占用任务时体现
具体实现
发布任务
直接发送就行
消费任务
核心问题:下一次什么时候去占用任务
- 启动时计算下一次时间next_try_time
- 使用change_stream实时更新next_try_time
待办事项
-[ ] clean_success 暂不实现,维护中 -[ ] clean_failed 暂不实现,维护中 -[ ] detect compatibility of collection data -[x] auto worker id -[ ] 错误处理
依赖关系
~25–58MB
~1M SLoC