14个版本
0.3.5 | 2023年11月7日 |
---|---|
0.3.3 | 2023年1月15日 |
0.3.1 | 2021年12月16日 |
0.3.0 | 2021年10月27日 |
0.2.2 | 2021年6月7日 |
#135 在 异步
111 每月下载量
61KB
1K SLoC
微小Tokio Actor
另一个actor库!为什么还需要另一个?我真的很喜欢actor模型进行开发,并希望有一个简单易用的库可以在tokio之上使用。
[dependencies]
tiny-tokio-actor = "0.3"
首先导入必要的crate
use tiny_tokio_actor::*;
然后定义在actor系统的消息总线中发送的消息
// Define the system event bus message
#[derive(Clone, Debug)]
struct TestEvent(String);
impl SystemEvent for TestEvent {}
然后定义actor结构。actor结构必须是Send + Sync,但不需要是Clone。在实现Actor
trait时,你可以选择覆盖默认的timeout()
、supervision_strategy()
、pre_start()
、pre_restart()
和post_stop()
方法
struct TestActor {
counter: usize
}
#[async_trait]
impl Actor<TestEvent> for TestActor {
// This actor will time out after 5 seconds of not receiving a message
fn timeout() -> Option<Duration> {
Some(Duration::from_secs(5))
}
// This actor will immediately retry 5 times if it fails to start
fn supervision_strategy() -> SupervisionStrategy {
let strategy = supervision::NoIntervalStrategy::new(5);
SupervisionStrategy::Retry(Box::new(strategy))
}
async fn pre_start(&mut self, ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
ctx.system.publish(TestEvent(format!("Actor '{}' started.", ctx.path)));
Ok(())
}
async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
log::error!("Actor '{}' is restarting due to {:#?}", ctx.path, error);
self.pre_start(ctx).await
}
async fn post_stop(&mut self, ctx: &mut ActorContext<TestEvent>) {
ctx.system.publish(TestEvent(format!("Actor '{}' stopped.", ctx.path)));
}
}
接下来定义actor要处理的消息。注意,你还需要定义期望从actor收到的响应。如果你不希望收到响应,可以使用()
作为响应类型。
#[derive(Clone, Debug)]
struct TestMessage(String);
impl Message for TestMessage {
type Response = String;
}
现在实现当收到消息时actor想要的行为
#[async_trait]
impl Handler<TestEvent, TestMessage> for TestActor {
async fn handle(&mut self, msg: TestMessage, ctx: &mut ActorContext<TestEvent>) -> String {
ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
self.counter += 1;
"Ping!".to_string()
}
}
你可以定义更多actor要处理的消息和行为。例如,让我们定义一个actor将处理的OtherMessage
。
#[derive(Clone, Debug)]
struct OtherMessage(usize);
impl Message for OtherMessage {
type Response = usize;
}
// What the actor should do with the other message
#[async_trait]
impl Handler<TestEvent, OtherMessage> for TestActor {
async fn handle(&mut self, msg: OtherMessage, ctx: &mut ActorContext<TestEvent>) -> usize {
ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
self.counter += msg.0;
self.counter
}
}
现在我们可以测试actor,并发送两种消息类型给它
#[tokio::test]
async fn multi_message() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "trace");
}
let _ = env_logger::builder().is_test(true).try_init();
let actor = TestActor { counter: 0 };
let bus = EventBus::<TestEvent>::new(1000);
let system = ActorSystem::new("test", bus);
let actor_ref = system.create_actor("test-actor", actor).await.unwrap();
let mut events = system.events();
tokio::spawn(async move {
loop {
match events.recv().await {
Ok(event) => println!("Received event! {:?}", event),
Err(err) => println!("Error receivng event!!! {:?}", err)
}
}
});
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let msg_a = TestMessage("hello world!".to_string());
let response_a = actor_ref.ask(msg_a).await.unwrap();
assert_eq!(response_a, "Ping!".to_string());
let msg_b = OtherMessage(10);
let response_b = actor_ref.ask(msg_b).await.unwrap();
assert_eq!(response_b, 11);
}
因此,这个库基本上提供了以下功能:
- 一个带有消息总线的actor系统
- 一个具有一个或多个消息处理器的强类型actor
- 通过ActorPaths和ActorRefs引用的actor
- 每个actor类型的监督策略
- 每个actor类型的超时
图书馆仍在孵化中!还有很多工作要做,API仍然不稳定!到目前为止的待办事项
- 监督层次结构
- 创建宏以简化演员的定义
值得检查的项目/博客文章
依赖项
~2.8–4.5MB
~78K SLoC