#future #async-executor #executor #async

async_nursery

结构化并发的原始类型

9 个版本 (5 个破坏性更新)

0.5.0 2022 年 5 月 11 日
0.4.0 2022 年 1 月 6 日
0.3.1 2021 年 6 月 11 日
0.3.0 2021 年 1 月 1 日
0.2.0 2020 年 6 月 24 日

#294 in 并发

每月 36 次下载
用于 2 crates

无授权

40KB
533

async_nursery

standard-readme compliant Build Status Docs crates.io

结构化并发的原始类型。

孵化器允许编写遵循结构化并发的并发程序。如果您对这个概念不太熟悉,结构化并发论坛有一些非常优秀的资源,您可以访问 结构化并发论坛。这个库的名字灵感来源于优秀的 Python Trio 库

目录

描述

async_nursery 为 Rust 带来了结构化并发的原始类型。结构化并发有三个主要目标

1. 并发程序的合理控制流。

结构化并发的笔记,或:Go 语句被认为是有害的,由 Nathaniel J. Smith 解释得非常精彩。概括来说,如果一个函数想要分叉并发做一些工作,确保在函数返回时所有子任务都已完成。这样它就像我们熟悉的同步代码中的黑盒一样工作。函数有输入和返回值,当它完成时,它创建的代码不再运行。

您可以通过将来自 async_executorsJoinHandle 放入 FuturesUnordered 来做到这一点,但正如我们下面将看到的,async_nursery 要灵活和方便得多。与来自 tokioasync-std 的直接 JoinHandle 不同,来自 async_executorsJoinHandle 默认不会在丢弃时分离。

2. 防止资源泄漏。

孤儿任务,没有 JoinHandle 的任务可能会永远存活,要么在循环中运行,要么死锁。结构化并发确保没有泄漏,将所有资源整齐地放入调用树中,这与调用栈非常相似。在调用树中,一个栈帧可以是一系列并排坐在一起的几个栈帧,它们正在并发执行,但当我们返回到上一个栈帧时,它们都已完成。

3. 传播错误

在Rust中,向上传递错误到调用栈是很常见的。如果你启动一个任务并让它运行在虚空中,你需要通过通道等通道进行错误处理。在结构化并发中,由于所有任务都在父任务返回之前被连接,你可以像同步代码一样返回错误。如果某个任务遇到错误,也可以取消所有兄弟任务。

async_nursery 的特性

  • Nursery 作为启动器。
  • NurseryStream 实现了所有它照料的 futures 的结果的 Stream<Out>
  • NurseryStream 如果只想等待一切完成,但不在乎返回值,则实现了 Future<Output=()>
  • NurseryStream 主要为您管理 JoinHandle
  • 可以由任何实现了 SpawnHandleLocalSpawnHandle 的 executor 支持。
  • 在丢弃 NurseryStream 时取消所有正在运行的未来。
  • Nursery 实现 FutureObj 和/或 LocalFutureObj 以及 NurseNurseExt 的 Sink。
  • Nursery 将包装的 executor 的 async_executor 特性转发。这对于 TimerTokioIoYieldNowSpawnBlocking 都有效。请注意,当使用这种方式像 SpawnBlocking 时,nursery 不管理任务,它只是让您使用包装的 executor。

缺失的功能

  • 没有提供用于 协作取消 的 API。由于 std::task::Context 中没有对该功能的支持,您必须基本上传递一些取消令牌到任务中,该任务需要执行清理且不支持在任何 await 点被丢弃。由于这需要启动的任务具有特定的支持,我将此留给用户。在 examples 目录 中包括了一个使用 AtomicBool 的示例。优势在于灵活性。您可以取消 nurseries 中的某些任务,而让其他任务继续运行,或者如果它们支持的话,让其他任务在丢弃时取消等。异步析构(Async drop)可能会有一天缓解这种痛苦,但还不是时候。

  • 没有提供用于运行非 'static' futures 的 API。在安全的 Rust 中这是不可能的,因为可以使用 std::mem::forget 来泄露 nurseries 并欺骗它超越其父栈帧的生命周期,此时它将保留一个无效的引用。如果您真的想这么做,我建议您查看 async-scoped crate,它通过要求您使用 unsafe 来允许这样做。

安装

使用 cargo addcargo add async_nursery

使用 cargo yaml

dependencies:

   async_nursery: ^0.5

使用 Cargo.toml

[dependencies]

   async_nursery = "0.5"

升级

在升级时请查看 变更日志

依赖项

这个crate依赖项较少(futuresasync_executors)。Cargo将自动为您处理依赖项。您需要从async_executors crate中选择执行器,并设置该crate的正确功能来启用它。

没有可选功能。

安全性

该crate使用forbid(unsafe),但依赖于futures,它有相当多的不安全代码。据我所知,使用此crate没有特定的安全问题。

性能

当前实现很简单。`Nursery`仅通过无界通道将`JoinHandle`发送到`NurseryStream`。这很方便,因为它意味着`NurseExt::nurse`不需要是异步的,但与直接使用底层执行器相比,它有一些开销。未来我希望优化实现。

用法

警告:如果您等待流完成,请记住只有当没有更多`Nursery`存在时,它才会完成。您必须在等待`NurseryStream`之前释放Nursery。如果您的程序发生死锁,这应该是您首先检查的地方。

在nursery上启动的所有任务都必须具有相同的Future::Output类型。

基本示例

示例目录中有一个广泛使用async_nursery的示例列表。请查看它们。

use
{
   async_nursery   :: { Nursery, NurseExt } ,
   async_executors :: { AsyncStd          } ,
};

pub type DynResult<T> = Result<T, Box< dyn std::error::Error + Send + Sync + 'static >>;

async fn self_contained() -> DynResult<()>
{
   let (nursery, output) = Nursery::new( AsyncStd );

   for _ in 0..5
   {
      nursery.nurse( async { /* do something useful */ } )?;
   }

   // This is necessary. Since we could keep spawning tasks even after starting to poll
   // the output, it can't know that we are done, unless we drop all senders or call
   // `close_nursery`. If we don't, the await below deadlocks.
   //
   drop(nursery);

   // Resolves when all spawned tasks are done.
   //
   output.await;

   Ok(())
}

返回错误

可以使用TryStreamExt::try_next的功能在所有并发任务都需要成功完成时提前退出。现在您可以丢弃NurseryStream并取消所有正在运行的同级任务。

恢复其他返回类型

有时可以从启动的任务中返回有用的数据。您可以将其视为可以并发运行的函数调用或闭包。nursery允许您在运行过程中恢复这些。例如,可以用来实现进度条。

另一种可能性是在所有操作完成后使用collect获取所有返回值的集合。

恐慌

Nursery不对panic进行特殊处理。如果您的任务panic,这取决于执行器会发生什么。目前,与其它执行器不同,tokio会捕获spawned任务中的catch_unwind。其他执行器将panic传播到等待JoinHandle(例如,等待NurseryStream)的线程。如果您想要在所有执行器上工作的健壮应用程序,请使用futures库中的catch_unwind组合器。再次使用TryStreamExt::try_next,如果有一个任务panic,您可以提前退出。

与 FuturesUnordered 的区别

`Nursery`和`NurseryStream`内部包装了FuturesUnordered。这给我们带来的主要功能是允许我们开始轮询输出流,同时继续启动更多子任务。《FuturesUnordered》有一个非常严格的两个阶段API。首先启动,然后获取输出。这使得我们能够将`NuseryStream`用作长生命周期的容器。例如,如果您将要启动网络请求,您可以持续监听`NurseryStream`中处理过程中发生的错误,同时继续启动更多请求。然后,当连接关闭时,我们希望停止处理此连接的未完成请求。通过丢弃`NurseryStream`,我们可以做到这一点。

另外还添加了一些便利功能。

  • Nursery 自动为您进行孵化,无需手动处理 JoinHandle
  • NurseryStream 不仅实现了 Stream,还实现了 Future,如果您只想等待所有任务完成而无需关心输出。
  • Nursery 可以被克隆并传递,用于函数调用和孵化子任务。您无需手动通过通道将 JoinHandle 发送到 FuturesUnordered

API

API 文档可以在 docs.rs 上找到。

贡献

请查看贡献指南

测试

cargo testwasm-pack test --firefox --headless -- -Z features=itarget --no-default-features,尽管后者需要使用 nightly 版本,并且在 https://github.com/rustwasm/wasm-pack/issues/698 修复或您修补了 wasm-pack 之前无法工作。您可以使用 wasm-bindgen-cli

行为准则

任何在公民行为准则第4点“不可接受的行为”中描述的行为都是不受欢迎的,并可能导致您被封禁。如果任何人都未能尊重这些/您的限制,包括项目的维护者和版主,您有权指出。

许可证

许可协议

依赖项

~81–470KB