2个不稳定版本

0.3.0 2022年6月7日
0.1.0 2022年3月28日

#922 in 异步


4 个crate中(直接使用3个)中使用

AGPL-3.0-or-later

165KB
3.5K SLoC

Quickwit actors

又一个Rust的actor crate。这个crate专门用于满足quickwit的需求。API可能在将来发生变化。

目标

  • 生产易于推理的代码:Quickwit的索引管道本身就非常复杂。
  • 易于测试actors。
  • 对运行时的控制。

非目标

  • 高消息吞吐量。Quickwit中交换的大部分消息都是“大”的。例如,它可以存储包含数GB数据的临时目录。处理最多消息的actors是索引器和源。一条消息通常包含一批记录。

特性

  • Actor消息框
  • 框架默认用于运行异步actors,但它也可以运行长时间阻塞的actors。在两种情况下,消息处理方法都是异步的,但Actor::runner方法使得在专用线程上运行带有阻塞代码的actor成为可能。
  • 一个调度actor,使其能够模拟时间。

示例

use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{Handler, Actor, Universe, ActorContext, ActorExitStatus, Mailbox};

#[derive(Default)]
struct PingReceiver;

impl Actor for PingReceiver {
    type ObservableState = ();
    fn observable_state(&self) -> Self::ObservableState {}
}

#[async_trait]
impl Handler<Ping> for PingReceiver {
    type Reply = String;
    async fn handle(
        &mut self,
        _msg: Ping,
        _ctx: &ActorContext<Self>,
    ) -> Result<String, ActorExitStatus> {
        Ok("Pong".to_string())
    }
}

struct PingSender {
    peer: Mailbox<PingReceiver>,
}

#[derive(Debug)]
struct Loop;

#[derive(Debug)]
struct Ping;

#[async_trait]
impl Actor for PingSender {
    type ObservableState = ();
    fn observable_state(&self) -> Self::ObservableState {}

    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(),ActorExitStatus> {
        ctx.send_self_message(Loop).await?;
        Ok(())
    }
}

#[async_trait]
impl Handler<Loop> for PingSender {
    type Reply = ();

    async fn handle(
        &mut self,
        _: Loop,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        let reply_msg = ctx.ask(&self.peer, Ping).await.unwrap();
        println!("{reply_msg}");
        ctx.schedule_self_msg(Duration::from_secs(1), Loop).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let universe = Universe::new();

    let (recv_mailbox, _) =
        universe.spawn_actor(PingReceiver::default()).spawn();

    let ping_sender = PingSender { peer: recv_mailbox };
    let (_, ping_sender_handler) = universe.spawn_actor(ping_sender).spawn();

    ping_sender_handler.join().await;
}

依赖项

~14–27MB
~410K SLoC