2个版本
0.1.2 | 2022年5月17日 |
---|---|
0.1.1 |
|
0.1.0 | 2022年5月13日 |
#1200 in 异步
47KB
946 行
Async DAG
async_dag
是一个异步任务调度工具。
当异步任务及其依赖关系可以用 DAG 来描述时,这个crate确保任务以最大可能的并行度运行。
示例
假设有几个任务,它们要么生成一个 i32
,要么将两个 i32
相加,并且它们的依赖关系由以下图表示,
7
/ \
3 \
/ \ \
1 2 4
这意味着有三个任务生成值 1
、2
和 4
,一个任务将 1
和 2
相加得到 3
,还有一个任务将 3
和 4
相加得到最终输出,7
。
一个普通的开发者可能会编写
let _3 = sum(_1.await, _2.await).await;
let _7 = sum(_3, _4.await).await;
上面的代码效率低下,因为每个任务只有在前一个任务完成后才开始。
一个更好的版本将是
let (_1, _2, _4) = join!(_1, _2, _4).await;
let _3 = sum(_1, _2).await;
let _7 = sum(_3, _4).await;
其中 _1
、_2
和 _4
并行运行。
然而,上述调度仍然不是最优的,因为 _1
和 _2
的求和可以与 _4
并行运行。
为了达到最大并行度,必须编写类似以下的内容
let _1_2 = join!(_1, _2);
let (_3, _4) = select! {
_3 = _1_2 => {
(_3, _4.await)
}
_4 = _4 => {
let (_1, _2) = _1_2.await;
(sum(_1, _2).await, _4)
}
}
let _7 = sum(_3, _4).await;
代码非常晦涩,如果任务和依赖关系更多,手动调度会很快变得令人疲倦,如果可能的话。
使用 async_dag
,可以编写
use async_dag::Graph;
async fn sum(lhs: i32, rhs: i32) -> i32 { lhs + rhs }
async fn run() {
let mut graph = Graph::new();
// The closures are not run yet.
let _1 = graph.add_task(|| async { 1 } );
let _2 = graph.add_task(|| async { 2 } );
let _4 = graph.add_task(|| async { 4 } );
// Sets `_1` as `_3`'s first parameter.
let _3 = graph.add_child_task(_1, sum, 0).unwrap();
// Sets `_2` as `_3`'s second parameter.
graph.update_dependency(_2, _3, 1).unwrap();
// Sets `_3` as `_7`'s first parameter.
let _7 = graph.add_child_task(_3, sum, 0).unwrap();
// Sets `_4` as `_7`'s second parameter.
graph.update_dependency(_4, _7, 1).unwrap();
// Runs all the tasks with maximum possible parallelism.
graph.run().await;
assert_eq!(graph.get_value::<i32>(_7).unwrap(), 7);
}
use futures::executor::block_on;
block_on(run());
快速失败的图
TryGraph
可以用于用户想要有故障快速策略的可失败任务。
当任何任务完成带有 Err
时,它将终止运行futures。
开发者
pre-commit hook 设置: cargo run --bin install-pre-commit-hook
。
依赖项
约3MB
约47K SLoC