14 个稳定版本
1.2.10 | 2024 年 7 月 8 日 |
---|---|
1.2.7 | 2023 年 12 月 27 日 |
1.2.4 | 2023 年 10 月 28 日 |
1.1.2 | 2023 年 9 月 30 日 |
#230 在 异步
772 每月下载量
25KB
556 行
行为者
使用方法
- 在 Cargo.toml 中添加 xan-actor 依赖
$ cargo add xan-actor
- 创建一个可变的行为者系统(ActorSystem)
use xan_actor::ActorSystem;
...
// Declared `mut` because the registration step later borrows it as `&mut actor_system`.
let mut actor_system = ActorSystem::new();
- 声明行为者以进行注册
use crate::xan_actor::{Actor, Handler, Message, ActorError};
/// Messages accepted by the example actor `MyActor`.
#[derive(Clone, Debug)]
pub enum MyMessage {
/// Carries a string; the handler prints it as `got A: <string>`.
A(String),
/// Carries a string; the handler prints it as `got B: <string>`.
B(String),
/// Makes the handler print `got Exit` and return `MyError::Exit`.
Exit,
}
/// Error type returned by the example actor.
///
/// It is generic over `T` and `R` so that it can wrap `ActorError<T, R>`;
/// `MyActor` instantiates it as `MyError<MyMessage, ()>`.
#[derive(thiserror::Error, Debug)]
enum MyError<T, R>
where
T: Sized + Send + Clone,
R: Sized + Send,
{
/// Returned by the handler when it receives `MyMessage::Exit`; displays as "bye".
#[error("bye")]
Exit,
/// Wraps an `ActorError`. `#[from]` derives the `From<ActorError<T, R>>`
/// conversion, and `transparent` forwards `Display`/`source` to the inner error.
#[error(transparent)]
ActorError(#[from] ActorError<T, R>),
}
/// Example actor whose only state is the address it reports via `address()`.
struct MyActor {
// Set from the `params` argument of `new` and returned by `address()`.
address: String,
}
/// `Actor` implementation for `MyActor`.
///
/// The handler takes `MyMessage`, succeeds with `()`, fails with
/// `MyError<MyMessage, ()>`, and the constructor takes a `String`.
#[async_trait::async_trait]
impl Actor<MyMessage, (), MyError<MyMessage, ()>, String> for MyActor
where
    Self: Sized + 'static,
{
    /// Returns the address string stored at construction time.
    fn address(&self) -> &str {
        self.address.as_str()
    }

    /// Builds the actor, using `params` as its address. Always returns `Ok`.
    async fn new(params: String) -> Result<Self, MyError<MyMessage, ()>> {
        let address = params;
        Ok(Self { address })
    }

    /// Handles one message.
    ///
    /// `A` and `B` print their payload and succeed; `Exit` prints a notice
    /// and fails with `MyError::Exit`.
    async fn actor(&mut self, msg: MyMessage) -> Result<(), MyError<MyMessage, ()>> {
        // The match is the tail expression: each arm yields the handler result.
        match msg {
            MyMessage::A(payload) => {
                println!("got A: {}", payload);
                Ok(())
            }
            MyMessage::B(payload) => {
                println!("got B: {}", payload);
                Ok(())
            }
            MyMessage::Exit => {
                println!("got Exit");
                Err(MyError::Exit)
            }
        }
    }

    // The four hooks below are intentionally empty in this example.
    // NOTE(review): when the system invokes each hook is defined by the
    // `Actor` trait in xan-actor, not by this snippet.
    async fn pre_start(&mut self) {}

    async fn pre_restart(&mut self) {}

    async fn post_stop(&mut self) {}

    async fn post_restart(&mut self) {}
}
- 将行为者注册到行为者系统中
// Build the actor; the string passed to `new` becomes its address.
// `unwrap()` cannot panic here because `MyActor::new` always returns `Ok`.
let actor = MyActor::new("some-address".to_string()).await.unwrap();
// Hand the actor over to the actor system created earlier.
actor.register(&mut actor_system).await;
- 使用它
// Send a message and discard whatever `send` returns.
let _ = actor_system.send(
"some-address".to_string(), /* address */
MyMessage::A("a".to_string()), /* message */
).await;
// Send a message and keep what `send_and_recv` returns
// (presumably the handler's result; confirm against the xan-actor API).
let result = actor_system.send_and_recv(
"some-address".to_string(), /* address */
MyMessage::B("b".to_string()), /* message */
).await;
// restart actor
// NOTE(review): no `.await` here, unlike `send`; confirm `restart` is synchronous.
actor_system.restart(
"some-address".to_string(), /* address */
);
// it needs some time. TODO: handle it inside of restart function
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
// Shadows the earlier `result` binding.
let result = actor_system.send_and_recv(
"some-address".to_string(), /* address */
MyMessage::A("a".to_string()), /* message */
).await;
// kill and unregister actor
// NOTE(review): also not awaited; confirm `unregister` is synchronous.
actor_system.unregister(
"some-address".to_string(), /* address */
);
任务(Job)
- 如果需要在指定时间发送消息,或按固定间隔重复发送若干次,可以使用任务(Job)
use xan_actor::JobSpec;
...
// Describe the job's schedule; the inline labels name each `JobSpec::new` argument.
let job = JobSpec::new(
    Some(2),                                 /* max_iter */
    Some(std::time::Duration::from_secs(3)), /* interval */
    std::time::SystemTime::now(),            /* start_at */
);
// `run_job` yields `Some(receiver)` only when subscription is requested (`true` below).
// The receiver is bound as `mut` because `recv()` on a tokio-mpsc-style receiver
// takes `&mut self`. NOTE(review): confirm the receiver type in xan-actor.
if let Some(mut recv_rx) = actor_system.run_job(
    "some-address".to_string(), /* address */
    true, /* whether subscribe the handler result or not(true => Some(rx)) */
    job,  /* job as JobSpec */
    // `MyMessage` as declared in this example has only `A`, `B` and `Exit`;
    // the previously used `C` variant does not exist and would not compile.
    MyMessage::A("c".to_string()), /* message */
) {
    // Drain results until the sending side closes the channel.
    while let Some(_result) = recv_rx.recv().await {
        println!("result returned");
    }
}
依赖关系
~2.7–9MB
~76K SLoC