8个版本
0.2.2 | 2024年3月22日 |
---|---|
0.2.1 |
|
0.2.0 |
|
0.1.4 | 2024年3月22日 |
0.1.0-alpha4 | 2021年9月29日 |
#53 in 异步
39,872 每月下载量
在 9 个crates中使用 (6 直接)
49KB
602 行
async-shutdown
异步代码中优雅关闭的无感知一站式解决方案。
该crate解决了两个相互关联但不同的问题,涉及优雅关闭。
- 当接收到关闭信号时,您必须能够停止正在运行的未来。
- 您必须能够等待未来完成潜在的清理。
- 您想知道关闭的原因(例如,设置进程退出代码)。
所有这些问题都由ShutdownManager
结构体处理。
停止正在运行的未来
您可以使用ShutdownManager::wait_shutdown_triggered()
获取一个未来,使其等待关闭信号。在这种情况下,您必须编写异步代码以适当响应关闭信号。
或者,您可以使用ShutdownManager::wrap_cancel()
将未来包装在取消(通过丢弃)中,当关闭被触发时。这不需要包装的未来知道任何关于关闭信号的信息,但它也不允许未来运行自定义关闭代码。
要触发关闭信号,只需调用 ShutdownManager::trigger_shutdown(reason)
。关闭原因可以是任何类型,只要它实现了 Clone
。如果您想传递一个非 Clone
对象或复制成本高昂的对象,可以将它包装在一个 Arc
中。
等待未来完成。
在实际关闭之前,您可能还需要等待某些未来完成,而不是简单地丢弃它们。这可能对于干净地关闭和防止数据丢失很重要。您可以使用 ShutdownManager::wait_shutdown_complete()
来做到这一点。该函数返回一个只有在关闭“完成”时才会完成的未来。
您还必须通过调用 ShutdownManager::delay_shutdown_token()
或 ShutdownManager::wrap_delay_shutdown()
来防止关闭过早完成。该 ShutdownManager::delay_shutdown_token()
函数为您提供 DelayShutdownToken
,它阻止关闭完成。要允许关闭完成,只需丢弃令牌。或者,ShutdownManager::wrap_delay_shutdown()
包装现有的未来,并将防止关闭完成,直到未来完成或被丢弃。
请注意,只有当关闭尚未完成时,您才能延迟关闭完成。如果关闭已经完成,这些函数将返回一个错误。
您还可以使用令牌来包装一个未来,以 DelayShutdownToken::wrap_future()
。如果您已经有了令牌,这允许您在不担心关闭可能已经完成的情况下包装未来。
自动触发关闭
您还可以使用 TriggerShutdownToken
自动触发关闭。通过调用 ShutdownManager::trigger_shutdown_token()
来获取令牌。当令牌被丢弃时,将触发关闭。
您可以使用 ShutdownManager::wrap_trigger_shutdown()
或 TriggerShutdownToken::wrap_future()
来包装一个future。当包装的future完成(或被丢弃)时,将触发关闭。这可以在关键任务停止时作为一个方便的关闭触发方式。
Future与Task的区别
在使用 JoinHandles
时要小心,不要将其视为常规future。根据您的异步运行时,当您丢弃一个 JoinHandle
时,这通常不会导致任务停止。它可能只是将连接句柄从任务中分离出来,这意味着您的任务仍在运行。如果不小心,这仍可能导致关闭时的数据丢失。一般来说,您通常应该在将future在新的任务上启动之前先进行包装。
示例
此示例是一个基于tokio的TCP回声服务器。它简单地将其从对等方接收到的所有内容回显给同一对等方,并且它使用此crate进行优雅关闭。
如果您想本地运行它,此示例也存储在存储库中,名称为 tcp-echo-server
。
use async_shutdown::ShutdownManager;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() {
// Create a new shutdown object.
// We will clone it into all tasks that need it.
let shutdown = ShutdownManager::new();
// Spawn a task to wait for CTRL+C and trigger a shutdown.
tokio::spawn({
let shutdown = shutdown.clone();
async move {
if let Err(e) = tokio::signal::ctrl_c().await {
eprintln!("Failed to wait for CTRL+C: {}", e);
std::process::exit(1);
} else {
eprintln!("\nReceived interrupt signal. Shutting down server...");
shutdown.trigger_shutdown(0).ok();
}
}
});
// Run the server and set a non-zero exit code if we had an error.
let exit_code = match run_server(shutdown.clone(), "[::]:9372").await {
Ok(()) => {
shutdown.trigger_shutdown(0).ok();
},
Err(e) => {
eprintln!("Server task finished with an error: {}", e);
shutdown.trigger_shutdown(1).ok();
},
};
// Wait for clients to run their cleanup code, then exit.
// Without this, background tasks could be killed before they can run their cleanup code.
let exit_code = shutdown.wait_shutdown_complete().await;
std::process::exit(exit_code);
}
async fn run_server(shutdown: ShutdownManager<i32>, bind_address: &str) -> std::io::Result<()> {
let server = TcpListener::bind(&bind_address).await?;
eprintln!("Server listening on {}", bind_address);
// Simply use `wrap_cancel` for everything, since we do not need clean-up for the listening socket.
// See `handle_client` for a case where a future is given the time to perform logging after the shutdown was triggered.
while let Ok(connection) = shutdown.wrap_cancel(server.accept()).await {
let (stream, address) = connection?;
tokio::spawn(handle_client(shutdown.clone(), stream, address));
}
Ok(())
}
async fn handle_client(shutdown: ShutdownManager<i32>, mut stream: TcpStream, address: SocketAddr) {
eprintln!("Accepted new connection from {}", address);
// Make sure the shutdown doesn't complete until the delay token is dropped.
//
// Getting the token will fail if the shutdown has already started,
// in which case we just log a message and return.
//
// If you already have a future that should be allowed to complete,
// you can also use `shutdown.wrap_delay_shutdown(...)`.
// Here it is easier to use a token though.
let _delay_token = match shutdown.delay_shutdown_token() {
Ok(token) => token,
Err(_) => {
eprintln!("Shutdown already started, closing connection with {}", address);
return;
}
};
// Now run the echo loop, but cancel it when the shutdown is triggered.
match shutdown.wrap_cancel(echo_loop(&mut stream)).await {
Ok(Err(e)) => eprintln!("Error in connection {}: {}", address, e),
Ok(Ok(())) => eprintln!("Connection closed by {}", address),
Err(_exit_code) => eprintln!("Shutdown triggered, closing connection with {}", address),
}
// The delay token will be dropped here, allowing the shutdown to complete.
}
async fn echo_loop(stream: &mut TcpStream) -> std::io::Result<()> {
// Echo everything we receive back to the peer in a loop.
let mut buffer = vec![0; 512];
loop {
let read = stream.read(&mut buffer).await?;
if read == 0 {
break;
}
stream.write(&buffer[..read]).await?;
}
Ok(())
}