#concurrency #async-task #task #structured #async

scoped_spawn

异步编程的完整结构化并发

4 个版本

0.2.1 2020 年 6 月 23 日
0.2.0 2020 年 6 月 10 日
0.1.1 2020 年 2 月 12 日
0.1.0 2020 年 2 月 8 日

#1357 in 异步

MIT 许可证

36KB
494 代码行,不包括注释

Scoped Spawn

crates.io API Documentation Pipeline Status

异步编程的完整结构化并发。

结构化并发

在结构化并发中,每个异步任务仅在某个特定范围内运行。此范围由任务的父任务创建,并且不能超出父任务的范围。

在任意时刻,结构化并发中创建的任务形成一棵树。如果一个节点是活跃的,那么它的路径上的所有节点也都是活跃的。换句话说,没有任何节点可以比它的父节点活得久。

此库提供结构化并发的强保证:子任务必须完全退出并释放所有资源(除了用于通知父任务其终止的资源,这些资源由该库处理)之后,我们才认为它已终止。

API 概览

此库提供了 ScopedSpawn 特性,您可以从其中派生新任务。派生的任务将成为当前任务的子任务,并且当当前任务开始终止时将终止。API 还提供了可以提前终止子任务的方法。

在要终止的任务外部启动的终止过程也称为取消。

ScopedSpawn 特性由 ScopedSpawner 实现。要创建 ScopedSpawner,请传递一个实现 Spawn 的对象,您可以为所有已知的执行器轻松实现 Spawn

任何希望接受启动器的代码都应接受 ScopedSpawn 特性,而不是 ScopedSpawner

任务的终止

终止过程有几个阶段。

  1. 当任务的 future 完成,或者从父任务收到取消信号时,终止过程开始,哪个先到算哪个。
  2. 任务的 future 立即丢弃。
  3. 任务反过来向其子任务发送取消信号。
  4. 任务异步等待其子任务的终止。
  5. 在任务中调用 done 函数。有关详细信息,请参阅 ScopedSpawn 的文档。
  6. 最后,任务通过 "done" 信号向其父任务发出终止信号。有关详细信息,请参阅 ParentSignalsChildSignals 的文档。

低级 API

还提供了一个低级别的 remote_scope API。它提供了启动任务所需的所有内容,但不执行实际的启动操作。

示例

以下示例展示了在使用 Tokio 时 ScopedSpawn 的用法。

#[tokio::main]
async fn main() {
    use scoped_spawn::{ScopedSpawn, ScopedSpawner};

    let spawn = TokioDefaultSpawner::new();
    let spawn = ScopedSpawner::new(spawn);
    let signal = spawn
        .spawn_with_signal(
            |spawn| async {
                // Here `spawn` is the child's spawner. Do not give it to anyone else!
                // And do not try to use the parent's spawner because it would break structured
                // concurrency.

                // We could spawn nested children here, but for the demo we don't.
                drop(spawn);

                eprintln!("I'm alive!");
                tokio::time::delay_for(std::time::Duration::from_secs(2)).await;
                eprintln!("I'm still alive!"); // Nope.
            },
            || (),
        )
        .unwrap();

    tokio::time::delay_for(std::time::Duration::from_secs(1)).await;

    drop(signal.cancel_sender); // Cancel the task by dropping.
    signal.done_receiver.await; // Do this if you want to wait.
                                // When the await returns, the future of the spawned task is
                                // guaranteed dropped.
    eprintln!("Task terminated.");
}

// Just some chores to turn the Tokio spawner into a `Spawn`.
#[derive(Clone)]
struct TokioDefaultSpawner {}

impl TokioDefaultSpawner {
    fn new() -> Self {
        Self {}
    }
}

impl futures::task::Spawn for TokioDefaultSpawner {
    fn spawn_obj(
        &self,
        future: futures::future::FutureObj<'static, ()>,
    ) -> Result<(), futures::task::SpawnError> {
        tokio::spawn(future);
        Ok(())
    }

    fn status(&self) -> Result<(), futures::task::SpawnError> {
        Ok(())
    }
}

依赖项

~1MB
~16K SLoC