#tasks #tokio #async-task #async #task

tokio-tasker

允许您停止和连接Tokio任务组

3个稳定版本

1.2.0 2022年3月19日
1.1.0 2022年3月3日
1.0.1 2021年11月12日

#1818异步

Download history 697/week @ 2024-03-13 605/week @ 2024-03-20 565/week @ 2024-03-27 345/week @ 2024-04-03 596/week @ 2024-04-10 317/week @ 2024-04-17 651/week @ 2024-04-24 1076/week @ 2024-05-01 957/week @ 2024-05-08 353/week @ 2024-05-15 713/week @ 2024-05-22 677/week @ 2024-05-29 398/week @ 2024-06-05 505/week @ 2024-06-12 408/week @ 2024-06-19 174/week @ 2024-06-26

1,626 每月下载次数

自定义许可

28KB
434

tokio-tasker

Build Status
crates.io
docs.rs

允许您停止和连接Tokio任务组。

请参阅文档示例


lib.rs:

允许您停止和连接Tokio任务组。

这是一个小型库,旨在帮助程序或服务中的许多独立任务以及需要关闭步骤的任务优雅地关闭。

用法

库的入口点是代表任务组的Tasker类型。

添加任务

任务可以通过 [spawn][Tasker::spawn()], [spawn_local][Tasker::spawn_local()], 或 [add_handle][Tasker::add_handle()] 添加到组中。

Tasker可以被自由克隆,克隆可以发送到其他任务/线程,也可以在克隆上启动任务。然而,除了 'main' 之外的所有克隆都需要被丢弃,以便 'main' 可以连接到所有任务。可以使用 [finish][Tasker::finish()] 来丢弃克隆,这建议用于明确性。这是为了避免在 Tasker 尝试连接时任务被启动的竞态条件。

警告:如果您持续向 Tasker 添加任务(如网络连接处理程序等),随着时间的推移,其内部存储可能会不合理地增长,因为它需要为每个任务保留一个句柄。为了解决这个问题,请定期使用 [poll_join][Tasker::poll_join()] 或 [try_poll_join][Tasker::try_poll_join()](例如,每几百个连接左右),这将定期清理 Tasker 存储中的完成任务。

停止任务

Tasker 提供了 Stopper,这是在任务组停止时解决的小型 future。这些可以用来包装单个 future、流或用于 select!() 等,详见 [stopper()][Tasker::stopper()]。

要向组发出停止所有任务的信号,可以在任何 Tasker 克隆上调用 [stop()][Tasker::stop()]。这将解决所有 Stopper 实例。注意,这不是竞态条件,你仍然可以获取额外的 Stopper(例如在其他线程等)并且可以设置新的任务,它们将被立即停止。

或者,你可以使用 Signaller 通过 signaller() 获取一个特殊的 Tasker 克隆,该克隆提供了 .stop() 方法,但在任务连接之前不需要完成/丢弃。

加入任务组

通常会有一个 'main' 实例的 Tasker,它将用于连接任务。(这不必是原始克隆,任何克隆都可以使用。)

在你想收集任务的地方调用 [join().await][Tasker::join()],例如在 main() 的末尾或类似位置。这将首先等待所有其他 Tasker 克隆完成/丢弃,然后逐个等待所有任务的连接句柄。

如果任何任务崩溃,join() 将传播崩溃。使用 [try_join()][Tasker::try_join()] 来自行处理连接结果。

还有 [poll_join()][Tasker::poll_join()] 和 [try_poll_join()][Tasker::try_poll_join()] 非异步变体,它们在不等待的情况下连接已完成的任务,并释放其句柄使用的内存。

最后,还有 [join_stream()][Tasker::join_stream()],它允许你异步接收任务结果,即任务终止时。

示例

main() 中使用 Tasker 的简单示例

#
#
#[tokio::main]
async fn main() -> Result<()> {
    let tasker = Tasker::new();

    let tasker2 = tasker.clone();
    // Spawn a task that will spawn some subtasks.
    // It uses the tasker2 clone to do this.
    tasker.spawn(async move {
        let pending = future::pending::<()>().unless(tasker2.stopper());
        tasker2.spawn(pending);

        let interval = time::interval(Duration::from_millis(10_000));
        let mut interval = IntervalStream::new(interval).take_until(tasker2.stopper());
        tasker2.spawn(async move { while let Some(_) = interval.next().await {} });

        // We're done spawning tasks on this clone.
        tasker2.finish();
    });

    // Get a Signaller clone for stopping the group in another task
    let signaller = tasker.signaller();
    tokio::spawn(async move {
        // .stop() the task group after 1s.
        time::sleep(Duration::from_millis(1_000)).await;
        signaller.stop();
    });

    // Join all the tasks.
    tasker.join().await;

    Ok(())
}

examples/echo.rs 中还有一个示例回声服务器。

依赖项

~2.9–9.5MB
~70K SLoC