1 个不稳定版本
0.0.1 | 2024年7月10日 |
---|
#10 在 #reply
118 每月下载量
8KB
88 行
Mailboxxy
一个小型、受F#和Erlang启发的微actor库
我从未使用过像Akka这样的“大型”Actory系统。我使用并喜欢F#中的MailboxProcessor
,这是一个极轻量级的actor实现。
简而言之,它允许创建代理,这些代理内部以同步方式运行(这极大地简化了它们的实现),但以异步和并行方式相互以及与主线程运行。
在我的经验中,如果您有一个与多个调用者共享的状态资源,这种模式非常有用。
调用者不必同步访问资源,而是异步地向资源发送消息,资源在循环中处理它们,并可选择以答案响应。
针对actor的编程接口可能看起来有点不寻常,而不是接口(在.NET中)/ 特性(在Rust中),您定义一个具有所有可能调用作为联合情况的区分联合。通过返回消息的成员声明为类型 ReplyChannel<T>
。
我相信示例代码将使这更清楚
一个简单的“计数器”actor,它可以增加(每次增加1),减少(按指定值减少),或可以查询其当前值。
enum CounterMsg {
Increment,
Decrement(i32),
GetValue(ReplyChannel<i32>)
}
如果您有actor的引用
let mb : Mailbox<CounterMsg> = // ...
则可以使用两个函数 post
和 ask
发送消息。这两个都会异步发送消息,ask
将返回一个包含返回值的 Future
mb.post(CounterMsg::Increment);
mb.post(CounterMsg::Increment);
mb.post(CounterMsg::Increment);
let val = mb.ask(|rc| CounterMsg::GetValue(rc)).await;
assert_eq!(val, 3);
mb.post(CounterMsg::Decrement(1));
let val = mb.ask(|rc| CounterMsg::GetValue(rc)).await;
assert_eq!(val, 2);
这种模式的惊人优势(在我看来)是在具有许多线程的过程中,您可以有多个对此单个Counter actor的引用,并且在每个位置都可以向其发送消息,而无需担心同步或所有权。actor会内部同步一切。
当然,如果另一个线程在并行向上述代码块发送Increment或Decrement消息,那么GetValue可能会返回不同的值,分别是3和2,具体取决于调用的内容。
关于如何针对actor编程的问题已经解决,那么如何实现actor本身呢?
很简单,作为一个简单的循环,读取消息并处理它
// define the actor function
async fn mailbox_fn(ctx:MailboxContext<CounterMsg>) {
// local state
let mut count = 0;
loop {
let msg: CounterMsg = ctx.dequeue().await;
match msg {
CounterMsg::Increment => count = count + 1,
CounterMsg::Decrement(n) => count = count - n,
CounterMsg::GetValue(rc) => rc.reply(count)
}
}
}
// start the actor
let mb = start_mailbox(mailbox_fn);
(请注意,为了简洁,此代码已简化并省略了一些参数,有关详细信息,请参阅lib.rs中的单元测试)
许可证
0BSD
参考文献
如上文所述,这部分内容深受 F# MailboxProcess 的启发(而 F# MailboxProcess 的灵感似乎来源于 Erlang)。
在我自己实现这一功能之前,我搜索了现有的微actor框架,并找到了 Gary Watson 开发的一个不同的 MailboxProcessor 调用。Gary Watson 的实现与 F# 实现(以及我的实现)不同之处在于,每个actor只有一个可能的返回值,而不是每个消息案例。它似乎没有移植 ReplyChannel 的概念。
依赖项
~5–14MB
~173K SLoC