#tokio #tokio-task #async-io #async #io #non-blocking #future

tokio-task-manager

Allows an async Tokio application to shut down gracefully, waiting for all tasks to finish

2 unstable releases

0.2.0 May 28, 2022
0.1.0 May 27, 2022

#634 in Asynchronous


393 downloads per month
Used in 3 crates

MIT license

22KB
171

tokio-task-manager

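A crate that provides synchronization primitives whose main goal is to let an async application running on the Tokio runtime shut down gracefully. Ideally, the server process exits only once all active tasks have finished the work they had in progress.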
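The idea of "notify every task, then wait until each one has let go of its handle, but no longer than a timeout" can be sketched with the standard library alone. The snippet below is only an analogy under stated assumptions: `Manager`, `Guard`, and `run_demo` are hypothetical names, it uses OS threads rather than Tokio tasks, and it is not how tokio-task-manager is implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical per-worker handle; dropping it means "this worker is done".
struct Guard {
    shutting_down: Arc<AtomicBool>,
    _done: Sender<()>,
}

impl Guard {
    fn is_shutting_down(&self) -> bool {
        self.shutting_down.load(Ordering::SeqCst)
    }
}

/// Hypothetical manager: hands out guards, then waits until all are dropped.
struct Manager {
    shutting_down: Arc<AtomicBool>,
    done_tx: Sender<()>,
    done_rx: Receiver<()>,
    timeout: Duration,
}

impl Manager {
    fn new(timeout: Duration) -> Self {
        let (done_tx, done_rx) = mpsc::channel();
        Manager {
            shutting_down: Arc::new(AtomicBool::new(false)),
            done_tx,
            done_rx,
            timeout,
        }
    }

    fn guard(&self) -> Guard {
        Guard {
            shutting_down: Arc::clone(&self.shutting_down),
            _done: self.done_tx.clone(),
        }
    }

    /// Signals shutdown and returns true if every guard was dropped in time.
    fn wait(self) -> bool {
        self.shutting_down.store(true, Ordering::SeqCst);
        let Manager { done_tx, done_rx, timeout, .. } = self;
        // Drop the manager's own sender so only the guards keep the channel open.
        drop(done_tx);
        // Nothing is ever sent, so `Disconnected` means all guards are gone,
        // while `Timeout` means at least one worker is still holding its guard.
        matches!(
            done_rx.recv_timeout(timeout),
            Err(RecvTimeoutError::Disconnected)
        )
    }
}

/// Spawns `workers` threads that idle until shutdown, then finish a bit of work.
fn run_demo(workers: u64) -> bool {
    let manager = Manager::new(Duration::from_secs(2));
    for i in 0..workers {
        let guard = manager.guard();
        thread::spawn(move || {
            // Idle state: poll the shutdown flag.
            while !guard.is_shutting_down() {
                thread::sleep(Duration::from_millis(5));
            }
            // Finish the in-flight work before letting go of the guard.
            thread::sleep(Duration::from_millis(i * 5));
            // `guard` is dropped here, which tells the manager we are done.
        });
    }
    manager.wait()
}

fn main() {
    println!("graceful shutdown: {}", run_demo(4));
}
```

In this sketch the timeout plays the same role as the `Duration` passed to `TaskManager::new` in the example further down: it bounds how long the process waits for stragglers before giving up on a clean exit.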


Supported Rust versions

v1.61.0 and above, language edition 2021

Example

use std::time::Duration;
use tokio_task_manager::TaskManager;

#[tokio::main]
async fn main() {
    // An application requires only a single TaskManager,
    // created usually where you also build and start the Tokio runtime.
    let tm = TaskManager::new(Duration::from_millis(200));

    // In this example we spawn 10 tasks,
    // where each of them also spawns a task themselves,
    // resulting in a total of 20 tasks which we'll want to wait for.
    let (tx, mut rx) = tokio::sync::mpsc::channel(20);
    for i in 0..10 {
        let tx = tx.clone();
        let n = i;

        // create a Task handle for each task that we spawn, such that:
        // - the application can wait until the handle is dropped,
        //   which signals that the spawned task has finished;
        // - the spawned task can learn that the application is gracefully shutting down (.wait);
        let mut task = tm.task();
        tokio::spawn(async move {
            // spawn also child task to test task cloning,
            // a task is typically cloned for tasks within tasks,
            // each cloned task also needs to be dropped prior to
            // the application being able to gracefully shut down.
            let mut child_task = task.clone();
            let child_tx = tx.clone();
            let m = n;
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(m * 10)).await;
                // Using the tokio::select! macro you can allow a task
                // to either get to its desired work, or quit already
                // in case the application is planning to shut down.
                //
                // A typical use case of this is a server which is waiting
                // for an incoming request, which is a text-book example
                // of a task in idle state.
                tokio::select! {
                    result = child_tx.send((m+1)*10) => assert!(result.is_ok()),
                    _ = child_task.wait() => (),
                }
            });
            // Do the actual work.
            tokio::time::sleep(Duration::from_millis(n * 10)).await;
            tokio::select! {
                result = tx.send(n) => assert!(result.is_ok()),
                _ = task.wait() => (),
            }
        });
    }

    // we also create a task for something that will never finish,
    // just to show that the tokio::select! approach does work...
    let mut task = tm.task();
    tokio::spawn(async move {
        // spawn also child task to test task cloning
        let mut child_task = task.clone();
        tokio::spawn(async move {
            // should shut down rather than block for too long
            tokio::select! {
                _ = child_task.wait() => (),
                _ = tokio::time::sleep(Duration::from_secs(60)) => (),
            }
        });
        // should shut down rather than block for too long
        tokio::select! {
            _ = task.wait() => (),
            _ = tokio::time::sleep(Duration::from_secs(60)) => (),
        }
    });

    // sleep for 100ms, just to ensure that all child tasks have been spawned as well
    tokio::time::sleep(Duration::from_millis(100)).await;

    // drop our sender such that rx.recv().await will return None,
    // once our other senders have been dropped and the channel's buffer is empty
    drop(tx);

    // notify all spawned tasks that we wish to gracefully shut down
    // and wait until they do. The resulting boolean is true if the
    // waiting terminated gracefully (meaning all tasks quit on their own while they were idle).
    assert!(tm.wait().await);

    // collect all our results,
    // which we can do all at once given the channel
    // was created with a sufficient buffer size.
    let mut results = Vec::with_capacity(20);
    while let Some(n) = rx.recv().await {
        results.push(n);
    }

    // verify that we received all expected results
    results.sort_unstable();
    assert_eq!(
        &results,
        &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
    );
}

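If your application's root task is an infinite loop, you may want to shut down gracefully only once a SIGINT (CTRL+C) signal has been received. In that case, call tm.shutdown_gracefully_on_ctrl_c().await instead of tm.wait().await.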

You can see this concept in action in the TCP Echo Server Example that ships with this crate. Run it with the following command:

cargo run --example tcp-echo-server

In another terminal, you can connect to it with a tool such as telnet and see your messages echoed back:

$ telnet 127.0.0.1 4000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
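If telnet is not at hand, the same round trip can be checked with a few lines of Rust using only the standard library. This is a rough sketch: `echo_roundtrip` is a hypothetical helper, and it assumes the example server listens on 127.0.0.1:4000 (as in the telnet session above) and echoes each line back unchanged.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::net::TcpStream;
use std::time::Duration;

/// Hypothetical helper: sends one line to an echo server and returns the reply.
fn echo_roundtrip(addr: &str, msg: &str) -> io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    // Avoid hanging forever if the server never answers.
    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
    stream.write_all(msg.as_bytes())?;
    stream.write_all(b"\n")?;

    // Read back a single line and strip the trailing newline.
    let mut reply = String::new();
    BufReader::new(&stream).read_line(&mut reply)?;
    Ok(reply.trim_end().to_string())
}

fn main() {
    // Assumes the tcp-echo-server example is already running in another terminal.
    match echo_roundtrip("127.0.0.1:4000", "hello") {
        Ok(reply) => println!("server replied: {reply}"),
        Err(err) => eprintln!("could not reach the echo server: {err}"),
    }
}
```

If the server is not running, the program reports the connection error instead of panicking.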

Dependencies

~3–14MB
~132K SLoC