#send-message #processor #abstraction #erlang #queue #little #was

mailbox_processor

受到FSharp邮箱处理器启发的轻量级actor抽象,该处理器又受到erlang的启发。

7个版本

0.1.6 2022年2月19日
0.1.5 2022年2月19日

#1152 in 异步

每月21次下载

LGPL-2.1-or-later

12KB
115

邮箱处理器自述文件

邮箱处理器是一个受FSharp邮箱处理器启发的轻量级异步actor抽象,该处理器又受到erlang的启发。

为什么使用这种抽象而不是更全面的抽象,如actix。主要是因为它简单且小巧,有时候你只需要一个简单的抽象。

这个抽象在fsharp社区中得到广泛使用,并已被证明非常有用。

那么这个是什么东西呢?这可能是你能想到的actor最基本的表示形式。它基本上是一个位于事件循环前面的队列。它的异步特性意味着当它没有积极处理消息时,它只占用很少的资源,因此如果你愿意,可以启动很多(不确定有多少,但几十万不会让我惊讶)。您提供处理从队列中接收到的消息的函数。状态作为参数传递给函数,函数的返回值是更新后的状态。您可以通过通道发送响应回原始调用者,或者以“fire_and_forget”的方式发送消息。

大多数情况下,我使用这个抽象来同步事物。我一次用它来接受来自端点的多个请求并序列化输出处理。我另一次用它来在处理运行之间协调对数据快照的获取,这些数据以请求的形式进入系统。

它对于解决各种并发问题非常实用...

这个邮箱处理器有一个FSharp原始版本没有的东西... 它基于有界通道(队列),这意味着您可以在一定程度上控制反压。

这个是基于async-std实现的,但很容易移植到tokio(如果这样做有意义的话)。如果有人在使用它时遇到与tokio相关的问题,我会接受一个tokio版本的pull request,或者我愿意构建一个tokio版本(应该不会花费太多精力)。

不明显,但你可以使用类似的方法从同步上下文中启动邮箱处理器...

task::block_on(mb); // 此方法将返回,当它返回时,事件循环将被启动并准备好接受消息。

发送消息时,你可能仍然需要处于异步上下文中 :)

以下是一个使用邮箱处理器的计数器示例


        enum SendMessageTypes {
            Increment(i32),
            GetCurrentCount,
            Decrement(i32),
        }

        let mb = MailboxProcessor::<SendMessageTypes, i32>::new( 
            BufferSize::Default, 
            0,  
            |msg, state, reply_channel| async move {
                match msg {
                    SendMessageTypes::Increment(x) => {
                        OptionFuture::from(reply_channel.map(|rc| async move {
                            rc.send(state + x).await.unwrap()
                        })).await;
                        state + x
                    },
                    SendMessageTypes::GetCurrentCount => {
                        OptionFuture::from(reply_channel.map(|rc| async move {
                            rc.send(state).await.unwrap()
                        })).await;
                        state
                    },
                    SendMessageTypes::Decrement(x) => {
                        OptionFuture::from(reply_channel.map(|rc| async move {
                            rc.send(state - x).await.unwrap()
                        })).await;
                        state - x
                    },
                }
            }
        ).await;

        assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 0);

        mb.fire_and_forget(SendMessageTypes::Increment(55)).await.unwrap();
        assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 55);

        mb.fire_and_forget(SendMessageTypes::Increment(55)).await.unwrap();
        assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 110);

        mb.fire_and_forget(SendMessageTypes::Decrement(10)).await.unwrap();
        assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 100);

        assert_eq!(mb.send(SendMessageTypes::Increment(55)).await.unwrap(), 155);
        assert_eq!(mb.send(SendMessageTypes::GetCurrentCount).await.unwrap(), 155);

依赖项

~6–18MB
~219K SLoC