7个稳定版本
2.1.0 | 2023年8月3日 |
---|---|
2.0.4 | 2023年7月23日 |
2.0.3 | 2023年7月21日 |
1.1.0 | 2023年7月17日 |
1.0.0 | 2023年6月6日 |
#182 in 异步
150KB
3.5K SLoC
Wasmcompute Actors
一个Rust actor模型库,旨在以一种有见地的模式围绕tokio进行包装,以提供类型安全的计算。
关于
尽管存在其他actor框架,但我希望创建自己的框架,提供对actor生命周期的高层次安全和控制。wasmcompute actors专注于忠实地遵循actor模型系统的理念,不允许程序化的actor执行异步任务。相反,它的编程模型要求用户启动匿名actor来处理异步任务。
我希望这个库可以完全围绕 tokio
提供包装,以便用户通过这个库与之交互。actor通过从 tokio
创建的通道相互连接。actor作为 tokio::task
运行,并分配一个通道以接收来自其他actor和其管理员的消息。
这个库旨在尝试不通过 Pin<Box<_>>
来暴露编程泛型futures的需求。
安装
通过将以下内容添加到您的 Cargo.toml 依赖项中安装 wasmcompute actors。
[dependencies]
am = "0.1"
Worlds
actor系统是在 Worlds 中创建的。这些worlds允许用户向系统传达他们想要的输入类型。world接收的事件将由用户提供的一个函数处理。world可以监听多种类型的事件。
预期此函数将处理路由并调用现有的actor系统。
use tokactor::{World}
struct Router {
db: Db
}
impl Actor for Router {}
fn main() {
let world = World::new().unwrap();
let db = world.with_state(async || Db::connect().await.unwrap());
let router = Router { db };
let tcp_input = world.tcp_component("localhost:8080", router);
world.on_input(tcp_input);
world.block_until_completion();
}
系统输入应该是轻量级的,并且包含很少的数据。系统内部actor应该能够使用这些输入对象来找到更大的数据。
与actor一起工作
actor是轻量级的 tokio::tasks
,并且始终在单个线程上运行。消息逐个处理,没有并行处理消息的可能性。
任何actor模型库都需要,这里有一个ping pong示例。
use tokactor::{Actor, Ask, Ctx, Message};
/// Actor that keeps count of the number of ping pong message it receives
pub struct PingPong {
counter: u8,
}
/// This is the types of message [PingPong] supports
#[derive(Debug, Clone)]
pub enum Msg {
Ping,
Pong,
}
impl Msg {
// retrieve the next message in the sequence
fn next(&self) -> Self {
match self {
Self::Ping => Self::Pong,
Self::Pong => Self::Ping,
}
}
// print out this message
fn print(&self) {
match self {
Self::Ping => print!("ping.."),
Self::Pong => print!("pong.."),
}
}
}
impl Actor for PingPong {}
impl Ask<Msg> for PingPong {
type Result = Msg;
// This is our main message handler
fn handle(&mut self, message: Msg, _: &mut Ctx<Self>) -> Self::Result {
message.print();
self.counter += 1;
message.next()
}
}
#[tokio::main]
async fn main() {
let handle = PingPong { counter: 0 }.start();
let mut message = Msg::Ping;
for _ in 0..10 {
message = handle.ask(message).await.unwrap();
}
let actor = handle
.await
.expect("Ping-pong actor failed to exit properly");
assert_eq!(actor.counter, 10);
println!("\nProcessed {} messages", actor.counter);
}
消息actor
由于消息是顺序处理的,所以无法使用另一个actor来停止当前正在执行的消息处理。相反,只有两个级别的邮箱队列。
- 由另一个actor产生的actor,能够从其生产者(管理者)接收更新。目前,这就是管理者关闭子actor的方式。
- 给定actor的正常邮箱,其中消息是顺序处理的。
消息类型
您可以发送给actor的三种不同类型的消息。它们是:send
、ask
和async_ask
。每种都有自己的用途,但也每种都有成本,因此只有当需要时才使用下一个格式。
send
表示actor实现了tokactor::Handler
特质。这种实现不返回答案。ask
表示actor实现了tokactor::Ask
特质。这种实现适用于返回一些预计算的州。可以返回一个预定的答案。async_ask
表示actor实现了tokactor::AsyncAsk
特质。这种实现需要actor返回一个可以返回给定答案的匿名异步actor。在需要更多处理来找到答案时最好使用。
内部消息与正常消息放在同一个邮箱中。它们为所有通用actor有自己的消息系统。主要用于通过await
一个ActorRef
来停止actor并返回其状态。这将销毁actor在程序其余部分的地址。
停止actor
有两种方式可以请求actor停止。
- 让actor在其生命周期的某个时刻停止
- 等待
ActorRef
,这将向actor发送一条消息以停止执行并返回自身
如果您想继续等待ActorRef
,但不想停止其执行,而是订阅其完成,则可以在ActorRef
上调用.wait_for_completion()
。这将允许actor执行,直到它决定停止,此时,actor上保存的值将被返回。
其他类型的actor
此actor库提供实用actor来处理不同的需求。目前,实用actor提供了以下功能
- 路由器
- 工作流
- 通用
路由器
适用于创建多个相同的基本actor,并以轮询配置向它们发送请求。
let builder = RouterBuilder::new(5);
let router = Router::<ChoosenActor>::new(builder);
let address = router.start();
for _ in 0..5 {
for i in 0..5 {
let actor = address.async_ask(Id(())).await.unwrap();
assert_eq!(actor.number, i + 1);
}
}
let _ = address.await;
工作流
与此actor库一起工作可能是一个非常艰难的体验。例如,库只提供用户对同步函数的访问,并且任何异步函数都必须在tokio任务上执行。追踪从actor到actor再到actor的系统中的消息链可能很困难。为了使数据流更容易跟踪,在核心actor库之上构建了实用概念。
工作流要么是异步函数,要么是处理给定输入并返回某种类型输出的ActorRef
。
通用actor
有时在创建一个actor时,您希望它具有极高的可配置性,从而导致一个通用的重型实现。共享此actor的地址将需要您的整个程序实现具有通用参数的actors。然而,通过使用CtxBuilder
构建actor,您可以创建一个具有通用重型实现的actor,然后通过多个不同的地址为给定消息提供访问权限。
let test = Test {
_a: 0_u8,
_b: 0_u16,
_c: 0_u32,
};
let ctx = CtxBuilder::new(test);
let ctx = ctx.sender::<MsgA<u8>>();
let ctx = ctx.asker::<MsgB<u16>>();
let ctx = ctx.ask_asyncer::<MsgC<u32>>();
// each address relates to one message
let (a1, a2, a3) = ctx.run();
a1.send(MsgA(1_u8)).await.unwrap();
a2.ask(MsgB(1_u16)).await.unwrap();
a3.ask_async(MsgC(1_u32)).await.unwrap();
路线图
库中缺少一些功能,添加这些功能将很智能。这些是我希望添加到库中以使其达到1.0.0
版本的功能。
- 长时间运行的actor,直到它们自己停止时才发送消息给自己
- 提供不要求tokio作为依赖项的方法
- 处理访问tcp的actor
- 处理访问udp的actor
- 处理访问文件系统的actor
- 处理访问终端的actor
- 允许supervisor actors在保持状态完整的情况下重启失败的actor
- 提供更多实用函数,用于创建可以硬编码的更大工作流(工作流构建器)
- 添加跟踪
- 提高测试数量
- 在PR上发布包含基准结果的评论
依赖项
~3–11MB
~110K SLoC