#actor #tokio #actor-system #actor-model #message-bus #event-bus

tiny-tokio-actor

基于tokio的一个简单小巧的actor库

14个版本

0.3.5 2023年11月7日
0.3.3 2023年1月15日
0.3.1 2021年12月16日
0.3.0 2021年10月27日
0.2.2 2021年6月7日

#135异步

Download history 77/week @ 2024-04-22 27/week @ 2024-04-29 21/week @ 2024-05-06 41/week @ 2024-05-13 45/week @ 2024-05-20 33/week @ 2024-05-27 60/week @ 2024-06-03 16/week @ 2024-06-10 9/week @ 2024-06-17 56/week @ 2024-06-24 28/week @ 2024-07-01 117/week @ 2024-07-08 30/week @ 2024-07-15 14/week @ 2024-07-22 26/week @ 2024-07-29 37/week @ 2024-08-05

111 每月下载量

Apache-2.0

61KB
1K SLoC

微小Tokio Actor

crates.io build

另一个actor库!为什么还需要另一个?我真的很喜欢actor模型进行开发,并希望有一个简单易用的库可以在tokio之上使用。

[dependencies]
tiny-tokio-actor = "0.3"

首先导入必要的crate

use tiny_tokio_actor::*;

然后定义在actor系统的消息总线中发送的消息

// Define the system event bus message
#[derive(Clone, Debug)]
struct TestEvent(String);

impl SystemEvent for TestEvent {}

然后定义actor结构。actor结构必须是Send + Sync,但不需要是Clone。在实现Actor trait时,你可以选择覆盖默认的timeout()supervision_strategy()pre_start()pre_restart()post_stop()方法

struct TestActor {
    counter: usize
}

#[async_trait]
impl Actor<TestEvent> for TestActor {

    // This actor will time out after 5 seconds of not receiving a message
    fn timeout() -> Option<Duration> {
        Some(Duration::from_secs(5))
    }

    // This actor will immediately retry 5 times if it fails to start
    fn supervision_strategy() -> SupervisionStrategy {
        let strategy = supervision::NoIntervalStrategy::new(5);
        SupervisionStrategy::Retry(Box::new(strategy))
    }

    async fn pre_start(&mut self, ctx: &mut ActorContext<TestEvent>) -> Result<(), ActorError> {
        ctx.system.publish(TestEvent(format!("Actor '{}' started.", ctx.path)));
        Ok(())
    }

    async fn pre_restart(&mut self, ctx: &mut ActorContext<TestEvent>, error: Option<&ActorError>) -> Result<(), ActorError> {
        log::error!("Actor '{}' is restarting due to {:#?}", ctx.path, error);
        self.pre_start(ctx).await
    }

    async fn post_stop(&mut self, ctx: &mut ActorContext<TestEvent>) {
        ctx.system.publish(TestEvent(format!("Actor '{}' stopped.", ctx.path)));
    }
}

接下来定义actor要处理的消息。注意,你还需要定义期望从actor收到的响应。如果你不希望收到响应,可以使用()作为响应类型。

#[derive(Clone, Debug)]
struct TestMessage(String);

impl Message for TestMessage {
    type Response = String;
}

现在实现当收到消息时actor想要的行为

#[async_trait]
impl Handler<TestEvent, TestMessage> for TestActor {
    async fn handle(&mut self, msg: TestMessage, ctx: &mut ActorContext<TestEvent>) -> String {
        ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
        self.counter += 1;
        "Ping!".to_string()
    }
}

你可以定义更多actor要处理的消息和行为。例如,让我们定义一个actor将处理的OtherMessage

#[derive(Clone, Debug)]
struct OtherMessage(usize);

impl Message for OtherMessage {
    type Response = usize;
}

// What the actor should do with the other message
#[async_trait]
impl Handler<TestEvent, OtherMessage> for TestActor {
    async fn handle(&mut self, msg: OtherMessage, ctx: &mut ActorContext<TestEvent>) -> usize {
        ctx.system.publish(TestEvent(format!("Message {} received by '{}'", &msg, ctx.path)));
        self.counter += msg.0;
        self.counter
    }
}

现在我们可以测试actor,并发送两种消息类型给它

#[tokio::test]
async fn multi_message() {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "trace");
    }
    let _ = env_logger::builder().is_test(true).try_init();

    let actor = TestActor { counter: 0 };

    let bus = EventBus::<TestEvent>::new(1000);
    let system = ActorSystem::new("test", bus);
    let actor_ref = system.create_actor("test-actor", actor).await.unwrap();

    let mut events = system.events();
    tokio::spawn(async move {
        loop {
            match events.recv().await {
                Ok(event) => println!("Received event! {:?}", event),
                Err(err) => println!("Error receivng event!!! {:?}", err)
            }
        }
    });

    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    let msg_a = TestMessage("hello world!".to_string());
    let response_a = actor_ref.ask(msg_a).await.unwrap();
    assert_eq!(response_a, "Ping!".to_string());

    let msg_b = OtherMessage(10);
    let response_b = actor_ref.ask(msg_b).await.unwrap();
    assert_eq!(response_b, 11);
}

因此,这个库基本上提供了以下功能:

  • 一个带有消息总线的actor系统
  • 一个具有一个或多个消息处理器的强类型actor
  • 通过ActorPaths和ActorRefs引用的actor
  • 每个actor类型的监督策略
  • 每个actor类型的超时

请参阅文档示例集成测试以获取更多详细信息。

图书馆仍在孵化中!还有很多工作要做,API仍然不稳定!到目前为止的待办事项

  • 监督层次结构
  • 创建宏以简化演员的定义

值得检查的项目/博客文章

依赖项

~2.8–4.5MB
~78K SLoC