2个版本

0.1.1 2020年4月16日
0.1.0 2020年4月16日

#42 in #task-queue


用于 kayrx-timer

MIT 协议

93KB
1.5K SLoC

kayrx-karx


lib.rs:

Karx 异步执行引擎

启动

要将future启动到执行器中,我们首先需要在堆上为其分配空间,并与其一起保留一些状态。这种状态表示future是否已准备好轮询、等待被唤醒或已完成。这样的future被称为任务

所有执行器都有某种类型的队列,用于保存可运行的任务

let (sender, receiver) = crossbeam::channel::unbounded();
#
#
#

任务可以通过spawnspawn_local来构建

#
// A future that will be spawned.
let future = async { 1 + 2 };

// A function that schedules the task when it gets woken up.
let schedule = move |task| sender.send(task).unwrap();

// Construct a task.
let (task, handle) = kayrx_karx::spawn(future, schedule, ());

// Push the task into the queue by invoking its schedule function.
task.schedule();

spawn函数的最后一个参数是标签,与任务相关联的任意数据。在大多数执行器中,这通常是任务标识符或任务局部存储。

该函数返回一个可运行的Task和一个可以等待结果的JoinHandle

执行

任务执行器有一些主循环,它推动任务完成。这意味着从队列中取出可运行的任务,并按顺序运行每个任务

#
#
#
#
#
for task in receiver {
    task.run();
}

当运行任务时,它的future将被轮询。如果轮询未完成任务,这意味着它正在等待另一个future,需要进入睡眠状态。当唤醒时,它的调度函数将被调用,将其推回队列以便再次运行。

取消

TaskJoinHandle都有取消任务的方法。当取消时,任务的future将不再被轮询,而是被丢弃。

如果被Task实例取消,任务将立即被销毁。如果被JoinHandle实例取消,它将再次被调度,下一次尝试运行它将简单地销毁它。

然后JoinHandle异步操作将评估为None,但仅在任务的异步操作被释放之后。

性能

任务构造会引发一个分配,用于存储其状态、调度函数以及未来的结果(如果已完成)。

任务布局相当于4个usize,然后是调度函数,然后是未来及其输出的联合。

唤醒

方便的waker_fn构造函数可以将任何函数转换为Waker。每次唤醒时,该函数都会被调用

let waker = kayrx_karx::waker_fn(|| println!("Wake!"));

// Prints "Wake!" twice.
waker.wake_by_ref();
waker.wake_by_ref();

这对于实现像block_on这样的单未来执行器非常有用。

动态任务内部。受golang运行时启发。

在任务内部进行阻塞是可行的,内部将检测到这一点,并扩展线程池。

use std::thread;
use std::time::Duration;

use futures_timer::Delay;

fn main() {
    kayrx_karx::exec(async {
        for _ in 0..10 {
            Delay::new(Duration::from_secs(1)).await;
            println!("Non-blocking Hello World");
        }
    });

    kayrx_karx::exec(async {
        for _ in 0..10 {
            thread::sleep(Duration::from_secs(1));
            println!("Blocking Hello World");
        }
    });

    thread::sleep(Duration::from_secs(11));
}

依赖关系

~2MB
~31K SLoC