5 个版本

新功能 0.1.0 2024年8月15日
0.0.4 2024年8月14日
0.0.3 2024年8月13日
0.0.2 2024年8月5日
0.0.1 2024年7月18日

#159 in 过程宏

Download history 115/week @ 2024-07-14 18/week @ 2024-07-21 6/week @ 2024-07-28 119/week @ 2024-08-04 367/week @ 2024-08-11

每月下载量 523次

MIT 许可证

53KB
1K SLoC

Actix Async Handler

一个属性宏,用于支持为Actix演员编写 async 消息处理器

使用此宏可以将 此示例

fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
    AtomicResponse::new(Box::pin(
        async {}
            .into_actor(self)
            .map(|_, this, _| {
                this.0 = 30;
            })
            .then(|_, this, _| {
                sleep(Duration::from_secs(3)).into_actor(this)
            })
            .map(|_, this, _| {
                this.0 -= 1;
                this.0
            }),
    ))
}

转换为更易读的异步处理器

async fn handle(&mut self, _msg: Msg, _ctx: &mut Context<Self>) -> Self::Result {
    self.0 = 30;
    sleep(Duration::from_secs(3)).await;
    self.0 -= 1;
    self.0
}

用法

将 actix_async_handler 添加为开发依赖。

cargo add --dev actix_async_handler

如果您打算使用循环,也请添加 futures 作为依赖

cargo add futures

实现异步处理器时,用 #[async_handler] 属性标记它,如下所示


#[async_handler]
impl Handler<Msg> for MyActor {
    type Result = u64; // or whatever your message handler returns, no enclosing ResponseActFuture or AtomicFuture needed 
    async fn handle(&mut self, _msg: Msg, _ctx: &mut Context<Self>) -> Self::Result {
        // your handler code, for example
        self.other_actor_addr.send(OtherMsg()).await // yay! we can use await
    }

就这样!享受吧。

默认情况下,返回的未来将是 AtomicFuture,因此您的演员在完全解析任何await调用之前将不会处理任何其他传入的消息。这是对Hewitt原始模型的大部分尊重,让您可以在代码中抽象await,并像同步版本一样使用它。如果您更希望让您的演员在await之间处理消息,您可以通过用 #[async_handler(non_atomic) 而不是来注释您的处理器,将其更改为 ResponseActFuture

已知限制

已知无法正确转换的语言功能列表,以及可能存在的解决方案。

await结果的链式操作

以下代码(尚未)得到很好的翻译

let result = self.delegate_actor_addr.send(MyMsg).await.or_else(0) + 3

将可等待的调用隔离到其自己的表达式中

let await_result = self.delegate_actor_addr.send(MyMsg).await;
let result = await_result.or_else(0) + 3

if表达式

if表达式内的可变变量

以下代码将无法按预期工作

let mut result = None;

if some_condition {
    let returned_value = self.delegate_actor.send(message).await;
    result = returned_value.ok();
}

println!("{}", result); // Always prints None regardless of some_condition and returned_value

async_handler 宏将您的异步代码转换为“灾难之塔”,以确保正确地移动变量的最新值。

例如,以下代码


let a = call_a().await;
let b = call_b(a).await;
let c = call_c(b).await;
println!("{}, {}, {}", a, b, c)

变为(简化版)

wrap_future(call_a())
    .then(move |__res, __self, __ctx| {
        let a = __res;
        wrap_future(call_b(a))    
            .then(move |__res, __self, __ctx| {
                let b = __res;
                wrap_future(call_c(b))
                    .then(move |__res, __self, __ctx| {
                        let c = __res;
                        println!("{}, {}, {}", a, b, c)
                    })
            })
    })

这种方式使得最新的行位于 then 链的最内层,因此它们会移动作用域变量的正确值。

当您使用 if 条件时,问题就会出现。在这里,由于有不同分支,then 是外部应用的。

对于第一个例子,翻译后的代码看起来像(再次简化)

    let mut result = None;

    (if some_condition {
        wrap_future(self.delegate_actor.send(message))
            .then(move |__res, __self, __ctx| {
                let returned_value = __res;
                result = returned_value.ok(); // updates the local copy of result, useless
            } 
    } else {
        wrap_future(fut::ready(())) // both if branches need to return a future. 
    }).then(move |__res, __self, __ctx| {
        println!("{}", result);
    })

if 后的 then 被放在条件链外部,因此它会捕获原始变量值。因此,从打印的角度来看,值保持不变。

为了解决这个问题,您应该确保您的条件始终返回您需要更新的值。

在上面的代码中,您应该这样做


let mut result = None;

result = if some_condition {
    let returned_value = self.delegate_actor.send(message).await;
    returned_value.ok()
}

println!("{}", result);

如果您希望更新多个变量,可以将它们打包成一个元组


let mut a = 0, mut b = 0, mut c = 0;

(a, b, c) = if some_condition {
    a = call_a().await;
    b = call_b(b).await;
    c = call_c(c).await;
    (a, b, c)
} else {
    (a, b, c) // return the defaults. It is mandatory to have an else
}

显式设置 if 表达式的返回类型的需求

这段代码无法编译

let result = if some_condition {
    let a = call_a().await
    a.ok()
} else {
    None
}

因为翻译代码不够智能,无法推断出 a.ok() 的返回类型

因此,您应该像这样对编译器进行类型检查

let result: Option<CallAResultType> = if some_condition {
    let a = call_a().await // image return type to be Result<CallAResultType, Err>
    a.ok()
} else {
    None
}

if 表达式中的早期返回

这段代码不会做您期望的事情


if some_early_exit_condition {
    call_a().await;
    return;
}

call_b(a).await;
...

因为 then 链在包含 if 的闭包外部,它不会避免执行 await 后的代码。

相反,您应该写一个包含其余内容的 else 块

if some_early_exit_condition {
    call_a().await;
} else {
    call_b(a).await;
    ... // rest of the code
}

之前的结果变量声明

这段代码无法编译,因为 Cannot assign to `a` as it is not declared mutable

let a;

if condition {
    a = call_a().await;
}

由于您无法在 then 块之外使用 for,只需将其声明为局部变量。如果您想“返回” await 调用的结果,请参阅 if 表达式内的变量修改

匹配表达式

await 语句目前不支持在 match 表达式中使用。用链式 if let 表达式代替

match action {
    Move(x, y) => call_move_async(x, y).await,
    Talk(msg) => say_async(msg).await,
    _ => println!("unknown action");
}

变为

if let Move(x, y) = action {
    call_move_async(x, y).await
} else if let Talk(msg) = action {
    say_async(msg).await
} else {
    println!("unknown action");
}

循环

在包含 await 的循环块的情况下

  • for 循环是目前唯一支持的。
    • while 循环的条件仅依赖于 actor 状态时,可以通过 take_while 一个无限流(wrap_stream(futures::stream::iter(iter::repeat())))
    • 根据作用域累加器(即 let mut i = 0; while i < 3 { i += 1})的while循环需要创建一个TryActorStream,特别是TryFold,以便将当前累加器值传递给条件表达式闭包。
  • 用于for表达式的迭代器将被移动,所以如果你想保留可迭代对象的引用(例如,当遍历actor的状态字段时),可能需要使用.clone()
  • breakcontinue不被支持。continue可以通过将其替换为早期return来轻松实现。而break则需要创建一个TryActorStream,特别是TryFold,与while的情况相同。
  • for表达式本身不能有await子句。首先将其提取到一个变量中。
  • 目前,您不能迭代流,尽管通过一些泛型魔法,我们可能可以避免期望一个IntoIterator,并在for表达式中直接接受一个Stream。但无论如何,在消息处理程序中这样做可能是个坏主意。您应该使用Actor::add_stream

在循环之后使用内部变量

与条件语句一样,变量被移动到for块内部。如果你需要在循环之后继续使用更新后的值,我们支持循环的赋值语法,如下所示

let mut i;
i = for other_actor in self.other_actors {
    i += 1;
    other_actor.send(i).await;
}
println!("{}", i)

这是有效的Rust语法,但普通的for循环总是返回unit。在这种情况下,宏变得“智能”,并且由于内部for实现实际上是一个Fold,返回值将是累加器的值;它将被故意填充为你放在赋值中使用的任何变量名。

如果您需要多个,则应该

let mut i, j;
(i, j) = for other_actor in self.other_actors {
    i += 1;
    j = i + 1;
    other_actor.send(i).await;
}
println!("{}, {}", i, j)

依赖关系

~330–780KB
~18K SLoC