#future #async

unicycle

大量future的调度器

37次发布

0.10.2 2024年8月3日
0.10.1 2024年3月6日
0.9.4 2023年3月22日
0.9.2 2022年4月28日
0.4.2 2020年1月31日

#47 in 异步

Download history 250/week @ 2024-04-28 414/week @ 2024-05-05 220/week @ 2024-05-12 287/week @ 2024-05-19 444/week @ 2024-05-26 334/week @ 2024-06-02 282/week @ 2024-06-09 566/week @ 2024-06-16 431/week @ 2024-06-23 249/week @ 2024-06-30 568/week @ 2024-07-07 391/week @ 2024-07-14 325/week @ 2024-07-21 495/week @ 2024-07-28 326/week @ 2024-08-04 354/week @ 2024-08-11

1,521 每月下载量
用于 3 个crate(直接使用2个)

MIT/Apache

86KB
1K SLoC

unicycle

github crates.io docs.rs build status

大量future的调度器。

Unicycle提供了一系列无序类型

这些是异步抽象,运行一组可能以任何顺序完成的future或stream。类似于来自futures crateFuturesUnordered。但我们旨在提供更强的公平性保证(见下文),并提高正在poll的future的内存局部性。

注意: 此项目是实验性的。它涉及一些不安全且可能错误的假设,在将其投入生产之前,需要对其进行审查或删除。


功能


示例

use std::time::Duration;

use tokio::time;
use unicycle::FuturesUnordered;

let mut futures = FuturesUnordered::new();

futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));

while let Some(_) = futures.next().await {
    println!("tick");
}

println!("done!");

无序类型可以从迭代器创建

use std::time::Duration;

use tokio::time;
use unicycle::FuturesUnordered;

let mut futures = Vec::new();

futures.push(time::sleep(Duration::from_secs(2)));
futures.push(time::sleep(Duration::from_secs(3)));
futures.push(time::sleep(Duration::from_secs(1)));

let mut futures = futures.into_iter().collect::<FuturesUnordered<_>>();

while let Some(_) = futures.next().await {
    println!("tick");
}

println!("done!");

公平性

可以将像Unicycle这样的抽象视为调度器。它们被提供一组子任务,并试图尽可能地将它们驱动到完成。在这方面,讨论任务的驱动方式中的公平性很有趣。

当前 FuturesUnordered 的实现维护了一个唤醒感兴趣的任务队列。当任务被唤醒时,它会添加到队列的头部以表示其对轮询的兴趣。当 FuturesUnordered 运作时,它会循环清空这个队列并轮询相关的任务。这个过程有一个副作用,即那些积极表示唤醒兴趣的任务会获得优先级并更频繁地被轮询。由于在队列被清空的过程中,它们的兴趣有更大的可能性立即被重新添加到队列头部。这可能导致少量任务导致 FuturesUnordered 的轮询循环异常 自旋。这个问题由 Jon Gjengset 报告,并在 限制 FuturesUnordered 允许的自旋次数 上得到改进。

Unicycle 通过限制每个子任务在每个 轮询周期 中可能被轮询的频率来解决这个问题。这是通过跟踪两个独立的集合中的轮询兴趣来完成的。一旦我们被轮询,我们就更换出活动集合,然后使用更换出的集合作为下一个周期中轮询的基础,同时限制自己每个子任务只轮询 一次。额外的唤醒只记录在换入集合中,它将在下一个周期中被轮询。

这样我们希望达到更高的公平性,从不偏袒任何特定任务的行为。


架构

Unordered 类型将所有正在轮询的未来存储在一个连续存储 slab 中,其中每个未来都存储在一个单独的分配中。这个存储的头是原子引用计数的,可以用来构建一个 waker 而无需额外的分配。

除了 slab 之外,我们维护两个 BitSets,一个 活动 一个 备用。当任务注册唤醒兴趣时,与其实例索引相关的位会在活动集合中设置,并将传递给 Unordered 的最新 waker 调用来唤醒它。一旦 Unordered 被轮询,它会原子地交换活动和备用的 BitSets,等待直到它能够独占访问现在的 备用 BitSet,并从所有标记的索引中清空它,以确定要轮询的任务。然后每个任务按顺序轮询 一次。如果任务是 Ready,则产出其结果。在我们再次获得控制权后,我们继续以这种方式清空备用集合,直到它为空。当这完成时,我们产出一次,然后再次开始周期。

依赖

~0–5MB