3 个不稳定版本
使用旧的 Rust 2015
0.1.1 | 2019 年 12 月 22 日 |
---|---|
0.1.0 | 2017 年 11 月 9 日 |
0.0.1 | 2017 年 9 月 9 日 |
#85 in #join
300KB
5K SLoC
将 Future 集成到 Rayon 中
此软件包现在已 弃用,因为它只支持过时的 futures-0.1
。新的与 std::future::Future
的集成可能会直接在 rayon-core
中进行,但这留给了未来的 ...。
注意: rayon-futures
目前需要 rayon-core
的不稳定功能,这些功能可能只能通过 rustc --cfg
启用,例如,通过在环境中设置 RUSTFLAGS=--cfg rayon_unstable
。
Future 的工作原理
让我们先简要了解一下 Future 的工作原理。我们的例子将是一个简单的 Future 链。
F_map -> F_socket
在这里,F_socket
是一个映射到 TCP 套接字的 Future。它返回从该套接字读取的数据的 Vec<u8>
。 F_map
是一个 Future,它将接受这些数据并进行一些转换。(注意,实际从套接字等读取的 Future 并不是这样工作的,这只是一个例子。)
期货的概念是每个期货都提供一个 poll()
方法。当调用 poll()
时,期货将尝试执行。通常,这通常涉及到递归调用其他期货的 poll()
。因此,在我们的例子中,当 F_map
开始时,会调用 F_socket.poll()
来查看数据是否已准备好。其思想是 poll()
返回三个值之一
Ok(Async::Ready(R))
-- 期货已完成,这里是结果R
。Err(E)
-- 期货已完成并导致错误E
。Ok(Async::NotReady)
-- 期货尚未完成。
最后一个是最有意思的。这意味着期货被阻塞在 某些事件 X 上,通常是 I/O 事件(例如,我们正在等待 TCP 套接字上更多的数据到达)。
当期货返回 NotReady
时,它还有一个额外的任务。它必须将“当前任务”注册(现在可以想象成当前线程)以在事件 X 发生时重新唤醒。对于大多数期货,这个任务委托给另一个期货:例如,在我们的例子中,F_map
调用 F_socket.poll()
。因此,如果 F_socket.poll()
返回未准备好,那么它已经注册了当前线程,因此 F_map
可以仅仅将 NotReady
结果进一步传播。
当前任务和执行器
futures.rs 库的一个关键概念是 执行器。执行器是首先调用顶级期货(在我们的例子中是 F_map
)的运行时。这正是 Rayon 扮演的角色。请注意,在任何期货系统中可能有多个互操作的执行器。
执行器的部分工作是在未来执行时维护一些线程局部存储(TLS)。具体来说,它必须设置“当前任务”(基本上是一个唯一的整数,尽管它是一个不可见类型)以及一个类型为 Arc<Unpark>
的“未停对象”。The Unpark
特性 提供了一个方法(unpark()
),当任务应该被重新唤醒时可以调用它。因此,F_socket
可能会获取当前的 Arc<Unpark>
对象并将其存储供 I/O 线程使用。I/O 线程可能会调用 epoll()
或 select()
或其他任何操作,并在检测到套接字有更多数据时调用 unpark()
方法。
Rayon 的未来集成
当你将类型为 F
的未来实例化到 rayon 中时,其想法是它将在线程池中独立执行。同时,spawn_future()
方法返回给你自己的未来(让我们称之为 F'
),你可以用它来轮询和监控其进度。然而,在 Rayon 内部,我们只分配一个 Arc
来表示这两者——一个精确的 Arc<ScopeFuture<F>>
,以此精确表示——因此这个 Arc
承担了两个不同的角色。
对 F'
(返回给用户的句柄)的操作由特性 ScopeFutureTrait
指定,非常简单。用户可以轮询未来,即检查 rayon 是否已经执行完毕,或者取消未来。当 F'
被丢弃时,发生 cancel()
,这表示用户不再对结果感兴趣。
未来引用计数
每个实例化的未来都由一个 Arc
表示。这个 Arc
实际上具有一些有趣的结构。下面图表中的每条边都代表一些通过保持引用计数(以某种方式,通常通过 Arc
)来“保持活跃”的东西
F' ---+ [ deque ] --+
| |
v v
+---> /---------------------\
| | registry: | ------> [rayon registry]
| | contents: --------\ |
| | | scope | | ------> [spawning scope]
| | | this | | --+ (self references)
| | | ... | | |
| | \-----------------/ | |
| \---------------------/ |
+-------------------------------+
让我们来看看它们
- 来自
F'
的进入边表示从spawn_future
的调用者返回的未来。这确保只要调用者仍然对查看其结果感兴趣,未来弧就不会被释放。 - 从
[ deque ]
来的边表示,当未来的值被入队到一个线程局部deque中(它只在某些时候是这样的),这个deque中包含一个引用。这是通过将Arc
转换为一个*const Job
对象来实现的(因此,*const
逻辑上持有原本由Arc
拥有的引用)。当任务执行时,它会转换回Arc
,最终释放引用。 registry
字段保留了一个Arc<Registry>
,因此保持了一些中央注册表的活跃状态。这实际上并没有做什么,但防止了Registry
被丢弃。特别是,这并不能防止注册表中的线程在未来的任务未调度等情况时终止(尽管未来中的其他字段可以做到)。scope
字段(类型为S
)是“封装作用域”。这个作用域是一个实现了FutureScope<'scope>
trait 的抽象值 -- 这意味着它负责确保'scope
不会在调用FutureScope
方法之前结束(这发生在未来执行完毕时)。例如,如果未来是在一个scope()
调用中产生的,那么S
将是一个包装器(ScopeFutureScope
)围绕一个*const Scope<'scope>
。当未来创建时,作用域为这个未来分配一个任务,并在未来标记为完成时减少作用域计数器。- 总的来说,
scope
字段的任务是确保未来类型(F
)保持有效。毕竟,由于F: 'scope
,F
在'scope
生命周期结束前都是有效的,并且这个生命周期不能在调用scope
方法之前结束,因此我们知道F
必须保持有效,直到这些方法之一被调用。 - 所有类型为
F
的数据都存储在spawn
字段中(此处未显示)。在作用域计数器减少之前,该字段始终设置为None
。有关更多详细信息,请参阅生命周期安全部分。
- 总的来说,
this
字段存储了一个实际上就是同一未来的Arc
。因此,未来有一个引用计数循环,直到这个循环被打破之前都不能被释放。该字段实际上是一个Option<Arc<..>>
,并在未来完成时将其设置为None
,打破循环,允许在丢弃其他引用时释放它。
未来状态机
内部,未来会经过各种状态,此处展示
PARKED <----+
| |
v |
UNPARKED |
| |
v |
EXECUTING --+
| | ^
| v |
| EXECUTING_UNPARKED
|
v
COMPLETE
当它们首次创建时,期货开始为已挂起。一个已挂起的期货是等待发生某事的期货。它没有在任何线程的deque中排期。然而,即使在从spawn_future()
返回之前,我们也将过渡到未挂起。一个未挂起的期货是等待执行。它被排入某个Rayon线程的deque中,因此将在线程有空闲时执行。
一旦期货开始执行(它本身是Rayon工作),它将过渡到执行状态。这意味着它正忙于调用F.poll()
,基本上。当它调用poll()
时,它还会将其contents.this
字段设置为当前的“通知”实例。因此,如果F
返回NotReady
,它将克隆this
字段并保留它以通知我们期货已准备好再次执行。
现在假设F
已完成,因此返回Ok(Ready(_))
或Err(_)
。在这种情况下,期货可以过渡到COMPLETE
。此时,许多不再需要的状态位(例如,期货本身,以及this
字段)被设置为None
并丢弃,结果存储在result
字段中。(此外,我们可能需要通知其他任务,但这将在未来的章节中讨论。)
如果F
返回Ok(Async::NotReady)
,那么我们通常会过渡到PARKED
状态并等待对notify()
的调用。当调用notify()
时,它会将期货移动到UNPARK
状态并将其注入注册表中。
然而,由于线程调度的不可预测性,可能会发生这种情况:notify()
在退出EXECUTING
状态之前被调用。例如,我们可能会调用F.poll()
,这会将Unpark
实例发送到I/O线程,该线程检测到I/O操作,并在F.poll()
返回之前调用notify()
。在这种情况下,notify()
方法将状态(当然是以原子方式)转换为EXECUTING_UNPARKED
。在这种情况下,当F.poll()
返回时,不会转换为PARKED
,未来实例将直接返回到EXECUTING
并尝试再次调用poll()
。这可能会重复几次。
生命周期安全
当然,Rayon的标志性特性是它允许你使用包含引用的未来F
,只要这些引用超出范围'scope
的生命周期。那么为什么这是安全的呢?
为什么这是安全的基本思想如下。ScopeFuture
结构体持有对作用域本身的引用(通过字段scope
)。在引用递减之前,作用域将不会结束(因此'scope
仍然有效)。这个引用仅在将未来转换为COMPLETE状态时递减——因此在那时之前,我们知道我们不必担心,引用仍然有效。
当我们转换到COMPLETE状态时,事情变得更加有趣。你会注意到,在转换过程中,将self.scope
任务标记为完成是最后发生的事情。重要的是,在完成之前,我们放弃对类型F
的所有访问:也就是说,我们将None
存储在可能引用类型F
值的字段中。这意味着我们知道,在我们转换到COMPLETE之后,我们不能再访问F
中找到的任何引用。
这是好事,因为在我们进入COMPLETE状态后,仍然存在对ScopeFuture
的活跃引用。这些引用的来源有两个:未阻塞值和未来结果。
通知句柄值。我们可能已经给出了NotifyHandle
值——这些包含特例对象,实际上是ScopeFuture
的引用。请注意,NotifyHandle: 'static
,因此这些可以在任何时间长度内漂浮(我们必须转义生命周期以分发它们)。这是可以接受的,因为(a)Arc
保持ScopeFuture
活跃,并且(b)你唯一能做的就是调用notify()
,由于状态是COMPLETE,这将立即返回(并且,无论如何,如我们上面所看到的,它无法访问任何引用)。
未来结果。 对 ScopeFuture
的另一个更有趣的参考是我们最初生成未来时返回给用户的价值。这个值更有趣,因为它可以用来做非平凡的事情,与 NotifyHandle
不同。如果您仔细观察这个句柄,您会看到它的类型被设计用来隐藏类型 F
。实际上,它只揭示了类型 T
和 E
,它们是未来 F
的 ok/err 结果类型。这是故意的:假设类型 F
包含一些引用,但这些引用没有出现在结果中。我们希望“结果”未来能够逃离作用域,然后进入任何类型 T
和 E
仍然在作用域的地方。如果我们在这里暴露了 F
,那就不可行了。(隐藏 F
还需要一个转换为对象类型的转换,在这种情况下是一个内部特性,称为 ScopeFutureTrait
。)但是请注意,T
和 E
也可能包含引用。它们甚至可以是与作用域绑定的引用。
那么用户可以用这个结果未来做什么呢?他们有两个可用的操作:轮询和取消。我们先看看取消,因为它更简单。如果状态是 完成,那么 cancel()
就是一个立即的无操作,所以我们知道它不能用来访问任何可能无效的引用。无论如何,它所做的只是设置一个字段为 true 并调用 notify()
,我们已经在上一节中考察了 notify()
的可能影响。
那么关于 poll()
呢?这是用户从未来获取最终结果的方法。它所做的最重要的事情是访问(并实际上使其为空)字段 result
,该字段存储未来的结果,因此可能有权访问 T
和 E
值。这些值可能包含引用...那么我们如何知道它们仍然在作用域内呢?答案是,这些类型在未来的用户类型中被公开,因此基本的 Rust 类型系统应该保证任何引用仍然有效,否则用户就不应该能够调用 poll()
。 (同样,在取消时也是如此,但这并不重要,因为 cancel()
没有做什么有趣的事情。)
依赖关系
~645KB