#task #channel #worker #resilient #reusable #programs #long-lived

sisyphus-tasks

面向长生存期、以工作器为中心的Rust程序的复用工具

2个版本

0.1.1 2023年12月10日
0.1.0 2023年12月7日

#16 in #reusable

Apache-2.0 OR MIT

58KB
951 代码行,不包括注释

Sisyphus Tasks

用于长运行、弹性任务的工具

此库包含我编写、发现有用并希望继续使用的代码。它旨在提供系统

通用的习惯用法侧重于启动长生存期的工作器循环,这些循环使用通道进行通信。

理解此crate

高级示例

pub struct MyWorker{
  pipe: Pipe<WorkItem>,
}

impl Boulder for MyWorker {
  fn spawn(self) -> JoinHandle<Fall<Self>> {
    tokio::spawn(async move {
      // pipe gives refs to items
      while let Some(item) = self.pipe.next().await {
        // worker does work on each item as it becomes available
        self.do_work_on(item);
        // do some async work too :)
        self.async_work_as_well(item).await;
      }
    });
  }
}

let task = MyWorker::new(a_pipe).run_forever();

当前工具

  • Sisyphus

    • 用于启动长生存期、可恢复任务的脚手架系统
    • Boulder 是一个循环、可能失败的任务
    • Boulder::spawn() 方法定义了 Boulder 逻辑,该方法返回一个 JoinHandle
    • Fall 是该任务中的错误。
      • Fall::Recoverable - 任务认为可以恢复的错误
      • Fall::Unrecoverable - 任务认为无法恢复的错误
    • Boulder::run_until_panic 方法处理可恢复的 Fall 的重启,并报告不可恢复的错误
    • Boulder 可以定义自定义恢复或清理逻辑
    • Boulder 也可以引发恐慌。在这种情况下,不会生成 Fall,并且
      • 恐慌向上传播
    • Sisyphus 管理一个 Boulder 循环。它提供了一个接口来观察其状态并中止工作
  • 管道

    • 一个入站和一个出站通道
    • 强制执行一次处理语义
    • 防止工作器错误导致的数据丢失
    • 适用于相对线性的数据处理管道
      • 例如,检索 -> 指标 -> 索引 -> 其他处理
    • 运行通道内容的同步和异步 for_each 的便捷方法

未来工具

  • 从 Sisyphus 列表中实例化复杂管道的抽象层
  • 管道应允许同步和异步转换(入站 T,出站 U

一些代码源自为 Nomad 编写的工具。它根据其许可条款使用和复制。

依赖关系

~11–22MB
~312K SLoC