2个不稳定版本
0.3.0 | 2022年6月7日 |
---|---|
0.1.0 | 2022年3月28日 |
#922 in 异步
在 4 个crate中(直接使用3个)中使用
165KB
3.5K SLoC
Quickwit actors
又一个Rust的actor crate。这个crate专门用于满足quickwit的需求。API可能在将来发生变化。
目标
- 生产易于推理的代码:Quickwit的索引管道本身就非常复杂。
- 易于测试actors。
- 对运行时的控制。
非目标
- 高消息吞吐量。Quickwit中交换的大部分消息都是“大”的。例如,它可以存储包含数GB数据的临时目录。处理最多消息的actors是索引器和源。一条消息通常包含一批记录。
特性
- Actor消息框
- 框架默认用于运行异步actors,但它也可以运行长时间阻塞的actors。在两种情况下,消息处理方法都是异步的,但
Actor::runner
方法使得在专用线程上运行带有阻塞代码的actor成为可能。 - 一个调度actor,使其能够模拟时间。
示例
use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{Handler, Actor, Universe, ActorContext, ActorExitStatus, Mailbox};
#[derive(Default)]
struct PingReceiver;
impl Actor for PingReceiver {
type ObservableState = ();
fn observable_state(&self) -> Self::ObservableState {}
}
#[async_trait]
impl Handler<Ping> for PingReceiver {
type Reply = String;
async fn handle(
&mut self,
_msg: Ping,
_ctx: &ActorContext<Self>,
) -> Result<String, ActorExitStatus> {
Ok("Pong".to_string())
}
}
struct PingSender {
peer: Mailbox<PingReceiver>,
}
#[derive(Debug)]
struct Loop;
#[derive(Debug)]
struct Ping;
#[async_trait]
impl Actor for PingSender {
type ObservableState = ();
fn observable_state(&self) -> Self::ObservableState {}
async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(),ActorExitStatus> {
ctx.send_self_message(Loop).await?;
Ok(())
}
}
#[async_trait]
impl Handler<Loop> for PingSender {
type Reply = ();
async fn handle(
&mut self,
_: Loop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let reply_msg = ctx.ask(&self.peer, Ping).await.unwrap();
println!("{reply_msg}");
ctx.schedule_self_msg(Duration::from_secs(1), Loop).await;
Ok(())
}
}
#[tokio::main]
async fn main() {
let universe = Universe::new();
let (recv_mailbox, _) =
universe.spawn_actor(PingReceiver::default()).spawn();
let ping_sender = PingSender { peer: recv_mailbox };
let (_, ping_sender_handler) = universe.spawn_actor(ping_sender).spawn();
ping_sender_handler.join().await;
}
依赖项
~14–27MB
~410K SLoC