5 个版本

0.2.2 2020年1月12日
0.2.1 2019年12月24日
0.2.0 2019年12月24日
0.1.1 2019年12月16日
0.1.0 2019年12月15日

#1423 in 异步

Apache-2.0

29KB
797

Coerce-rs coerce-rs

Coerce-rs 是一个为 Rust 提供的异步 (async/await) Actor 运行时。它允许进行简单而强大的基于 actor 的多线程应用程序开发。

async/await Actors

actor 只是一个计算单元的另一个名称。它可以有可变状态,它可以接收消息并执行操作。但有一个缺点...它一次只能做一件事。这很有用,因为它可以减少线程同步的需求,通常通过锁定(使用 MutexRwLock 等)来实现。

在 Coerce 中如何实现这一点?

Coerce 使用 Tokio 的 MPSC 通道(tokio::sync::mpsc::channel),每个创建的 actor 都会启动一个任务来监听来自 Receiver 的消息,处理并等待消息的结果。每个引用(ActorRef<A: Actor>)都持有可以克隆的 Sender<M> where A: Handler<M>

actor 可以停止,并且可以从应用程序的任何地方通过 ID(目前为 Uuid)检索 actor 引用。匿名 actor 在所有引用都被丢弃时自动丢弃(并 Stopped)。使用全局 fn new_actor 的跟踪 actor 必须停止。

示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
    type Result = String;
}

#[async_trait]
impl Handler<EchoMessage> for EchoActor {
    async fn handle(
        &mut self,
        message: EchoMessage,
        _ctx: &mut ActorHandlerContext,
    ) -> String {
        message.0.clone()
    }
}

pub async fn run() {
    let mut actor = new_actor(EchoActor {}).await.unwrap();

    let hello_world = "hello, world".to_string();
    let result = actor.send(EchoMessage(hello_world.clone())).await;
    
    assert_eq!(result, Ok(hello_world));
}

计时器示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
    type Result = String;
}

pub struct PrintTimer(String);

impl TimerTick for PrintTimer {}

#[async_trait]
impl Handler<PrintTimer> for EchoActor {
    async fn handle(&mut self, msg: PrintTimer, _ctx: &mut ActorHandlerContext) {
        println!("{}", msg.0);
    }
}

pub async fn run() {
    let mut actor = new_actor(EchoActor {}).await.unwrap();
    let hello_world = "hello world!".to_string();

    // print "hello world!" every 5 seconds
    let timer = Timer::start(actor.clone(), Duration::from_secs(5), TimerTick(hello_world));
    
    // timer is stopped when handle is out of scope or can be stopped manually by calling `.stop()`
    delay_for(Duration::from_secs(20)).await;
    timer.stop();
}

依赖项

~5MB
~88K SLoC