1 个不稳定版本
0.6.0 | 2023 年 5 月 30 日 |
---|
#761 in 并发
190KB
4K SLoC
可能在未来会得到维护。
文档: https://docs.rs/witty-actors
Quickwit actors
另一个 Rust actor 库。这个库专门为 quickwit 的需求而存在。API 在未来可能会改变。
目标
- 生成易于推理的代码:Quickwit 的索引管道本身就非常复杂。
- 易于测试 actors。
- 对运行时的控制。
非目标
- 高消息吞吐量。Quickwit 中交换的大部分消息都是“大”消息。例如,它可以包含具有千兆字节数据的临时目录。处理最多消息的 actor 是索引器和源。消息通常包含一批记录。
特性
- Actor 消息框
- 该框架旨在默认运行异步 actors,但它也可以运行长时间阻塞的 actors。在两种情况下,消息处理方法在技术上都是异步的,但
Actor::runner
方法使得可以在专用线程上运行具有阻塞代码的 actor。 - 一个调度 actor,可以模拟时间。
示例
use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{Handler, Actor, Universe, ActorContext, ActorExitStatus, Mailbox};
#[derive(Default)]
struct PingReceiver;
impl Actor for PingReceiver {
type ObservableState = ();
fn observable_state(&self) -> Self::ObservableState {}
}
#[async_trait]
impl Handler<Ping> for PingReceiver {
type Reply = String;
async fn handle(
&mut self,
_msg: Ping,
_ctx: &ActorContext<Self>,
) -> Result<String, ActorExitStatus> {
Ok("Pong".to_string())
}
}
struct PingSender {
peer: Mailbox<PingReceiver>,
}
#[derive(Debug)]
struct Loop;
#[derive(Debug)]
struct Ping;
#[async_trait]
impl Actor for PingSender {
type ObservableState = ();
fn observable_state(&self) -> Self::ObservableState {}
async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(),ActorExitStatus> {
ctx.send_self_message(Loop).await?;
Ok(())
}
}
#[async_trait]
impl Handler<Loop> for PingSender {
type Reply = ();
async fn handle(
&mut self,
_: Loop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let reply_msg = ctx.ask(&self.peer, Ping).await.unwrap();
println!("{reply_msg}");
ctx.schedule_self_msg(Duration::from_secs(1), Loop).await;
Ok(())
}
}
#[tokio::main]
async fn main() {
let universe = Universe::new();
let (recv_mailbox, _) =
universe.spawn_actor(PingReceiver::default()).spawn();
let ping_sender = PingSender { peer: recv_mailbox };
let (_, ping_sender_handler) = universe.spawn_actor(ping_sender).spawn();
ping_sender_handler.join().await;
}
依赖关系
~7–14MB
~170K SLoC