6个版本
0.2.1 | 2022年9月8日 |
---|---|
0.2.0 | 2022年9月4日 |
0.2.0-beta.1 | 2022年8月7日 |
0.1.0 | 2022年8月1日 |
0.0.2 | 2022年7月10日 |
#1148 in 并发
用于 2 个crates(通过 zestors-core)
105KB
2.5K SLoC
tiny-actor
tiny-actor是一个针对Rust的最小且无偏见的actor库。
该库将Inboxes
和tasks
的概念合并,从而得到actor:这个基本构建块允许我们构建具有可靠关闭行为的简单池和监督树。
tiny-actor不会追求更高级的API,而是作为编写良好行为的tokio-actors的简单方式。(如这里所述)
如果您正在寻找一个完整的actor框架,请查看Zestors。它基于tiny-actor的构建块。
概念
以下简要概述了tiny-actor的所有概念。有关使用方法的更多详细信息,请参阅crate文档。
通道
通道是连接Inboxes
、Addresses
和Children
的耦合体。每个通道都包含以下rust结构体:
- 单个
Child(Pool)
。 - 一个或多个
Addresses
。 - 一个或多个
Inboxes
。
以下图表展示了所使用的命名视觉表示
|¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|
| Channel |
| |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯| |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯| |
| | actor | | Child(Pool) | |
| | |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯| | |________________| |
| | | process(es) | | |
| | | |¯¯¯¯¯¯¯¯¯¯| |¯¯¯¯¯¯¯¯¯| | | |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯| |
| | | | task | | Inbox | | | | Address(es) | |
| | | |__________| |_________| | | |________________| |
| | |_____________________________| | |
| |___________________________________| |
|_____________________________________________________________|
actor
actor
一词用于描述属于单个Channel
的process
组。
process
process
一词用于描述与Inbox
配对的task
。
Inbox
Inbox
是通道的接收器,主要用于从通道中取出消息。Inboxes
可以通过启动新的process
来创建,并且应与其启动的task
保持耦合:当task
退出时,Inbox
才应该被丢弃。
Address
Address
是 Channel
的可克隆发送者,主要用于向 actor
发送消息。当 actor
退出时,可以等待 Addresses
,它会返回。
子(池)
Child
是一个由一个 process
组成的 actor
的句柄。它可以被等待以返回派生 task
的退出值。由于 Child
不可克隆,因此它是唯一的 Channel
。当它被丢弃时,actor
将会被 halted
并随后 aborted
,可以通过断开 Child
来改变这种行为。
ChildPool
与 Child
类似,但 actor
由多个 processes
组成。可以通过流式传输 ChildPool
来获取所有派生 tasks
的退出值。在 actor
被派生后,可以派生更多 processes
,也可以停止 actor
的部分 processes
。
关闭
一旦 Channel
被关闭,就不再可能向其中发送新消息,但仍可以取出任何留下的消息。关闭的 Channel
的进程不一定必须退出,但可以继续运行。任何发送者都会收到一个 SendError::Closed
通知,而接收者在 Channel
被清空后,将收到 RecvError::ClosedAndEmpty
。
停止
一个 process
可以被精确地停止一次,通过接收一个 RecvError::Halted
,之后它应该退出。一个 actor
可以部分停止,这意味着只有一些它的 processeses
被停止。
终止
可以通过 tokio 的 abort 方法来终止一个 actor
。这将导致 tasks
突然退出,并可能留下不良状态,尽可能使用 halt
而不是 abort
。
退出
退出可以指两个独立的事件,在良好的实践中,这两个事件总是同时发生的。
process
可以通过丢弃它的Inbox
来退出,一旦一个Channel
中所有Inboxes
都被丢弃,actor
就已经exited
。这种退出类型可以使用has_exited
在任何时间从Channel
中检索。task
可以退出,这意味着task
已经不再活跃。这只能通过等待Child(Pool)
或通过调用is_finished
来从Child
查询。
链接
一个 actor
可以是 attached
或 detached
,这表示当 Child(Pool)
被丢弃时应该发生什么。
- 如果它是
attached
,则它会自动停止所有processes
。在abort-timer
到期后,所有进程都将被aborted
。 - 如果它是
detached
,则在丢弃Child(Pool)
时不会发生任何事情。
容量
一个 Channel
可以是 bounded
或 unbounded
。
- 一个有界
Channel
可以接收消息,直到其容量达到。在达到容量后,发送者必须等待直到有空间可用。 - 一个无界
Channel
没有这个限制,而是应用一个反压算法:在Channel
中的消息越多,发送者必须等待的时间就越长,才能允许发送。
标识符
每个actor都有一个在生成时生成的唯一标识符,这个标识符在创建后不能更改。
入门指南
基本示例
use std::time::Duration;
use tiny_actor::*;
#[tokio::main]
async fn main() {
// First we spawn an actor with a default config, and an inbox which receives u32 messages.
let (mut child, address) = spawn(Config::default(), |mut inbox: Inbox<u32>| async move {
loop {
// This loops and receives messages
match inbox.recv().await {
Ok(msg) => println!("Received message: {msg}"),
Err(error) => match error {
RecvError::Halted => {
println!("actor has received halt signal - Exiting now...");
break "Halt";
}
RecvError::ClosedAndEmpty => {
println!("Channel is closed - Exiting now...");
break "Closed";
}
},
}
}
});
// Then we can send it messages
address.send(10).await.unwrap();
address.send(5).await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
// And finally shut the actor down,
// we give it 1 second to exit before aborting it.
match child.shutdown(Duration::from_secs(1)).await {
Ok(exit) => {
assert_eq!(exit, "Halt");
println!("actor exited with message: {exit}")
}
Err(error) => match error {
ExitError::Panic(_) => todo!(),
ExitError::Abort => todo!(),
},
}
}
带有ChildPool和自定义Config的示例
use futures::stream::StreamExt;
use std::time::Duration;
use tiny_actor::*;
#[tokio::main]
async fn main() {
// First we spawn an actor with a custom config, and an inbox which receives u32 messages.
// This will spawn 3 processes, with i = {0, 1, 2}.
let (mut pool, address) = spawn_many(
0..3,
Config {
link: Link::Attached(Duration::from_secs(1)),
capacity: Capacity::Unbounded(BackPressure::exponential(
5,
Duration::from_nanos(25),
1.3,
)),
},
|i, mut inbox: Inbox<u32>| async move {
loop {
// Now every actor loops in the same way as in the basic example
match inbox.recv().await {
Ok(msg) => println!("Received message on actor {i}: {msg}"),
Err(error) => match error {
RecvError::Halted => {
println!("actor has received halt signal - Exiting now...");
break "Halt";
}
RecvError::ClosedAndEmpty => {
println!("Channel is closed - Exiting now...");
break "Closed";
}
},
}
}
},
);
tokio::time::sleep(Duration::from_millis(10)).await;
// Send it the numbers 0..10, they will be spread across all processes.
for num in 0..10 {
address.send(num).await.unwrap()
}
// And finally shut the actor down, giving it 1 second before aborting.
let exits = pool
.shutdown(Duration::from_secs(1))
.collect::<Vec<_>>() // Await all processes (using `futures::StreamExt::collect`)
.await;
// And assert that every exit is `Ok("Halt")`
for exit in exits {
match exit {
Ok(exit) => {
assert_eq!(exit, "Halt");
println!("actor exited with message: {exit}")
}
Err(error) => match error {
ExitError::Panic(_) => todo!(),
ExitError::Abort => todo!(),
},
}
}
}
依赖关系
~3–5MB
~83K SLoC