#actor-framework #quickwit #message #dedicated #thread #blocking #handler

witty-actors

quickwit-actors 的分支,quickwit 中使用的 Actor 框架

1 个不稳定版本

0.6.0 2023 年 5 月 30 日

#761 in 并发

MIT 许可证

190KB
4K SLoC

https://github.com/quickwit-oss/quickwit/blob/83041f78a21072df091f6d945cc1b5859cf72326/quickwit/quickwit-common/ 分支。

可能在未来会得到维护。

文档: https://docs.rs/witty-actors

Quickwit actors

另一个 Rust actor 库。这个库专门为 quickwit 的需求而存在。API 在未来可能会改变。

目标

  • 生成易于推理的代码:Quickwit 的索引管道本身就非常复杂。
  • 易于测试 actors。
  • 对运行时的控制。

非目标

  • 高消息吞吐量。Quickwit 中交换的大部分消息都是“大”消息。例如,它可以包含具有千兆字节数据的临时目录。处理最多消息的 actor 是索引器和源。消息通常包含一批记录。

特性

  • Actor 消息框
  • 该框架旨在默认运行异步 actors,但它也可以运行长时间阻塞的 actors。在两种情况下,消息处理方法在技术上都是异步的,但 Actor::runner 方法使得可以在专用线程上运行具有阻塞代码的 actor。
  • 一个调度 actor,可以模拟时间。

示例

use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{Handler, Actor, Universe, ActorContext, ActorExitStatus, Mailbox};

#[derive(Default)]
struct PingReceiver;

impl Actor for PingReceiver {
    type ObservableState = ();
    fn observable_state(&self) -> Self::ObservableState {}
}

#[async_trait]
impl Handler<Ping> for PingReceiver {
    type Reply = String;
    async fn handle(
        &mut self,
        _msg: Ping,
        _ctx: &ActorContext<Self>,
    ) -> Result<String, ActorExitStatus> {
        Ok("Pong".to_string())
    }
}

struct PingSender {
    peer: Mailbox<PingReceiver>,
}

#[derive(Debug)]
struct Loop;

#[derive(Debug)]
struct Ping;

#[async_trait]
impl Actor for PingSender {
    type ObservableState = ();
    fn observable_state(&self) -> Self::ObservableState {}

    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(),ActorExitStatus> {
        ctx.send_self_message(Loop).await?;
        Ok(())
    }
}

#[async_trait]
impl Handler<Loop> for PingSender {
    type Reply = ();

    async fn handle(
        &mut self,
        _: Loop,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        let reply_msg = ctx.ask(&self.peer, Ping).await.unwrap();
        println!("{reply_msg}");
        ctx.schedule_self_msg(Duration::from_secs(1), Loop).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let universe = Universe::new();

    let (recv_mailbox, _) =
        universe.spawn_actor(PingReceiver::default()).spawn();

    let ping_sender = PingSender { peer: recv_mailbox };
    let (_, ping_sender_handler) = universe.spawn_actor(ping_sender).spawn();

    ping_sender_handler.join().await;
}

依赖关系

~7–14MB
~170K SLoC