7个稳定版本

2.1.0 2023年8月3日
2.0.4 2023年7月23日
2.0.3 2023年7月21日
1.1.0 2023年7月17日
1.0.0 2023年6月6日

#182 in 异步

MIT 许可证

150KB
3.5K SLoC

Wasmcompute Actors

一个Rust actor模型库,旨在以一种有见地的模式围绕tokio进行包装,以提供类型安全的计算。

关于

尽管存在其他actor框架,但我希望创建自己的框架,提供对actor生命周期的高层次安全和控制。wasmcompute actors专注于忠实地遵循actor模型系统的理念,不允许程序化的actor执行异步任务。相反,它的编程模型要求用户启动匿名actor来处理异步任务。

我希望这个库可以完全围绕 tokio 提供包装,以便用户通过这个库与之交互。actor通过从 tokio 创建的通道相互连接。actor作为 tokio::task 运行,并分配一个通道以接收来自其他actor和其管理员的消息。

这个库旨在尝试不通过 Pin<Box<_>> 来暴露编程泛型futures的需求。

安装

通过将以下内容添加到您的 Cargo.toml 依赖项中安装 wasmcompute actors。

[dependencies]
am = "0.1"

Worlds

actor系统是在 Worlds 中创建的。这些worlds允许用户向系统传达他们想要的输入类型。world接收的事件将由用户提供的一个函数处理。world可以监听多种类型的事件。

预期此函数将处理路由并调用现有的actor系统。

use tokactor::{World}

struct Router {
    db: Db
}
impl Actor for Router {}

fn main() {
    let world = World::new().unwrap();
    
    let db = world.with_state(async || Db::connect().await.unwrap());

    let router = Router { db };

    let tcp_input = world.tcp_component("localhost:8080", router);

    world.on_input(tcp_input);

    world.block_until_completion();
}

系统输入应该是轻量级的,并且包含很少的数据。系统内部actor应该能够使用这些输入对象来找到更大的数据。

与actor一起工作

actor是轻量级的 tokio::tasks,并且始终在单个线程上运行。消息逐个处理,没有并行处理消息的可能性。

任何actor模型库都需要,这里有一个ping pong示例。

use tokactor::{Actor, Ask, Ctx, Message};

/// Actor that keeps count of the number of ping pong message it receives
pub struct PingPong {
    counter: u8,
}

/// This is the types of message [PingPong] supports
#[derive(Debug, Clone)]
pub enum Msg {
    Ping,
    Pong,
}
impl Msg {
    // retrieve the next message in the sequence
    fn next(&self) -> Self {
        match self {
            Self::Ping => Self::Pong,
            Self::Pong => Self::Ping,
        }
    }
    // print out this message
    fn print(&self) {
        match self {
            Self::Ping => print!("ping.."),
            Self::Pong => print!("pong.."),
        }
    }
}

impl Actor for PingPong {}
impl Ask<Msg> for PingPong {
    type Result = Msg;

    // This is our main message handler
    fn handle(&mut self, message: Msg, _: &mut Ctx<Self>) -> Self::Result {
        message.print();
        self.counter += 1;
        message.next()
    }
}

#[tokio::main]
async fn main() {
    let handle = PingPong { counter: 0 }.start();
    let mut message = Msg::Ping;
    for _ in 0..10 {
        message = handle.ask(message).await.unwrap();
    }
    let actor = handle
        .await
        .expect("Ping-pong actor failed to exit properly");
    assert_eq!(actor.counter, 10);
    println!("\nProcessed {} messages", actor.counter);
}

消息actor

由于消息是顺序处理的,所以无法使用另一个actor来停止当前正在执行的消息处理。相反,只有两个级别的邮箱队列。

  1. 由另一个actor产生的actor,能够从其生产者(管理者)接收更新。目前,这就是管理者关闭子actor的方式。
  2. 给定actor的正常邮箱,其中消息是顺序处理的。

消息类型

您可以发送给actor的三种不同类型的消息。它们是:sendaskasync_ask。每种都有自己的用途,但也每种都有成本,因此只有当需要时才使用下一个格式。

  1. send表示actor实现了tokactor::Handler特质。这种实现不返回答案。
  2. ask表示actor实现了tokactor::Ask特质。这种实现适用于返回一些预计算的州。可以返回一个预定的答案。
  3. async_ask表示actor实现了tokactor::AsyncAsk特质。这种实现需要actor返回一个可以返回给定答案的匿名异步actor。在需要更多处理来找到答案时最好使用。

内部消息与正常消息放在同一个邮箱中。它们为所有通用actor有自己的消息系统。主要用于通过await一个ActorRef来停止actor并返回其状态。这将销毁actor在程序其余部分的地址。

停止actor

有两种方式可以请求actor停止。

  1. 让actor在其生命周期的某个时刻停止
  2. 等待ActorRef,这将向actor发送一条消息以停止执行并返回自身

如果您想继续等待ActorRef,但不想停止其执行,而是订阅其完成,则可以在ActorRef上调用.wait_for_completion()。这将允许actor执行,直到它决定停止,此时,actor上保存的值将被返回。

其他类型的actor

此actor库提供实用actor来处理不同的需求。目前,实用actor提供了以下功能

  • 路由器
  • 工作流
  • 通用

路由器

适用于创建多个相同的基本actor,并以轮询配置向它们发送请求。

let builder = RouterBuilder::new(5);
let router = Router::<ChoosenActor>::new(builder);
let address = router.start();
for _ in 0..5 {
    for i in 0..5 {
        let actor = address.async_ask(Id(())).await.unwrap();
        assert_eq!(actor.number, i + 1);
    }
}
let _ = address.await;

工作流

与此actor库一起工作可能是一个非常艰难的体验。例如,库只提供用户对同步函数的访问,并且任何异步函数都必须在tokio任务上执行。追踪从actor到actor再到actor的系统中的消息链可能很困难。为了使数据流更容易跟踪,在核心actor库之上构建了实用概念。

工作流要么是异步函数,要么是处理给定输入并返回某种类型输出的ActorRef

通用actor

有时在创建一个actor时,您希望它具有极高的可配置性,从而导致一个通用的重型实现。共享此actor的地址将需要您的整个程序实现具有通用参数的actors。然而,通过使用CtxBuilder构建actor,您可以创建一个具有通用重型实现的actor,然后通过多个不同的地址为给定消息提供访问权限。

let test = Test {
    _a: 0_u8,
    _b: 0_u16,
    _c: 0_u32,
};
let ctx = CtxBuilder::new(test);
let ctx = ctx.sender::<MsgA<u8>>();
let ctx = ctx.asker::<MsgB<u16>>();
let ctx = ctx.ask_asyncer::<MsgC<u32>>();
// each address relates to one message
let (a1, a2, a3) = ctx.run();
a1.send(MsgA(1_u8)).await.unwrap();
a2.ask(MsgB(1_u16)).await.unwrap();
a3.ask_async(MsgC(1_u32)).await.unwrap();

路线图

库中缺少一些功能,添加这些功能将很智能。这些是我希望添加到库中以使其达到1.0.0版本的功能。

  • 长时间运行的actor,直到它们自己停止时才发送消息给自己
  • 提供不要求tokio作为依赖项的方法
  • 处理访问tcp的actor
  • 处理访问udp的actor
  • 处理访问文件系统的actor
  • 处理访问终端的actor
  • 允许supervisor actors在保持状态完整的情况下重启失败的actor
  • 提供更多实用函数,用于创建可以硬编码的更大工作流(工作流构建器)
  • 添加跟踪
  • 提高测试数量
  • 在PR上发布包含基准结果的评论

依赖项

~3–11MB
~110K SLoC