7个版本
0.1.6 | 2022年2月19日 |
---|---|
0.1.5 | 2022年2月19日 |
#1152 in 异步
每月21次下载
12KB
115 行
邮箱处理器自述文件
邮箱处理器是一个受FSharp邮箱处理器启发的轻量级异步actor抽象,该处理器又受到erlang的启发。
为什么使用这种抽象而不是更全面的抽象,如actix。主要是因为它简单且小巧,有时候你只需要一个简单的抽象。
这个抽象在fsharp社区中得到广泛使用,并已被证明非常有用。
那么这个是什么东西呢?这可能是你能想到的actor最基本的表示形式。它基本上是一个位于事件循环前面的队列。它的异步特性意味着当它没有积极处理消息时,它只占用很少的资源,因此如果你愿意,可以启动很多(不确定有多少,但几十万不会让我惊讶)。您提供处理从队列中接收到的消息的函数。状态作为参数传递给函数,函数的返回值是更新后的状态。您可以通过通道发送响应回原始调用者,或者以“fire_and_forget”的方式发送消息。
大多数情况下,我使用这个抽象来同步事物。我一次用它来接受来自端点的多个请求并序列化输出处理。我另一次用它来在处理运行之间协调对数据快照的获取,这些数据以请求的形式进入系统。
它对于解决各种并发问题非常实用...
这个邮箱处理器有一个FSharp原始版本没有的东西... 它基于有界通道(队列),这意味着您可以在一定程度上控制反压。
这个是基于async-std实现的,但很容易移植到tokio(如果这样做有意义的话)。如果有人在使用它时遇到与tokio相关的问题,我会接受一个tokio版本的pull request,或者我愿意构建一个tokio版本(应该不会花费太多精力)。
不明显,但你可以使用类似的方法从同步上下文中启动邮箱处理器...
task::block_on(mb); // 此方法将返回,当它返回时,事件循环将被启动并准备好接受消息。
发送消息时,你可能仍然需要处于异步上下文中 :)
以下是一个使用邮箱处理器的计数器示例
enum SendMessageTypes {
Increment(i32),
GetCurrentCount,
Decrement(i32),
}
let mb = MailboxProcessor::<SendMessageTypes, i32>::new(
BufferSize::Default,
0,
|msg, state, reply_channel| async move {
match msg {
SendMessageTypes::Increment(x) => {
OptionFuture::from(reply_channel.map(|rc| async move {
rc.send(state + x).await.unwrap()
})).await;
state + x
},
SendMessageTypes::GetCurrentCount => {
OptionFuture::from(reply_channel.map(|rc| async move {
rc.send(state).await.unwrap()
})).await;
state
},
SendMessageTypes::Decrement(x) => {
OptionFuture::from(reply_channel.map(|rc| async move {
rc.send(state - x).await.unwrap()
})).await;
state - x
},
}
}
).await;
assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 0);
mb.fire_and_forget(SendMessageTypes::Increment(55)).await.unwrap();
assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 55);
mb.fire_and_forget(SendMessageTypes::Increment(55)).await.unwrap();
assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 110);
mb.fire_and_forget(SendMessageTypes::Decrement(10)).await.unwrap();
assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 100);
assert_eq!(mb.send(SendMessageTypes::Increment(55)).await.unwrap(), 155);
assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 155);
依赖项
~6–18MB
~219K SLoC