#async-task #task #async-executor #future #executor #async #no-std

no-std smartpoll

简化轮询未来的'Task'抽象

3个版本 (1个稳定版)

2.0.0 2023年10月7日
1.0.0 2023年10月5日
0.1.1 2023年6月18日
0.1.0 2023年6月18日

#1504 in 异步

MIT 许可证

67KB
732

Smartpoll

Smartpoll通过提供一个简化轮询Rust的Task抽象,使得构建自己的多线程异步Rust执行器变得简单。

有关更多信息,请参阅文档

许可证

本项目采用MIT许可证


lib.rs:

Smartpoll提供了一个Task类型,使得编写自己的多线程异步Rust执行器变得容易。

可以从任何没有输出且实现了SendFuture创建一个Task。要轮询任务,只需提供一个闭包来安排任务再次被轮询。如果任务未完成,则将调用此闭包,但只有在任务准备好重新安排时才会调用。

任务还可以存储任何实现了Send的类型的数据。

轮询任务比直接轮询它们的future简单得多,因为您无需处理同步、固定或提供Waker。以下是一个使用Smartpoll和标准库的基本多线程执行器示例

use smartpoll::Task;
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        mpsc, Arc,
    },
    thread,
    time::Duration,
};

// the executor has a work queue,
let (queue_tx, queue_rx) = mpsc::channel::<Task<()>>();
// a counter that tracks the number of unfinished tasks (which is shared with each worker),
let num_unfinished_tasks = Arc::new(AtomicUsize::new(0));
// and a local counter that tracks which worker to send the next task to
let mut next_worker = 0;

// to spawn a new task:
let spawn_task = {
    let queue_tx = queue_tx.clone();
    let num_unfinished_tasks = num_unfinished_tasks.clone();
    move |task| {
        // increment the 'unfinished tasks' counter
        num_unfinished_tasks.fetch_add(1, Ordering::SeqCst);
        // and add the task onto the work queue
        queue_tx.send(task).unwrap();
    }
};

// to reschedule a task, add it back onto the work queue
let reschedule_task = move |task| queue_tx.send(task).unwrap();

// for each worker:
let num_workers = thread::available_parallelism().unwrap().into();
let workers = (0..num_workers)
    .map(|_| {
        let num_unfinished_tasks = num_unfinished_tasks.clone();
        let reschedule_task = reschedule_task.clone();

        // create a channel for sending tasks to the worker
        let (work_tx, work_rx) = mpsc::sync_channel::<Task<()>>(1);

        // on a new thread:
        let join_handle = thread::spawn(move || {
            // for each task that is sent to this worker, until the channel closes:
            while let Ok(task) = work_rx.recv() {
                // poll the task
                if task.poll(reschedule_task.clone()).is_ready() {
                    // and if it has completed then decrement the 'unfinished tasks' counter
                    num_unfinished_tasks.fetch_sub(1, Ordering::SeqCst);
                }
            }
        });
        (work_tx, join_handle)
    })
    .collect::<Vec<_>>();

// spawn some tasks
spawn_task(Task::new((), async {
    // async code...
}));
spawn_task(Task::new((), async {
    // async code...
}));

// while there are unfinished tasks:
while num_unfinished_tasks.load(Ordering::SeqCst) > 0 {
    // wait until a task is available from the queue
    if let Ok(task) = queue_rx.recv_timeout(Duration::from_millis(100)) {
        // send the task to the next available worker
        let mut task = Some(task);
        while let Err(mpsc::TrySendError::Full(returned_task)) =
            workers[next_worker].0.try_send(task.take().unwrap())
        {
            // whenever a worker's channel is full, try the next worker
            task = Some(returned_task);
            next_worker += 1;
            if next_worker == workers.len() {
                next_worker = 0;
            }
        }
    }
}

// once all of the tasks have completed
for (work_tx, join_handle) in workers.into_iter() {
    // close each worker's channel
    drop(work_tx);
    // and wait for each worker's thread to finish
    join_handle.join().unwrap();
}

有关库的工作原理及其正确性的说明,请参阅源代码

请确保传递给重新调度回调的任务元数据与传递给原始任务的元数据相同。

无运行时依赖