#thread #performance #join #parallel

已弃用 rayon-futures

(已弃用) 将 Futures 集成到 Rayon 中

3 个不稳定版本

使用旧的 Rust 2015

0.1.1 2019 年 12 月 22 日
0.1.0 2017 年 11 月 9 日
0.0.1 2017 年 9 月 9 日

#85 in #join

Apache-2.0/MIT

300KB
5K SLoC

将 Future 集成到 Rayon 中

此软件包现在已 弃用,因为它只支持过时的 futures-0.1。新的与 std::future::Future 的集成可能会直接在 rayon-core 中进行,但这留给了未来的 ...。

注意: rayon-futures 目前需要 rayon-core 的不稳定功能,这些功能可能只能通过 rustc --cfg 启用,例如,通过在环境中设置 RUSTFLAGS=--cfg rayon_unstable

Future 的工作原理

让我们先简要了解一下 Future 的工作原理。我们的例子将是一个简单的 Future 链。

F_map -> F_socket

在这里,F_socket 是一个映射到 TCP 套接字的 Future。它返回从该套接字读取的数据的 Vec<u8>F_map 是一个 Future,它将接受这些数据并进行一些转换。(注意,实际从套接字等读取的 Future 并不是这样工作的,这只是一个例子。)

期货的概念是每个期货都提供一个 poll() 方法。当调用 poll() 时,期货将尝试执行。通常,这通常涉及到递归调用其他期货的 poll()。因此,在我们的例子中,当 F_map 开始时,会调用 F_socket.poll() 来查看数据是否已准备好。其思想是 poll() 返回三个值之一

  • Ok(Async::Ready(R)) -- 期货已完成,这里是结果 R
  • Err(E) -- 期货已完成并导致错误 E
  • Ok(Async::NotReady) -- 期货尚未完成。

最后一个是最有意思的。这意味着期货被阻塞在 某些事件 X 上,通常是 I/O 事件(例如,我们正在等待 TCP 套接字上更多的数据到达)。

当期货返回 NotReady 时,它还有一个额外的任务。它必须将“当前任务”注册(现在可以想象成当前线程)以在事件 X 发生时重新唤醒。对于大多数期货,这个任务委托给另一个期货:例如,在我们的例子中,F_map 调用 F_socket.poll()。因此,如果 F_socket.poll() 返回未准备好,那么它已经注册了当前线程,因此 F_map 可以仅仅将 NotReady 结果进一步传播。

当前任务和执行器

futures.rs 库的一个关键概念是 执行器。执行器是首先调用顶级期货(在我们的例子中是 F_map)的运行时。这正是 Rayon 扮演的角色。请注意,在任何期货系统中可能有多个互操作的执行器。

执行器的部分工作是在未来执行时维护一些线程局部存储(TLS)。具体来说,它必须设置“当前任务”(基本上是一个唯一的整数,尽管它是一个不可见类型)以及一个类型为 Arc<Unpark> 的“未停对象”。The Unpark 特性 提供了一个方法(unpark()),当任务应该被重新唤醒时可以调用它。因此,F_socket 可能会获取当前的 Arc<Unpark> 对象并将其存储供 I/O 线程使用。I/O 线程可能会调用 epoll()select() 或其他任何操作,并在检测到套接字有更多数据时调用 unpark() 方法。

Rayon 的未来集成

当你将类型为 F 的未来实例化到 rayon 中时,其想法是它将在线程池中独立执行。同时,spawn_future() 方法返回给你自己的未来(让我们称之为 F'),你可以用它来轮询和监控其进度。然而,在 Rayon 内部,我们只分配一个 Arc 来表示这两者——一个精确的 Arc<ScopeFuture<F>>,以此精确表示——因此这个 Arc 承担了两个不同的角色。

F'(返回给用户的句柄)的操作由特性 ScopeFutureTrait 指定,非常简单。用户可以轮询未来,即检查 rayon 是否已经执行完毕,或者取消未来。当 F' 被丢弃时,发生 cancel(),这表示用户不再对结果感兴趣。

未来引用计数

每个实例化的未来都由一个 Arc 表示。这个 Arc 实际上具有一些有趣的结构。下面图表中的每条边都代表一些通过保持引用计数(以某种方式,通常通过 Arc)来“保持活跃”的东西

  F' ---+  [ deque ] --+
        |              |
        v              v
  +---> /---------------------\
  |     | registry:           | ------> [rayon registry]
  |     | contents: --------\ |
  |     | | scope           | | ------> [spawning scope]
  |     | | this            | | --+ (self references)
  |     | | ...             | |   |
  |     | \-----------------/ |   |
  |     \---------------------/   |
  +-------------------------------+

让我们来看看它们

  • 来自 F' 的进入边表示从 spawn_future 的调用者返回的未来。这确保只要调用者仍然对查看其结果感兴趣,未来弧就不会被释放。
  • [ deque ] 来的边表示,当未来的值被入队到一个线程局部deque中(它只在某些时候是这样的),这个deque中包含一个引用。这是通过将 Arc 转换为一个 *const Job 对象来实现的(因此,*const 逻辑上持有原本由 Arc 拥有的引用)。当任务执行时,它会转换回 Arc,最终释放引用。
  • registry 字段保留了一个 Arc<Registry>,因此保持了一些中央注册表的活跃状态。这实际上并没有做什么,但防止了 Registry 被丢弃。特别是,这并不能防止注册表中的线程在未来的任务未调度等情况时终止(尽管未来中的其他字段可以做到)。
  • scope 字段(类型为 S)是“封装作用域”。这个作用域是一个实现了 FutureScope<'scope> trait 的抽象值 -- 这意味着它负责确保 'scope 不会在调用 FutureScope 方法之前结束(这发生在未来执行完毕时)。例如,如果未来是在一个 scope() 调用中产生的,那么 S 将是一个包装器(ScopeFutureScope)围绕一个 *const Scope<'scope>。当未来创建时,作用域为这个未来分配一个任务,并在未来标记为完成时减少作用域计数器。
    • 总的来说,scope 字段的任务是确保未来类型(F)保持有效。毕竟,由于 F: 'scopeF'scope 生命周期结束前都是有效的,并且这个生命周期不能在调用 scope 方法之前结束,因此我们知道 F 必须保持有效,直到这些方法之一被调用。
    • 所有类型为 F 的数据都存储在 spawn 字段中(此处未显示)。在作用域计数器减少之前,该字段始终设置为 None。有关更多详细信息,请参阅生命周期安全部分。
  • this 字段存储了一个实际上就是同一未来的 Arc。因此,未来有一个引用计数循环,直到这个循环被打破之前都不能被释放。该字段实际上是一个 Option<Arc<..>>,并在未来完成时将其设置为 None,打破循环,允许在丢弃其他引用时释放它。

未来状态机

内部,未来会经过各种状态,此处展示

PARKED <----+
|           |
v           |
UNPARKED    |
|           |
v           |
EXECUTING --+
|   |   ^
|   v   |
|   EXECUTING_UNPARKED
|
v
COMPLETE

当它们首次创建时,期货开始为已挂起。一个已挂起的期货是等待发生某事的期货。它没有在任何线程的deque中排期。然而,即使在从spawn_future()返回之前,我们也将过渡到未挂起。一个未挂起的期货是等待执行。它被排入某个Rayon线程的deque中,因此将在线程有空闲时执行。

一旦期货开始执行(它本身是Rayon工作),它将过渡到执行状态。这意味着它正忙于调用F.poll(),基本上。当它调用poll()时,它还会将其contents.this字段设置为当前的“通知”实例。因此,如果F返回NotReady,它将克隆this字段并保留它以通知我们期货已准备好再次执行。

现在假设F已完成,因此返回Ok(Ready(_))Err(_)。在这种情况下,期货可以过渡到COMPLETE。此时,许多不再需要的状态位(例如,期货本身,以及this字段)被设置为None并丢弃,结果存储在result字段中。(此外,我们可能需要通知其他任务,但这将在未来的章节中讨论。)

如果F返回Ok(Async::NotReady),那么我们通常会过渡到PARKED状态并等待对notify()的调用。当调用notify()时,它会将期货移动到UNPARK状态并将其注入注册表中。

然而,由于线程调度的不可预测性,可能会发生这种情况:notify()在退出EXECUTING状态之前被调用。例如,我们可能会调用F.poll(),这会将Unpark实例发送到I/O线程,该线程检测到I/O操作,并在F.poll()返回之前调用notify()。在这种情况下,notify()方法将状态(当然是以原子方式)转换为EXECUTING_UNPARKED。在这种情况下,当F.poll()返回时,不会转换为PARKED,未来实例将直接返回到EXECUTING并尝试再次调用poll()。这可能会重复几次。

生命周期安全

当然,Rayon的标志性特性是它允许你使用包含引用的未来F,只要这些引用超出范围'scope的生命周期。那么为什么这是安全的呢?

为什么这是安全的基本思想如下。ScopeFuture结构体持有对作用域本身的引用(通过字段scope)。在引用递减之前,作用域将不会结束(因此'scope仍然有效)。这个引用仅在将未来转换为COMPLETE状态时递减——因此在那时之前,我们知道我们不必担心,引用仍然有效。

当我们转换到COMPLETE状态时,事情变得更加有趣。你会注意到,在转换过程中,将self.scope任务标记为完成是最后发生的事情。重要的是,在完成之前,我们放弃对类型F的所有访问:也就是说,我们将None存储在可能引用类型F值的字段中。这意味着我们知道,在我们转换到COMPLETE之后,我们不能再访问F中找到的任何引用。

这是好事,因为在我们进入COMPLETE状态后,仍然存在对ScopeFuture的活跃引用。这些引用的来源有两个:未阻塞值和未来结果。

通知句柄值。我们可能已经给出了NotifyHandle值——这些包含特例对象,实际上是ScopeFuture的引用。请注意,NotifyHandle: 'static,因此这些可以在任何时间长度内漂浮(我们必须转义生命周期以分发它们)。这是可以接受的,因为(a)Arc保持ScopeFuture活跃,并且(b)你唯一能做的就是调用notify(),由于状态是COMPLETE,这将立即返回(并且,无论如何,如我们上面所看到的,它无法访问任何引用)。

未来结果。ScopeFuture 的另一个更有趣的参考是我们最初生成未来时返回给用户的价值。这个值更有趣,因为它可以用来做非平凡的事情,与 NotifyHandle 不同。如果您仔细观察这个句柄,您会看到它的类型被设计用来隐藏类型 F。实际上,它只揭示了类型 TE,它们是未来 F 的 ok/err 结果类型。这是故意的:假设类型 F 包含一些引用,但这些引用没有出现在结果中。我们希望“结果”未来能够逃离作用域,然后进入任何类型 TE 仍然在作用域的地方。如果我们在这里暴露了 F,那就不可行了。(隐藏 F 还需要一个转换为对象类型的转换,在这种情况下是一个内部特性,称为 ScopeFutureTrait。)但是请注意,TE 也可能包含引用。它们甚至可以是与作用域绑定的引用。

那么用户可以用这个结果未来做什么呢?他们有两个可用的操作:轮询和取消。我们先看看取消,因为它更简单。如果状态是 完成,那么 cancel() 就是一个立即的无操作,所以我们知道它不能用来访问任何可能无效的引用。无论如何,它所做的只是设置一个字段为 true 并调用 notify(),我们已经在上一节中考察了 notify() 的可能影响。

那么关于 poll() 呢?这是用户从未来获取最终结果的方法。它所做的最重要的事情是访问(并实际上使其为空)字段 result,该字段存储未来的结果,因此可能有权访问 TE 值。这些值可能包含引用...那么我们如何知道它们仍然在作用域内呢?答案是,这些类型在未来的用户类型中被公开,因此基本的 Rust 类型系统应该保证任何引用仍然有效,否则用户就不应该能够调用 poll()。 (同样,在取消时也是如此,但这并不重要,因为 cancel() 没有做什么有趣的事情。)

依赖关系

~645KB