6个版本

0.2.1 2022年9月8日
0.2.0 2022年9月4日
0.2.0-beta.12022年8月7日
0.1.0 2022年8月1日
0.0.2 2022年7月10日

#1148 in 并发


用于 2 个crates(通过 zestors-core

MIT/Apache

105KB
2.5K SLoC

tiny-actor

Crates.io Documentation

tiny-actor是一个针对Rust的最小且无偏见的actor库。

该库将Inboxestasks的概念合并,从而得到actor:这个基本构建块允许我们构建具有可靠关闭行为的简单池和监督树。

tiny-actor不会追求更高级的API,而是作为编写良好行为的tokio-actors的简单方式。(如这里所述)

如果您正在寻找一个完整的actor框架,请查看Zestors。它基于tiny-actor的构建块。

概念

以下简要概述了tiny-actor的所有概念。有关使用方法的更多详细信息,请参阅crate文档

通道

通道是连接InboxesAddressesChildren的耦合体。每个通道都包含以下rust结构体:

  • 单个Child(Pool)
  • 一个或多个Addresses
  • 一个或多个Inboxes

以下图表展示了所使用的命名视觉表示

|¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|
|                            Channel                          |
|  |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|  |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|  |
|  |              actor                |  |   Child(Pool)  |  |
|  |  |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|  |  |________________|  |
|  |  |         process(es)         |  |                      |
|  |  |  |¯¯¯¯¯¯¯¯¯¯|  |¯¯¯¯¯¯¯¯¯|  |  |  |¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯|  |
|  |  |  |   task   |  |  Inbox  |  |  |  |  Address(es)   |  |
|  |  |  |__________|  |_________|  |  |  |________________|  |
|  |  |_____________________________|  |                      |
|  |___________________________________|                      |
|_____________________________________________________________|

actor

actor一词用于描述属于单个Channelprocess组。

process

process一词用于描述与Inbox配对的task

Inbox

Inbox是通道的接收器,主要用于从通道中取出消息。Inboxes可以通过启动新的process来创建,并且应与其启动的task保持耦合:当task退出时,Inbox才应该被丢弃。

Address

AddressChannel 的可克隆发送者,主要用于向 actor 发送消息。当 actor 退出时,可以等待 Addresses,它会返回。

子(池)

Child 是一个由一个 process 组成的 actor 的句柄。它可以被等待以返回派生 task 的退出值。由于 Child 不可克隆,因此它是唯一的 Channel。当它被丢弃时,actor 将会被 halted 并随后 aborted,可以通过断开 Child 来改变这种行为。

ChildPoolChild 类似,但 actor 由多个 processes 组成。可以通过流式传输 ChildPool 来获取所有派生 tasks 的退出值。在 actor 被派生后,可以派生更多 processes,也可以停止 actor 的部分 processes

关闭

一旦 Channel 被关闭,就不再可能向其中发送新消息,但仍可以取出任何留下的消息。关闭的 Channel 的进程不一定必须退出,但可以继续运行。任何发送者都会收到一个 SendError::Closed 通知,而接收者在 Channel 被清空后,将收到 RecvError::ClosedAndEmpty

停止

一个 process 可以被精确地停止一次,通过接收一个 RecvError::Halted,之后它应该退出。一个 actor 可以部分停止,这意味着只有一些它的 processeses 被停止。

终止

可以通过 tokio 的 abort 方法来终止一个 actor。这将导致 tasks 突然退出,并可能留下不良状态,尽可能使用 halt 而不是 abort

退出

退出可以指两个独立的事件,在良好的实践中,这两个事件总是同时发生的。

  • process 可以通过丢弃它的 Inbox 来退出,一旦一个 Channel 中所有 Inboxes 都被丢弃,actor 就已经 exited。这种退出类型可以使用 has_exited 在任何时间从 Channel 中检索。
  • task 可以退出,这意味着 task 已经不再活跃。这只能通过等待 Child(Pool) 或通过调用 is_finished 来从 Child 查询。

一个 actor 可以是 attacheddetached,这表示当 Child(Pool) 被丢弃时应该发生什么。

  • 如果它是 attached,则它会自动停止所有 processes。在 abort-timer 到期后,所有进程都将被 aborted
  • 如果它是 detached,则在丢弃 Child(Pool) 时不会发生任何事情。

容量

一个 Channel 可以是 boundedunbounded

  • 一个有界 Channel 可以接收消息,直到其容量达到。在达到容量后,发送者必须等待直到有空间可用。
  • 一个无界 Channel 没有这个限制,而是应用一个反压算法:在 Channel 中的消息越多,发送者必须等待的时间就越长,才能允许发送。

标识符

每个actor都有一个在生成时生成的唯一标识符,这个标识符在创建后不能更改。

入门指南

基本示例

use std::time::Duration;
use tiny_actor::*;

#[tokio::main]
async fn main() {
    // First we spawn an actor with a default config, and an inbox which receives u32 messages.
    let (mut child, address) = spawn(Config::default(), |mut inbox: Inbox<u32>| async move {
        loop {
            // This loops and receives messages
            match inbox.recv().await {
                Ok(msg) => println!("Received message: {msg}"),
                Err(error) => match error {
                    RecvError::Halted => {
                        println!("actor has received halt signal - Exiting now...");
                        break "Halt";
                    }
                    RecvError::ClosedAndEmpty => {
                        println!("Channel is closed - Exiting now...");
                        break "Closed";
                    }
                },
            }
        }
    });

    // Then we can send it messages
    address.send(10).await.unwrap();
    address.send(5).await.unwrap();

    tokio::time::sleep(Duration::from_millis(10)).await;

    // And finally shut the actor down, 
    // we give it 1 second to exit before aborting it.
    match child.shutdown(Duration::from_secs(1)).await {
        Ok(exit) => {
            assert_eq!(exit, "Halt");
            println!("actor exited with message: {exit}")
        }
        Err(error) => match error {
            ExitError::Panic(_) => todo!(),
            ExitError::Abort => todo!(),
        },
    }
}

带有ChildPool和自定义Config的示例

use futures::stream::StreamExt;
use std::time::Duration;
use tiny_actor::*;

#[tokio::main]
async fn main() {
    // First we spawn an actor with a custom config, and an inbox which receives u32 messages.
    // This will spawn 3 processes, with i = {0, 1, 2}.
    let (mut pool, address) = spawn_many(
        0..3,
        Config {
            link: Link::Attached(Duration::from_secs(1)),
            capacity: Capacity::Unbounded(BackPressure::exponential(
                5,
                Duration::from_nanos(25),
                1.3,
            )),
        },
        |i, mut inbox: Inbox<u32>| async move {
            loop {
                // Now every actor loops in the same way as in the basic example
                match inbox.recv().await {
                    Ok(msg) => println!("Received message on actor {i}: {msg}"),
                    Err(error) => match error {
                        RecvError::Halted => {
                            println!("actor has received halt signal - Exiting now...");
                            break "Halt";
                        }
                        RecvError::ClosedAndEmpty => {
                            println!("Channel is closed - Exiting now...");
                            break "Closed";
                        }
                    },
                }
            }
        },
    );

    tokio::time::sleep(Duration::from_millis(10)).await;

    // Send it the numbers 0..10, they will be spread across all processes.
    for num in 0..10 {
        address.send(num).await.unwrap()
    }

    // And finally shut the actor down, giving it 1 second before aborting.
    let exits = pool
        .shutdown(Duration::from_secs(1))
        .collect::<Vec<_>>() // Await all processes (using `futures::StreamExt::collect`)
        .await;

    // And assert that every exit is `Ok("Halt")`
    for exit in exits {
        match exit {
            Ok(exit) => {
                assert_eq!(exit, "Halt");
                println!("actor exited with message: {exit}")
            }
            Err(error) => match error {
                ExitError::Panic(_) => todo!(),
                ExitError::Abort => todo!(),
            },
        }
    }
}

依赖关系

~3–5MB
~83K SLoC