8 个版本

0.2.7 2020 年 10 月 14 日
0.2.6 2020 年 6 月 10 日
0.2.5 2020 年 3 月 29 日

#891 in 异步

36 每月下载次数
4 个 Crates (2 个直接) 中使用

MIT 许可协议

9KB
105 代码行

受 Akka/Pykka 启发的 Rust 行为者库

使用 'fibers' Crates 构建。

示例

行为者

我们首先像其他任何结构一样声明我们的 MyActor 结构。我们获取一个 'handle' 以便我们可以生成子行为者。(注意,id 完全不必要,ActorRef 将有自己的唯一标识符。)

    struct MyActor<H>
        where H: Send + Spawn + Clone + 'static
    {
        handle: H,
        id: String,
    }

    impl<H> MyActor<H>
        where H: Send + Spawn + Clone + 'static
    {
        pub fn new(h: H) -> MyActor<H> {
            MyActor {
                handle: h,
                id: Uuid::new_v4().to_string(),
            }
        }
    }

然后我们为 MyActor 实现 Actor。我们获取一个消息 (Box<Any + Send>) 并将其下转换为期望的类型。

然后我们创建一个子行为者并向它发送自己的消息(我们的消息 + 1)。

这将无限循环(并发,每次 on_message 调用后让出),永恒地计数。

    impl<H> Actor for MyActor<H>
        where H: Send + Spawn + Clone + 'static
    {
        fn on_message(&mut self, msg: Box<Any + Send>) {
            if let Some(number) = msg.downcast_ref::<u64>() {
                if number % 1000 == 0 {
                    println!("{} got {}", self.id, number);
                }

                //                if *number == 0 {panic!("zero!")};
                let new_actor = Box::new(MyActor::new(self.handle.clone())) as Box<Actor>;
                let actor_ref = actor_of(self.handle.clone(), new_actor);
                actor_ref.sender.send(Box::new(number + 1));
                drop(actor_ref);
            } else {
                panic!("downcast error");
            }
        }
    }

要实际执行行为者,我们需要创建一个执行上下文,并使用 actor_of 函数。

    let system = ThreadPoolExecutor::with_thread_count(2).unwrap();

    let actor = MyActor::new(system.handle());
    let mut actor_ref = actor_of(system.handle(), Box::new(actor));
    actor_ref.send(Box::new(0 as u64));
    drop(actor_ref);

    let _ = system.run();

监管者

使用上面的 MyActor,我们创建一个 ChildSpec,它将返回一个 MyActor。监管者使用此函数生成新行为者并替换已死亡的行为者。

然后我们创建预定义的监管者,并将其传递给 actor_of。

然后我们可以使用 SuperVisorMessage 结构向监管者发送消息。我们提供给子行为者赋予的名称,监管者使用它内部路由消息。

(目前监管者不会捕获 panic - 这将改变)

        let system = ThreadPoolExecutor::with_thread_count(2).unwrap();
        let handle = system.handle();

        let child_spec = ChildSpec::new("worker child".to_owned(),
                                        move |handle| Box::new(MyActor::new(handle)) as Box<Actor>,
                                        Restart::Temporary,
                                        Shutdown::Eventually,
                                        ActorKind::Worker);

        let mut supervisor_ref =
            actor_of(handle.clone(),
                     Box::new(Supervisor::new(handle.clone(), vec![child_spec])) as Box<Actor>);

        supervisor_ref.send(Box::new(SupervisorMessage {
            id: "worker child".to_owned(),
            msg: Box::new(1000000 as u64),
        }));


        drop(supervisor_ref);

        let _ = system.run();

依赖项

~4MB
~68K SLoC