#actor #tokio #macro #async #msg #enums #process

tokio-actor

基于宏的异步actor系统

2个版本

0.1.1 2022年5月5日
0.1.0 2022年5月4日

#836过程宏

MIT 许可证

19KB
221

项目Tokio Actor

对于Rust来说,有很多actor实现,例如:actix。许多这些实现定义了traits,并需要用户逐一实现这些traits

我认为,在某种程度上,这些工作是重复的,真的很无聊。幸运的是,macro来拯救我们!

如果我们能做一些非常简单且代码量很少的事情会怎样?

use tokio;
use tokio_actor::actors;

#[actors]
mod my_actors {
    pub enum ThingMsg {
        MsgOne { value: i32, resp: i32 },
        MsgTwo { value: f64, resp: f64 },
    }

    pub struct Thing {}

    impl Thing {
        async fn process(&mut self, msg: ThingMsg) {
            match msg {
                ThingMsg::MsgOne { resp, value } => {
                    println!("handling msg1");
                    if let Some(v) = resp {
                        let _r = v.send(value + 100);
                    }
                }
                ThingMsg::MsgTwo { resp, value } => {
                    println!("handling msg2");
                    if let Some(v) = resp {
                        let _r = v.send(value * 10.0);
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut a = my_actors::ActorThing::new().await;
    {
        let r = a
            .msg_one(my_actors::ThingMsg::MsgOne {
                value: 1,
                resp: None,
            })
            .await
            .unwrap();
        println!("{}", r);
    }
    {
        let r = a
            .msg_two(my_actors::ThingMsg::MsgTwo {
                value: 3.1415926,
                resp: None,
            })
            .await
            .unwrap();
        println!("{}", r);
    }
}

在上面的示例中,大部分脏活/魔法工作都是由macro完成的:actors

幕后的情况是什么?

  1. 它分析了my_actos模块,并智能地检测到结构体Thing是一个合适的actor processor,因为它有一个名为processimpl,并且它还在同一模块中定义了一个名为ThingMsgenum
  2. 它生成了一组辅助方法,以enum ThingMsg的变体名称命名。当然是在snake_case中。
  3. 用户可以按照以下名称约定调用这些方法:一个MsgOne枚举变体意味着存在msg_onemsg_one_no_wait方法,可以在ActorThing结构体上调用。
  4. ActorThing 将执行一个 tokio::spawn 操作,监听 tokio::sync::mpsc::UnboundedReceiver 以接收 ThingMsgprocess 它。它将结果写入 tokio::sync::oneshot 通道。正如你所猜测的,msg_one_no_wait 简单地不关心等待结果返回。

让我们看看这个例子中 mod my_actors 生成的 tokenstream

mod my_actors {
    pub enum ThingMsg {
        MsgOne {
            value: i32,
            resp: Option<tokio::sync::oneshot::Sender<i32>>,
        },
        MsgTwo {
            value: f64,
            resp: Option<tokio::sync::oneshot::Sender<f64>>,
        },
    }
    pub struct Thing {
        receiver: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>,
    }
    impl Thing {
        async fn process(&mut self, msg: ThingMsg) {
            match msg {
                ThingMsg::MsgOne { resp, value } => {
                    println!("handling msg1");
                    if let Some(v) = resp {
                        let _r = v.send(value + 100);
                    }
                }
                ThingMsg::MsgTwo { resp, value } => {
                    println!("handling msg2");
                    if let Some(v) = resp {
                        let _r = v.send(value * 10.0);
                    }
                }
            }
        }
    }
    pub struct ActorThing {
        sender: tokio::sync::mpsc::UnboundedSender<ThingMsg>,
    }
    impl ActorThing {
        pub async fn new() -> Self {
            let (s, r) = tokio::sync::mpsc::unbounded_channel();
            let mut a = Thing::new(r);
            tokio::spawn(async move {
                a.run().await;
            });
            return Self { sender: s };
        }
    }
    impl Thing {
        fn new(r: tokio::sync::mpsc::UnboundedReceiver<ThingMsg>) -> Self {
            return Self { receiver: r };
        }
        async fn run(&mut self) {
            while let Some(msg) = self.receiver.recv().await {
                self.process(msg).await;
            }
        }
    }
    impl ActorThing {
        pub async fn msg_one(&mut self, mut msg: ThingMsg) -> Result<i32, &'static str> {
            match msg {
                ThingMsg::MsgOne { ref mut resp, .. } => {
                    let (mut s, mut r) = tokio::sync::oneshot::channel();
                    *resp = Some(s);
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    match r.await {
                        Ok(v) => {
                            return Ok(v);
                        }
                        _ => {
                            return Err("mailbox closed");
                        }
                    };
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_one_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
            match msg {
                ThingMsg::MsgOne { .. } => {
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    return Ok(());
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_two(&mut self, mut msg: ThingMsg) -> Result<f64, &'static str> {
            match msg {
                ThingMsg::MsgTwo { ref mut resp, .. } => {
                    let (mut s, mut r) = tokio::sync::oneshot::channel();
                    *resp = Some(s);
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    match r.await {
                        Ok(v) => {
                            return Ok(v);
                        }
                        _ => {
                            return Err("mailbox closed");
                        }
                    };
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
    impl ActorThing {
        pub async fn msg_two_no_wait(&mut self, mut msg: ThingMsg) -> Result<(), &'static str> {
            match msg {
                ThingMsg::MsgTwo { .. } => {
                    self.sender.send(msg).map_err(|_e| {
                        return "send failed";
                    })?;
                    return Ok(());
                }
                _ => {
                    return Err("invalid msg type");
                }
            };
        }
    }
}

常见问题解答

  • 接下来是什么?
    • 将允许多个发送者和多个演员处理它们
  • 我们必须为演员定义一个 mod 吗?
    • 是的,目前和可预见的未来都是如此。因为我们需要分析 structenumimpl 一起,在 Rust 中组织它们最好的方式是 mod
  • 那么,一个 Actor 要由宏生成,需要满足什么要求?
    • 你需要一个名为 XXX 的 struct 和一个名为 XXXMsg 的 enum
    • enum XXXMsg 必须至少有 1 个 variant,该 variant 需要具有像示例中那样命名的字段,并且我们需要一个特定的 named field 被称为 resp。这个 named fieldtype 决定了 msg function 返回类型。
    • XXX struct 需要实现一个 process 方法,该方法以 msg:XXXMsg 作为输入参数。这里就是实际的消息处理发生的地方。

依赖关系

~3.5–4.5MB
~84K SLoC