2个版本

0.1.2 2022年5月17日
0.1.1 2022年5月16日
0.1.0 2022年5月13日

#1200 in 异步

MIT/Apache

47KB
946

Async DAG

Crate Crates.io API

async_dag 是一个异步任务调度工具。

当异步任务及其依赖关系可以用 DAG 来描述时,这个crate确保任务以最大可能的并行度运行。

示例

假设有几个任务,它们要么生成一个 i32,要么将两个 i32 相加,并且它们的依赖关系由以下图表示,

      7
     / \
    3   \
   / \   \
  1   2   4

这意味着有三个任务生成值 124,一个任务将 12 相加得到 3,还有一个任务将 34 相加得到最终输出,7

一个普通的开发者可能会编写

let _3 = sum(_1.await, _2.await).await;
let _7 = sum(_3, _4.await).await;

上面的代码效率低下,因为每个任务只有在前一个任务完成后才开始。

一个更好的版本将是

let (_1, _2, _4) = join!(_1, _2, _4).await;
let _3 = sum(_1, _2).await;
let _7 = sum(_3, _4).await;

其中 _1_2_4 并行运行。

然而,上述调度仍然不是最优的,因为 _1_2 的求和可以与 _4 并行运行。

为了达到最大并行度,必须编写类似以下的内容

let _1_2 = join!(_1, _2);
let (_3, _4) = select! {
    _3 = _1_2 => {
        (_3, _4.await)
    }
    _4 = _4 => {
        let (_1, _2) = _1_2.await;
        (sum(_1, _2).await, _4)
    }
}
let _7 = sum(_3, _4).await;

代码非常晦涩,如果任务和依赖关系更多,手动调度会很快变得令人疲倦,如果可能的话。

使用 async_dag,可以编写

use async_dag::Graph;

async fn sum(lhs: i32, rhs: i32) -> i32 { lhs + rhs }

async fn run() {
    let mut graph = Graph::new();
    // The closures are not run yet.
    let _1 = graph.add_task(|| async { 1 } );
    let _2 = graph.add_task(|| async { 2 } );
    let _4 = graph.add_task(|| async { 4 } );

    // Sets `_1` as `_3`'s first parameter.
    let _3 = graph.add_child_task(_1, sum, 0).unwrap();
    // Sets `_2` as `_3`'s second parameter.
    graph.update_dependency(_2, _3, 1).unwrap();

    // Sets `_3` as `_7`'s first parameter.
    let _7 = graph.add_child_task(_3, sum, 0).unwrap();
    // Sets `_4` as `_7`'s second parameter.
    graph.update_dependency(_4, _7, 1).unwrap();

    // Runs all the tasks with maximum possible parallelism.
    graph.run().await;

    assert_eq!(graph.get_value::<i32>(_7).unwrap(), 7);
}

use futures::executor::block_on;
block_on(run());

快速失败的图

TryGraph 可以用于用户想要有故障快速策略的可失败任务。

当任何任务完成带有 Err 时,它将终止运行futures。

开发者

pre-commit hook 设置: cargo run --bin install-pre-commit-hook

依赖项

约3MB
约47K SLoC