37次发布
0.10.2 | 2024年8月3日 |
---|---|
0.10.1 | 2024年3月6日 |
0.9.4 | 2023年3月22日 |
0.9.2 | 2022年4月28日 |
0.4.2 | 2020年1月31日 |
#47 in 异步
1,521 每月下载量
用于 3 个crate(直接使用2个)
86KB
1K SLoC
unicycle
大量future的调度器。
Unicycle提供了一系列无序类型
这些是异步抽象,运行一组可能以任何顺序完成的future或stream。类似于来自futures crate的FuturesUnordered。但我们旨在提供更强的公平性保证(见下文),并提高正在poll的future的内存局部性。
注意: 此项目是实验性的。它涉及一些不安全且可能错误的假设,在将其投入生产之前,需要对其进行审查或删除。
功能
parking-lot
- 启用使用parking_lot crate的锁定(默认)。futures-rs
- 启用使用futures-rs中的Stream类型。这是访问StreamsUnordered和IndexedStreamsUnordered所必需的,因为这些类型封装了futures-rs类型。(默认)
示例
use std::time::Duration;
use tokio::time;
use unicycle::FuturesUnordered;
let mut futures = FuturesUnordered::new();
futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));
while let Some(_) = futures.next().await {
println!("tick");
}
println!("done!");
无序类型可以从迭代器创建
use std::time::Duration;
use tokio::time;
use unicycle::FuturesUnordered;
let mut futures = Vec::new();
futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));
let mut futures = futures.into_iter().collect::<FuturesUnordered<_>>();
while let Some(_) = futures.next().await {
println!("tick");
}
println!("done!");
公平性
可以将像Unicycle这样的抽象视为调度器。它们被提供一组子任务,并试图尽可能地将它们驱动到完成。在这方面,讨论任务的驱动方式中的公平性很有趣。
当前 FuturesUnordered 的实现维护了一个唤醒感兴趣的任务队列。当任务被唤醒时,它会添加到队列的头部以表示其对轮询的兴趣。当 FuturesUnordered 运作时,它会循环清空这个队列并轮询相关的任务。这个过程有一个副作用,即那些积极表示唤醒兴趣的任务会获得优先级并更频繁地被轮询。由于在队列被清空的过程中,它们的兴趣有更大的可能性立即被重新添加到队列头部。这可能导致少量任务导致 FuturesUnordered 的轮询循环异常 自旋。这个问题由 Jon Gjengset 报告,并在 限制 FuturesUnordered 允许的自旋次数 上得到改进。
Unicycle 通过限制每个子任务在每个 轮询周期 中可能被轮询的频率来解决这个问题。这是通过跟踪两个独立的集合中的轮询兴趣来完成的。一旦我们被轮询,我们就更换出活动集合,然后使用更换出的集合作为下一个周期中轮询的基础,同时限制自己每个子任务只轮询 一次。额外的唤醒只记录在换入集合中,它将在下一个周期中被轮询。
这样我们希望达到更高的公平性,从不偏袒任何特定任务的行为。
架构
Unordered 类型将所有正在轮询的未来存储在一个连续存储 slab 中,其中每个未来都存储在一个单独的分配中。这个存储的头是原子引用计数的,可以用来构建一个 waker 而无需额外的分配。
除了 slab 之外,我们维护两个 BitSets,一个 活动 一个 备用。当任务注册唤醒兴趣时,与其实例索引相关的位会在活动集合中设置,并将传递给 Unordered 的最新 waker 调用来唤醒它。一旦 Unordered 被轮询,它会原子地交换活动和备用的 BitSets,等待直到它能够独占访问现在的 备用 BitSet,并从所有标记的索引中清空它,以确定要轮询的任务。然后每个任务按顺序轮询 一次。如果任务是 Ready,则产出其结果。在我们再次获得控制权后,我们继续以这种方式清空备用集合,直到它为空。当这完成时,我们产出一次,然后再次开始周期。
依赖
~0–5MB