8 个版本
0.3.1 | 2019 年 5 月 7 日 |
---|---|
0.3.0 | 2019 年 5 月 7 日 |
0.2.2 | 2019 年 2 月 27 日 |
0.2.1 | 2019 年 1 月 16 日 |
0.1.2 | 2019 年 1 月 12 日 |
#16 in #任务调度器
30KB
566 行
包含 (ZIP 文件,17KB) sketch.odg
mpmc-scheduler
一个公平的、按通道可取消的、在 tokio 上运行的多-mpmc 任务调度器。
它将多个 mpmc 通道捆绑在一起,并在允许的最大工作者数量中对传入的工作进行公平的速率限制调度。
示例
use mpmc_scheduler;
use tokio::runtime::Runtime;
use futures::future::Future;
let (controller, scheduler) = mpmc_scheduler::Scheduler::new(
4,
|v| {
println!("Processing {}", v);
v
},
Some(|r| println!("Finalizing {}", r)),
true
);
let mut runtime = Runtime::new().unwrap();
let tx = controller.channel(1,4);
runtime.spawn(scheduler);
for i in 0..4 {
tx.try_send(i);
}
drop(tx); // drop tx so scheduler & runtime shut down
runtime.shutdown_on_idle().wait().unwrap();
详细信息
可以将其视为一个轮询调度器,用于速率限制的工作者,它们始终运行相同的函数。
o- -x
\ /
o--|--Scheduler --|--x
/ \
o- -x
在此图中,我们有 n 个生产者 o
和 m 个工作者 x
。我们想要以公平的方式处理来自 o
的所有传入工作。这样,如果一个生产者有 20 个作业,另一个有 2 个,两者都将以轮询的方式得到同等处理。
每个通道队列可以被清除,这样所有要调度的作业都会被丢弃。
为了允许停止当前正在运行的(昂贵)操作,这些操作可以分为两个部分(函数)。不能取消的 worker_fn
和如果作业被标记为取消则不会调用的 worker_fn_finalize
。
例如,http 请求的结果存储到数据库中。如果我们中断存储操作之前,我们可以防止一个通道的所有未完成的操作以及剩余的作业。我们创建 fetch-http 作为阻塞部分,数据库存储作为可选部分。
在迭代过程中检测到关闭的通道,并将它们从调度器中移除。您可以通过在控制器上调用 gc
来手动触发调度器的 tick。
性能
如果您有空闲工作者,处理一个作业大约需要 1ms 或更少的时间。根据您的工作者/生产者比例,您的性能可能会有所不同。例如,在 Arcane Magic 基准测试中,在 i7-6700HQ 处理 100 万个作业时,结果为 56ms/作业,8 个并行生产通道和 8 个工作者,每个通道绑定 1024 个。请注意,在每个调度间隔内最多进行两次往返(因此每个间隔最多调度 16 个作业),并且我们必须不断地重新发送。这意味着上述数字包括迭代和轮询启动-停止费用。
限制
-
由于缺少其他通道的特质,mpmc-scheduler 只能与其自己的生产者通道一起使用。futures mpsc 也不适用,因为它们不会唤醒调度器。
-
通道的绑定必须是 2 的幂。
-
每个
Scheduler
只能定义一个工作处理函数,并且之后无法更改。您可以通过传递一个包含要调度的动态函数的Box<dyn Fn>
来解决这个问题。请参见动态调度示例。
谢谢
感谢 udoprog 帮助减少泛型数量,并使将 Controller 和 Scheduler 存储为非泛型方式成为可能。
许可证
本项目采用 MIT 许可证。
贡献
除非您明确声明,否则您提交给 mpmc-scheduler 的任何有意贡献都应按 MIT 许可证许可,不附加任何额外条款或条件。
依赖项
~0.6–6MB
~14K SLoC