1个不稳定版本
0.0.4 | 2024年4月27日 |
---|---|
0.0.3 |
|
0.0.2 |
|
0.0.1 |
|
#412 在 并发
每月118次下载
52KB
986 行
类似于Erlang/OTP的自愈、异步进程分层。
在复杂的并发系统中,确保不同进程之间良好同步是很重要的。特别是,进程需要按正确顺序启动,在出现错误时,需要适当地关闭并清理。当构建需要优雅地处理部分故障的鲁棒并发系统时,这个问题变得尤其困难,通常需要通过某种形式的自动化恢复失败组件。
通常,这些需求被忽视或通过一系列的 ad hoc 机制来满足。如果您曾经发现自己启动了许多异步任务,然后不知道如何处理所有这些 JoinHandle
,Spry提供了一个非常好的默认答案。
Spry 是一个库,它支持一组用于有序启动、关闭和重启并发 [子] 进程分层或 [系统] 的常见约定。其设计灵感来自 Erlang 的 OTP supervisor 行为,适用于 Rust。
Spry 还旨在易于采用,因为它对它所管理的进程风格没有固定看法。进程可以选择在需要时采用 Spry 的约定。
Hello Spry
作为一个简单的例子,我们可以定义一个包含两个管理子进程的最高层系统。
use std::time::Duration;
use tokio::sync::mpsc;
use spry::prelude::*;
// Child processes are defined by "startup functions", asynchronous functions that eventually
// spawn the main child process body. They may also optionally return a value.
// When a child's startup function returns, it's assumed that the child is in a *stable* state.
async fn receiver() -> MainLoop<mpsc::UnboundedSender<String>> {
// While creating a channel is synchronous, we could also perform asynchronous setup work.
let (tx, mut rx) = mpsc::unbounded_channel();
// After startup, we return a join handle pointing at the managed process and, optionally,
// any other data that should be available after this process is ready. In this case, we
// return an mpsc sender used to communicate with this process.
MainLoop::new_returning(tx, tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
println!("Received: {}", msg);
}
}))
}
// Startup functions may also be defined by implementing the `Child` trait. This can be useful
// when a child process is parameterized.
struct Sender(mpsc::UnboundedSender<String>);
impl<'a> Child<'a, ()> for Sender {
// Startup functions receive a `Context` object that provides access to different control
// services offered by Spry. In particular here we'll check to see whether this process has
// been asked to "settle", i.e. gracefully terminate.
//
// Processes that are not responsive to being asked to settle will eventually be forcefully
// aborted. In this case, the process will simply fail to return from the next `.await` point.
async fn start(self, cx: Context) -> MainLoop<()> {
let msgs = ["Hello", "from", "Spry"];
MainLoop::new(tokio::spawn(async move {
// We'll see below that we spawn this child as a "permanent" child. This implies that we
// expect it to run forever, thus the `.cycle()`.
for msg in msgs.iter().cycle() {
// These checks are optional, but they give you more control over process lifecycles
if cx.is_settled() { break; }
// if the process panics on this unwrap, Spry will attempt to restart it
self.0.send(msg.to_string()).unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}
}))
}
}
// Finally, we define a "system", built from these two child processes. Systems are also defined
// by their start functions, though they take a different form. Due to present type system
// limitations this must be defined via a trait on a type you define.
struct HelloWorld;
impl System<&'static str> for HelloWorld {
async fn start(&self, scope: &mut Scope<&'static str>) {
// This declares the children involved in this system and also how they connect together.
// By default, children are considered "permanent". This implies that if either child
// terminates, normally or otherwise, this system will gracefully shut down and then start
// each child again using this method.
let send_channel = scope.child("receiver", |_| receiver()).spawn().await;
scope.child("sender", Sender(send_channel)).spawn().await;
// This example doesn't include any, but we could also launch sub-systems here as children.
// Failures and restarts in these subsystems are isolated from their parent until they reach
// a configurable limit after which those failures are escalated.
//
// scope.system("subsystem", Subsystem).spawn().await;
}
}
#[tokio::main]
async fn main() {
// Spry begins by declaring a top-level system.
let toplevel = Toplevel::new("app", HelloWorld);
// We can make use of the top-level shutdown token to hook Spry into external signals.
tokio::spawn({
let token = toplevel.shutdown_token().clone();
async move {
// we'll have both ctrl_c and a timer as system termination signals
// 600ms is about 6 messages from our sender
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
_ = tokio::time::sleep(Duration::from_millis(600)) => {}
}
// This will cause a graceful shutdown to cascade through the entire process hierarchy in
// reverse order of startup.
token.signal_shutdown()
}
});
// When using Spry, it's a good idea to disable the default panic handler.
std::panic::set_hook(Box::new(|_| {}));
// We join the top-level system which will start and operate the entire process hierarchy.
//
// This will return either when:
// - the toplevel has gracefully shut down from receiving a ctrl-c signal, or
// - sufficiently frequent failures have occurred that the system decides it is unrecoverable
match toplevel.start().await {
Ok(()) => println!("normal app termination"),
Err(e) => println!("abnormal app termination: {:?}", e),
}
}
进程生命周期
Spry 中最重要的概念是进程的 生命周期。这是一个建立在 Rust 典型异步启动模式之上的覆盖。
一个活动的进程可能处于三种状态之一
- 启动中,此时进程尚未稳定/就绪,后续启动工作必须等待。在此期间,进程启动函数的异步主体正在运行/轮询。一旦返回,该进程被认为是 运行中 的。
- 运行中,此时进程被认为是稳定且准备好执行工作。如果进程停止,则认为是正常终止,但也可能崩溃,是 异常 终止。最后,在任何时候都可能要求进程 解决 外部问题。进程不必响应,但可以选择通过其 [上下文] 观察解决来加入。
- 终止处理,当进程处于待强制终止状态(即被中断)。进程会被分配一定的时间来自清理并正常退出。如果超出了这个时间限制,它将被终止。一个被终止的进程将不会从其下一个
.await
点返回。
活动进程最终可能像上面描述的那样终止。当这种情况发生时,Spry 将观察终止并按照进程配置的生命周期 [策略] 进行响应。
- 永久子进程应无限期运行。如果它们因任何原因而终止,这将触发父系统的重启,并且它们自己也将被重启。
- 临时子进程应运行然后最终正常终止。如果它们崩溃,则将触发父系统的重启,并且它们自己也将被重启。
- 临时子进程可能因任何原因终止而不会导致重启。它们永远不会被重启,即使它们因兄弟姐妹进程的失败而被终止。
"让它崩溃"
Spry 被设计成鼓励一种编程风格,其中进程被设计成在遇到问题时简单地失败并重启。这在 Erlang/OTP 中是一种常见的模式,有时被称为 "让它崩溃"。
这种风格是由观察到的现象所激发的,即通常意外和错误状态是短暂的。我们不是试图用一系列方法来防御性地处理每个意外结果,而是简单地失败、重启并在不久后再次尝试。希望这次错误会被避免。
这种风格一开始可能让人感到不舒服,因为它倾向于偏向性。鼓励广泛使用 .unwrap()
、.expect()
,甚至 panic!()
。Spry 会捕获这些操作并执行部分重启。
尽管如此,让它崩溃不能替代在代码库中那些不能容忍失败和重启的部分的深思熟虑的防御性编程。在这些地方,Rust 强大的错误处理系统至关重要。
更重要的是,并发性很难,高度并发的系统虽然有价值,但往往成为 "复杂系统"。Spry 不是万能的。监督和重启是隔离部分失败的好默认策略。同时,实现健壮性需要许多技术和社会系统。
与 tracing
的集成
为此,Spry 随带内置了对 tracing
crate 的支持。这使得跟踪系统中的进程和理解它们如何交互变得更容易。事实上,这是在 Spry 系统内报告部分失败的唯一方式。
依赖
~4.5–6.5MB
~107K SLoC