#mongo-db #task #worker #schedule #running

mscheduler

使用 MongoDB 来调度任务运行

4 个版本

0.2.0 2023 年 12 月 5 日
0.1.2 2023 年 7 月 24 日
0.1.1 2023 年 7 月 24 日
0.1.0 2023 年 7 月 23 日

#824 in 并发

MIT/Apache

57KB
968

MIT licensed Build Status

功能点

  1. 任务的发布和执行
  2. 多个 worker 同时运行任务
  3. 任务失败后重试
  4. worker ping 超时后其他 worker 可以抢占任务

核心模型

Task

  1. TaskState
    1. 整体任务状态参数
    2. Vec<TaskWorkerState>
  2. TaskOption
  3. <T>参数

TaskConsumer

  1. Arc<Func> 因为需要 spawn 到其他线程内执行

任务中 worker state 的状态转换

  1. INIT 刚发送
  2. RUNNING 被 worker 占领并执行中
  3. SUCCESS worker 执行成功
  4. FAIL worker 执行失败

功能设计

任务发送

不变

任务消费

如何发现任务

  1. 启动时主动查询一批任务
  2. 通过 change stream 订阅一批发生变化的任务
  3. 任务列表空时主动查询一批任务

查询什么任务

  1. INIT=》需要抢占
  2. RUNNING=》等待超时后抢占
  3. 其他 worker 执行失败,但是符合抢占条件=》走复杂判断逻辑

按什么优先级查询任务

目前没有特殊要求, 按任意 key 顺序查询

如何判断任务是否可抢占

  1. 任务参数限制不可执行
    1. specific_worker_ids 限制不可执行
  2. 其他 worker 状态限制
    1. ping_expire_time 没到, 不可抢占
    2. 其他 worker info 占据了并发执行数量
      1. max_unexpected_retries 超出限制, 不执行. 约束: max_unexpected_retries
      2. concurrent_worker_cnt 限制并行执行数量. 约束: concurrent_worker_cnt

抢占的流程

  1. 启动前使用 change stream 监听. 启动后查询一批任务, 查询条件: 任务参数允许 worker 执行 && 任务没有完全成功或失败(根据 info cnt 数量确定).
  2. 查询后读取 worker info 状态, 用于判断抢占时间点
  3. 遍历这些任务, 计算任务可抢占的时间点. 设置定时任务在这些时间点进行抢占. PriorityQueue 实现, 有最大长度限制.
  4. 如果可抢占任务为空, 那么重新查询一批任务直到没有数据返回. 之后完全可以通过 change stream 监听

执行的流程

  1. 通过 tokio::spawn 隔离报错
  2. 按 option 里的设置持续 ping
  3. 任务失败/成功后更新任务状态

任务失败的报错 没有特殊需求, 保持原样

模块划分

  1. stream 监听. 始终存在
  2. 批量读取任务. 读取完成后可退出, 直到 stream 监听报错为止(存在时间 gap, 无法判断是否存在新更新的任务). 所以这个由 stream 监听线程启动
  3. 任务抢占. 监听和批量读取后通过该线程维护 PriorityQueue, 并执行任务抢占
  4. 任务执行. 任务抢占后 spawn 任务并执行. 执行过程中维护执行 handler 的状态. 任务完成后负责更新任务状态

测试用例

  1. 一个 worker, 持续运行,
功能 Bull Agenda
后端 redis mongo
优先级
并发
延迟作业
全局事件
速率限制器
暂停/恢复
沙盒 worker
可重复作业
原子操作 ~
持久性
用户界面
REST API
中央(可扩展)队列
支持长时间运行作业
优化于 作业/消息 作业
优雅地停止
远程停止
多个 worker

流程

  1. 发布任务
    1. 幂等

worker 状态

  1. 执行中 worker 占用任务成功后立即进入执行中状态, 并设置下次刷新时间 如果距离上次刷新时间超过 worker_timeout_ms, 那么认为是超时状态
  2. 失败 worker 主动设置执行状态为 fail. 如果任务失败时 worker 不在列表中, 状态设置会失败
  3. 成功 worker 主动设置执行状态为 success. 如果任务成功时 worker 不在列表中, 状态设置会失败
  4. 超时 根据 worker_timeout_ms 推导得到的状态

任务状态

任务状态由当前 worker 列表决定

  1. 未启动 列表为空
  2. 执行中 列表中存在执行中 worker
  3. 成功 列表中只有 success 的 worker
  4. 失败 列表中只有 fail 的 worker

提供功能

  1. 发布任务(幂等)查找key对应的没有运行中的任务 没有找到=>setOnInsert 找到了=>不设置任何东西,根据配置决定是否清除失败/成功的worker
  2. 占用任务(多worker)查找条件:判断是否被指定worker_id,任务状态不是成功,任务没有被当前worker处理过(不能在worker列表中),可以接受更多的worker 排序条件:优先级最高,优先被指定worker_id的 没有找到=>不进行操作 找到了=>增加自己的worker对象,并且过滤worker列表(清除超时的运行中worker)
  3. 维持任务 查找条件:key相同,worker id相同,找到了=>更新对应的超时时间 没找到=>结束当前任务
  4. 任务执行成功 尝试更新任务状态为成功
  5. 任务执行失败 主动返回任务失败,尝试更新任务状态为失败
  6. 任务执行异常 按option中设置重试,并更新重试次数,如果重试次数超过限制那么更新为任务失败。重试过程只在本地发生,所有重试次数失败后更新为失败状态

配置更新后的影响

  1. specific_worker_ids变动 如果更新后不允许当前worker执行,那么结束任务
  2. ping_interval_ms变动 下次ping时生效
  3. 其他参数变动 不影响正在执行中的任务,变动需要在重新占用任务时体现

具体实现

发布任务

直接发送就行

消费任务

核心问题:下一次什么时候去占用任务

  1. 启动时计算下一次时间next_try_time
  2. 使用change_stream实时更新next_try_time

待办事项

-[ ] clean_success 暂不实现,维护中 -[ ] clean_failed 暂不实现,维护中 -[ ] detect compatibility of collection data -[x] auto worker id -[ ] 错误处理

依赖关系

~25–58MB
~1M SLoC