14 个稳定版本

1.2.10 2024 年 7 月 8 日
1.2.7 2023 年 12 月 27 日
1.2.4 2023 年 10 月 28 日
1.1.2 2023 年 9 月 30 日

#230异步

Download history 389/week @ 2024-04-25 274/week @ 2024-07-04 39/week @ 2024-07-11

772 每月下载量

MIT 许可证

25KB
556

行为者

使用方法

  1. 在 Cargo.toml 中添加行为者
$ cargo add xan-actor
  1. 创建一个可变的行为者
use xan_actor::ActorSystem;
...

let mut actor_system = ActorSystem::new();
  1. 声明行为者以进行注册
use crate::xan_actor::{Actor, Handler, Message, ActorError};

#[derive(Clone, Debug)]
pub enum MyMessage {
  A(String),
  B(String),
  Exit,
}

#[derive(thiserror::Error, Debug)]
enum MyError<T, R>
where
  T: Sized + Send + Clone,
  R: Sized + Send,
{
  #[error("bye")]
  Exit,
  #[error(transparent)]
  ActorError(#[from] ActorError<T, R>),
}

struct MyActor {
  address: String,
}

#[async_trait::async_trait]
impl Actor<MyMessage, (), MyError<MyMessage, ()>, String> for MyActor
where
    Self: Sized + 'static,
{
  fn address(&self) -> &str {
    &self.address
  }

  async fn new(params: String) -> Result<Self, MyError<MyMessage, ()>> {
    Ok(Self { address: params })
  }

  async fn actor(
    &mut self, msg: MyMessage,
  ) -> Result<(), MyError<MyMessage, ()>> {
    match msg {
      MyMessage::A(s) => {
        println!("got A: {}", s);
      }
      MyMessage::B(s) => {
        println!("got B: {}", s);
      }
      MyMessage::Exit => {
        println!("got Exit");
        return Err(MyError::Exit);
      }
    }
    Ok(())
  }

  async fn pre_start(&mut self) {}
  async fn pre_restart(&mut self) {}
  async fn post_stop(&mut self) {}
  async fn post_restart(&mut self) {}
}
  1. 将行为者注册到行为者系统中
let actor = MyActor::new("some-address".to_string()).await.unwrap();
actor.register(&mut actor_system).await;
  1. 使用它
let _ = actor_system.send(
  "some-address".to_string(), /* address */
  MyMessage::A("a".to_string()), /* message */
).await;
let result = actor_system.send_and_recv(
  "some-address".to_string(), /* address */
  MyMessage::B("b".to_string()), /* message */
).await;

// restart actor
actor_system.restart(
  "some-address".to_string(), /* address */
);
// it needs some time. TODO: handle it inside of restart function
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let result = actor_system.send_and_recv(
  "some-address".to_string(), /* address */
  MyMessage::A("a".to_string()), /* message */
).await;

// kill and unregister actor
actor_system.unregister(
  "some-address".to_string(), /* address */
);

工作

  • 如果你在某个时间或某些迭代中发送消息,可以使用工作
use xan_actor::JobSpec;
...

let job = JobSpec::new(
  Some(2), /* max_iter */
  Some(std::time::Duration::from_secs(3)), /* interval */
  std::time::SystemTime::now(), /* start_at */
);
if let Some(recv_rx) = actor_system.run_job(
  "some-address".to_string(), /* address */
  true, /* whether subscribe the handler result or not(true => Some(rx)) */
  job, /* job as JobSpec */
  MyMessage::C("c".to_string()), /* message */
) {
    while let Some(result) = recv_rx.recv().await {
        println!("result returned");
    }
}

依赖关系

~2.7–9MB
~76K SLoC