#优雅关闭 #关闭信号 #关闭 #优雅 #异步

async-shutdown

异步优雅关闭的一站式解决方案

8个版本

0.2.2 2024年3月22日
0.2.1 2023年10月8日
0.2.0 2023年9月26日
0.1.4 2024年3月22日
0.1.0-alpha42021年9月29日

#53 in 异步

Download history 7092/week @ 2024-04-07 7585/week @ 2024-04-14 12000/week @ 2024-04-21 15444/week @ 2024-04-28 11994/week @ 2024-05-05 15723/week @ 2024-05-12 21066/week @ 2024-05-19 18915/week @ 2024-05-26 19081/week @ 2024-06-02 11942/week @ 2024-06-09 14798/week @ 2024-06-16 17837/week @ 2024-06-23 13444/week @ 2024-06-30 8601/week @ 2024-07-07 8356/week @ 2024-07-14 9040/week @ 2024-07-21

39,872 每月下载量
9 个crates中使用 (6 直接)

BSD-2-Clause OR Apache-2.0

49KB
602

async-shutdown

异步代码中优雅关闭的无感知一站式解决方案。

该crate解决了两个相互关联但不同的问题,涉及优雅关闭。

  • 当接收到关闭信号时,您必须能够停止正在运行的未来。
  • 您必须能够等待未来完成潜在的清理。
  • 您想知道关闭的原因(例如,设置进程退出代码)。

所有这些问题都由ShutdownManager结构体处理。

停止正在运行的未来

您可以使用ShutdownManager::wait_shutdown_triggered()获取一个未来,使其等待关闭信号。在这种情况下,您必须编写异步代码以适当响应关闭信号。

或者,您可以使用ShutdownManager::wrap_cancel()将未来包装在取消(通过丢弃)中,当关闭被触发时。这不需要包装的未来知道任何关于关闭信号的信息,但它也不允许未来运行自定义关闭代码。

要触发关闭信号,只需调用 ShutdownManager::trigger_shutdown(reason)。关闭原因可以是任何类型,只要它实现了 Clone。如果您想传递一个非 Clone 对象或复制成本高昂的对象,可以将它包装在一个 Arc 中。

等待未来完成。

在实际关闭之前,您可能还需要等待某些未来完成,而不是简单地丢弃它们。这可能对于干净地关闭和防止数据丢失很重要。您可以使用 ShutdownManager::wait_shutdown_complete() 来做到这一点。该函数返回一个只有在关闭“完成”时才会完成的未来。

您还必须通过调用 ShutdownManager::delay_shutdown_token()ShutdownManager::wrap_delay_shutdown() 来防止关闭过早完成。该 ShutdownManager::delay_shutdown_token() 函数为您提供 DelayShutdownToken,它阻止关闭完成。要允许关闭完成,只需丢弃令牌。或者,ShutdownManager::wrap_delay_shutdown() 包装现有的未来,并将防止关闭完成,直到未来完成或被丢弃。

请注意,只有当关闭尚未完成时,您才能延迟关闭完成。如果关闭已经完成,这些函数将返回一个错误。

您还可以使用令牌来包装一个未来,以 DelayShutdownToken::wrap_future()。如果您已经有了令牌,这允许您在不担心关闭可能已经完成的情况下包装未来。

自动触发关闭

您还可以使用 TriggerShutdownToken 自动触发关闭。通过调用 ShutdownManager::trigger_shutdown_token() 来获取令牌。当令牌被丢弃时,将触发关闭。

您可以使用 ShutdownManager::wrap_trigger_shutdown()TriggerShutdownToken::wrap_future() 来包装一个future。当包装的future完成(或被丢弃)时,将触发关闭。这可以在关键任务停止时作为一个方便的关闭触发方式。

Future与Task的区别

在使用 JoinHandles 时要小心,不要将其视为常规future。根据您的异步运行时,当您丢弃一个 JoinHandle 时,这通常不会导致任务停止。它可能只是将连接句柄从任务中分离出来,这意味着您的任务仍在运行。如果不小心,这仍可能导致关闭时的数据丢失。一般来说,您通常应该在将future在新的任务上启动之前先进行包装。

示例

此示例是一个基于tokio的TCP回声服务器。它简单地将其从对等方接收到的所有内容回显给同一对等方,并且它使用此crate进行优雅关闭。

如果您想本地运行它,此示例也存储在存储库中,名称为 tcp-echo-server

use async_shutdown::ShutdownManager;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() {
    // Create a new shutdown object.
    // We will clone it into all tasks that need it.
    let shutdown = ShutdownManager::new();

    // Spawn a task to wait for CTRL+C and trigger a shutdown.
    tokio::spawn({
        let shutdown = shutdown.clone();
        async move {
            if let Err(e) = tokio::signal::ctrl_c().await {
                eprintln!("Failed to wait for CTRL+C: {}", e);
                std::process::exit(1);
            } else {
                eprintln!("\nReceived interrupt signal. Shutting down server...");
                shutdown.trigger_shutdown(0).ok();
            }
        }
    });

    // Run the server and set a non-zero exit code if we had an error.
    let exit_code = match run_server(shutdown.clone(), "[::]:9372").await {
        Ok(()) => {
            shutdown.trigger_shutdown(0).ok();
        },
        Err(e) => {
            eprintln!("Server task finished with an error: {}", e);
            shutdown.trigger_shutdown(1).ok();
        },
    };

    // Wait for clients to run their cleanup code, then exit.
    // Without this, background tasks could be killed before they can run their cleanup code.
    let exit_code = shutdown.wait_shutdown_complete().await;

    std::process::exit(exit_code);
}

async fn run_server(shutdown: ShutdownManager<i32>, bind_address: &str) -> std::io::Result<()> {
    let server = TcpListener::bind(&bind_address).await?;
    eprintln!("Server listening on {}", bind_address);

    // Simply use `wrap_cancel` for everything, since we do not need clean-up for the listening socket.
    // See `handle_client` for a case where a future is given the time to perform logging after the shutdown was triggered.
    while let Ok(connection) = shutdown.wrap_cancel(server.accept()).await {
        let (stream, address) = connection?;
        tokio::spawn(handle_client(shutdown.clone(), stream, address));
    }

    Ok(())
}

async fn handle_client(shutdown: ShutdownManager<i32>, mut stream: TcpStream, address: SocketAddr) {
    eprintln!("Accepted new connection from {}", address);

    // Make sure the shutdown doesn't complete until the delay token is dropped.
    //
    // Getting the token will fail if the shutdown has already started,
    // in which case we just log a message and return.
    //
    // If you already have a future that should be allowed to complete,
    // you can also use `shutdown.wrap_delay_shutdown(...)`.
    // Here it is easier to use a token though.
    let _delay_token = match shutdown.delay_shutdown_token() {
        Ok(token) => token,
        Err(_) => {
            eprintln!("Shutdown already started, closing connection with {}", address);
            return;
        }
    };

    // Now run the echo loop, but cancel it when the shutdown is triggered.
    match shutdown.wrap_cancel(echo_loop(&mut stream)).await {
        Ok(Err(e)) => eprintln!("Error in connection {}: {}", address, e),
        Ok(Ok(())) => eprintln!("Connection closed by {}", address),
        Err(_exit_code) => eprintln!("Shutdown triggered, closing connection with {}", address),
    }

    // The delay token will be dropped here, allowing the shutdown to complete.
}

async fn echo_loop(stream: &mut TcpStream) -> std::io::Result<()> {
    // Echo everything we receive back to the peer in a loop.
    let mut buffer = vec![0; 512];
    loop {
        let read = stream.read(&mut buffer).await?;
        if read == 0 {
            break;
        }
        stream.write(&buffer[..read]).await?;
    }

    Ok(())
}

无运行时依赖