16个不稳定版本 (3个破坏性更新)

0.4.1 2022年3月29日
0.4.0 2022年3月28日
0.3.6 2022年3月28日
0.3.3 2022年2月18日
0.1.3 2022年2月6日

#573 in 异步

MIT 许可证

33KB
625 代码行

acu

用于构建异步 Actor 的实用工具 crate。

在使用此 crate 之前,建议您先了解 Rust 中的 actor 模式;Alice Ryhl 写过一篇非常有用的博客文章介绍该模式。

入门

将crate添加到依赖项

使用 cargo-edit

cargo add acu

或者手动在 Cargo.toml 的 [dependencies] 中添加 acu = "0.4.1"

构建您的第一个Actor

use tokio::sync::oneshot;

/// Messages understood by `MyActor`.
#[derive(Debug)]
enum Message {
    /// Asks the actor to add one to its counter; no reply is sent.
    Increment,
    /// Asks the actor for its current counter value, delivered through `respond_to`.
    Get { respond_to: oneshot::Sender<usize> },
}

// Implements the `acu::Message` trait for `Message`; no trait items are overridden.
impl acu::Message for Message {}

/// Actor state: the receiving end of the message channel plus the counter it owns.
struct MyActor {
    /// Source of incoming `Message`s; `&'static str` matches the name passed to
    /// `acu::channel` in `MyActorHandle::new`.
    receiver: acu::Receiver<Message, &'static str>,
    /// Value modified by `Message::Increment` and reported by `Message::Get`.
    counter: usize,
}

impl MyActor {
    /// Processes messages until `recv` yields `None`, then returns.
    ///
    /// `Increment` adds one to the counter; `Get` sends the current counter
    /// back through the supplied oneshot sender.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                Message::Get { respond_to } => {
                    // A send error only means the requester dropped its receiver
                    // (for example the calling task was cancelled). That must not
                    // panic and take the whole actor task down, so it is ignored.
                    let _ = respond_to.send(self.counter);
                }
            }
        }
    }
}

/// Cloneable handle through which callers send messages to a running `MyActor`.
#[derive(Debug, Clone)]
struct MyActorHandle {
    /// Sending half of the channel created in `MyActorHandle::new`.
    sender: acu::Sender<Message, &'static str>,
}

impl MyActorHandle {
    /// Creates the channel, spawns the actor task with `tokio::spawn` and
    /// returns a handle wrapping the sending half.
    pub fn new() -> Self {
        let (sender, receiver) = acu::channel(8, "MyActor");
        tokio::spawn(async move {
            let mut actor = MyActor { receiver, counter: 0 };
            actor.run().await
        });
        Self { sender }
    }

    /// Notifies the actor with `Message::Increment`.
    pub async fn increment(&self) {
        let build = || Message::Increment;
        self.sender.notify_with(build).await
    }

    /// Sends `Message::Get` and returns the value the actor replies with.
    pub async fn get(&self) -> usize {
        let pending = self
            .sender
            .call_with(|respond_to| Message::Get { respond_to });
        pending.await
    }
}

/// Spawns one actor, prints its counter, increments it 100 times and prints it again.
#[tokio::main]
async fn main() {
    let actor = MyActorHandle::new();

    let before = actor.get().await;
    println!("initial counter: {}", before);

    for _step in 0..100 {
        actor.increment().await;
    }

    let after = actor.get().await;
    println!("counter after 100 increments: {}", after);
}

如果您想使用日志功能,则需要先初始化 log,例如使用 simple-log crate:

// Call this at the top of the `main` function.
// NOTE(review): "debug" is presumably the log level — confirm in the simple-log docs.
simple_log::quick!("debug");

然后actor上的每个调用/通知都将被记录。

主从模式

您需要启用crate的 master-slave 功能。

您需要决定 Actor 的消息类型是否实现 Clone trait:如果实现了,您可以使用 BroadcasterMasterHandle,它允许您直接在 master 上调用 actor 的方法;如果没有实现,则只能使用 MasterHandle,此时无法直接在 master 上调用 actor 的方法,需要先通过 find 取得具体 actor 的句柄。

使用 BroadcasterMasterHandle(Message: Clone)

use acu::BroadcasterMasterHandle;
use acu::MasterExt;
use tokio::sync::broadcast;

/// Names identifying the master and the two example actors.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
    /// Name of the master; returned by `MasterName::master_name`.
    Master,
    /// Name given to the first example actor.
    MyActorA,
    /// Name given to the second example actor.
    MyActorB,
}

impl acu::MasterName for Name {
    fn master_name() -> Self {
        Self::Master
    }
}

impl AsRef<str> for Name {
    fn as_ref(&self) -> &str {
        match self {
            Name::Master => "master",
            Name::MyActorA => "my-actor-a",
            Name::MyActorB => "my-actor-b",
        }
    }
}

impl std::fmt::Display for Name {
    /// Writes the same label that `AsRef<str>` yields for this name.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str(AsRef::<str>::as_ref(self))
    }
}

/// Messages understood by `MyActor`; derives `Clone`, which the surrounding text
/// states is required for use with `BroadcasterMasterHandle`.
#[derive(Debug, Clone)]
enum Message {
    /// Asks the actor to add one to its counter; no reply is sent.
    Increment,
    /// Asks the actor for its current counter value.
    Fetch {
        /// Broadcast sender on which the actor publishes its counter.
        respond_to: broadcast::Sender<usize>,
    },
}

// Implements the `acu::Message` trait for `Message`; no trait items are overridden.
impl acu::Message for Message {}

/// Actor state: the receiving end of the message channel plus the counter it owns.
struct MyActor {
    /// Source of incoming `Message`s, parameterised by the `Name` enum.
    receiver: acu::Receiver<Message, Name>,
    /// Value modified by `Message::Increment` and reported by `Message::Fetch`.
    counter: usize,
}

impl MyActor {
    /// Processes messages until `recv` yields `None`, then returns.
    ///
    /// `Increment` adds one to the counter; `Fetch` publishes the current
    /// counter on the supplied broadcast sender.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                Message::Fetch { respond_to } => {
                    // `broadcast::Sender::send` fails when no receiver is left,
                    // i.e. the requester stopped waiting. That must not panic
                    // and take the whole actor task down, so it is ignored.
                    let _ = respond_to.send(self.counter);
                }
            }
        }
    }
}

/// Spawns a `MyActor` whose channel is created with `name` and returns its handle.
fn my_actor(name: Name) -> MyActorHandle {
    // NOTE(review): the standalone example calls `acu::channel(8, "MyActor")` with a
    // leading numeric argument — confirm which signature the acu version in use expects.
    let (sender, receiver) = acu::channel(name);
    tokio::spawn(async move {
        let mut actor = MyActor { receiver, counter: 0 };
        actor.run().await
    });
    MyActorHandle { sender }
}

/// Handle type for `MyActor`: an `acu::Handle` over this example's `Message` and `Name`.
type MyActorHandle = acu::Handle<Message, Name>;

use async_trait::async_trait;

/// Actor operations exposed as an extension trait, implemented below for `MyActorHandle`.
#[async_trait]
trait MyActorExt {
    /// Requests a counter increment.
    async fn increment(&self);
    /// Requests counter values; the example's `main` expects one entry per pushed actor
    /// when this is called on the master.
    async fn fetch(&self) -> Vec<usize>;
}

#[async_trait]
impl MyActorExt for MyActorHandle {
    /// Notifies the actor with `Message::Increment`.
    async fn increment(&self) {
        let build = || Message::Increment;
        self.sender.notify_with(build).await
    }

    /// Sends `Message::Fetch` through `call_many_with` and returns the collected replies.
    // NOTE(review): the meaning of the trailing `8` is not visible here — presumably a
    // capacity for the reply channel; confirm against the acu API.
    async fn fetch(&self) -> Vec<usize> {
        let replies = self
            .sender
            .call_many_with(|respond_to| Message::Fetch { respond_to }, 8);
        replies.await
    }
}

/// Drives two actors through a `BroadcasterMasterHandle`: increments both via the
/// master 100 times, then increments only `MyActorA` ten more times, printing the
/// counters along the way.
#[tokio::main]
async fn main() {
    let first = my_actor(Name::MyActorA);
    let second = my_actor(Name::MyActorB);
    let master = BroadcasterMasterHandle::new();
    master.push(first).await;
    master.push(second).await;

    // Fetches the counters through the master and returns them as a pair.
    let read_counters = || async {
        let counters = master.fetch().await;
        assert_eq!(counters.len(), 2);
        (counters[0], counters[1])
    };
    // Prints both counters followed by an empty line.
    let report = || async {
        let (first_value, second_value) = read_counters().await;
        println!("counter of MyActorA = {}", first_value);
        println!("counter of MyActorB = {}", second_value);
        println!();
    };

    for _round in 0..100 {
        master.increment().await;
        report().await;
    }
    report().await;

    {
        // Address a single actor by name instead of going through the master.
        let only_a = master.find(Name::MyActorA).await.unwrap();
        for _round in 0..10 {
            only_a.increment().await;
        }
    }
    report().await;
}

使用 MasterHandle(Message: ?Clone)

use acu::MasterHandle;
use acu::MasterExt;
use tokio::sync::oneshot;

/// Names identifying the master and the two example actors.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
    /// Name of the master; returned by `MasterName::master_name`.
    Master,
    /// Name given to the first example actor.
    MyActorA,
    /// Name given to the second example actor.
    MyActorB,
}

impl acu::MasterName for Name {
    fn master_name() -> Self {
        Self::Master
    }
}

impl AsRef<str> for Name {
    fn as_ref(&self) -> &str {
        match self {
            Name::Master => "master",
            Name::MyActorA => "my-actor-a",
            Name::MyActorB => "my-actor-b",
        }
    }
}

impl std::fmt::Display for Name {
    /// Writes the same label that `AsRef<str>` yields for this name.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str(AsRef::<str>::as_ref(self))
    }
}

/// Messages understood by `MyActor`; not `Clone`, because it carries a oneshot sender.
#[derive(Debug)]
enum Message {
    /// Asks the actor to add one to its counter; no reply is sent.
    Increment,
    /// Asks the actor for its current counter value.
    Fetch {
        /// Oneshot sender on which the actor returns its counter.
        respond_to: oneshot::Sender<usize>,
    },
}

// Implements the `acu::Message` trait for `Message`; no trait items are overridden.
impl acu::Message for Message {}

/// Actor state: the receiving end of the message channel plus the counter it owns.
struct MyActor {
    /// Source of incoming `Message`s, parameterised by the `Name` enum.
    receiver: acu::Receiver<Message, Name>,
    /// Value modified by `Message::Increment` and reported by `Message::Fetch`.
    counter: usize,
}

impl MyActor {
    /// Processes messages until `recv` yields `None`, then returns.
    ///
    /// `Increment` adds one to the counter; `Fetch` sends the current counter
    /// back through the supplied oneshot sender.
    async fn run(&mut self) {
        while let Some(message) = self.receiver.recv().await {
            match message {
                Message::Increment => self.counter += 1,
                Message::Fetch { respond_to } => {
                    // A send error only means the requester dropped its receiver
                    // (for example the calling task was cancelled). That must not
                    // panic and take the whole actor task down, so it is ignored.
                    let _ = respond_to.send(self.counter);
                }
            }
        }
    }
}

/// Spawns a `MyActor` whose channel is created with `name` and returns its handle.
fn my_actor(name: Name) -> MyActorHandle {
    // NOTE(review): the standalone example calls `acu::channel(8, "MyActor")` with a
    // leading numeric argument — confirm which signature the acu version in use expects.
    let (sender, receiver) = acu::channel(name);
    tokio::spawn(async move {
        let mut actor = MyActor { receiver, counter: 0 };
        actor.run().await
    });
    MyActorHandle { sender }
}

/// Handle type for `MyActor`: an `acu::Handle` over this example's `Message` and `Name`.
type MyActorHandle = acu::Handle<Message, Name>;

use async_trait::async_trait;

/// Actor operations exposed as an extension trait, implemented below for `MyActorHandle`.
#[async_trait]
trait MyActorExt {
    /// Requests a counter increment.
    async fn increment(&self);
    /// Requests the actor's current counter value.
    async fn fetch(&self) -> usize;
}

#[async_trait]
impl MyActorExt for MyActorHandle {
    /// Notifies the actor with `Message::Increment`.
    async fn increment(&self) {
        let build = || Message::Increment;
        self.sender.notify_with(build).await
    }

    /// Sends `Message::Fetch` through `call_with` and returns the actor's reply.
    async fn fetch(&self) -> usize {
        let reply = self
            .sender
            .call_with(|respond_to| Message::Fetch { respond_to });
        reply.await
    }
}

/// Drives two actors through a `MasterHandle`: looks each actor up by name,
/// increments both 100 times, then increments only `MyActorA` ten more times,
/// printing the counters along the way.
#[tokio::main]
async fn main() {
    let handle_a = my_actor(Name::MyActorA);
    let handle_b = my_actor(Name::MyActorB);
    let master = {
        let master = MasterHandle::new();
        master.push(handle_a).await;
        master.push(handle_b).await;
        master
    };
    // Looks up both actors by name through the master.
    let get_handles = || async {
        let handle_a = master.find(Name::MyActorA).await.unwrap();
        // Must be `MyActorB`: looking up `MyActorA` twice would increment actor A
        // two times per round and report A's counter under the "MyActorB" label.
        let handle_b = master.find(Name::MyActorB).await.unwrap();
        (handle_a, handle_b)
    };
    // Fetches the counter of each actor individually.
    let get_values = || async {
        let (handle_a, handle_b) = get_handles().await;
        (handle_a.fetch().await, handle_b.fetch().await)
    };
    // Prints both counters followed by an empty line.
    let print_values = || async {
        let values = get_values().await;
        println!("counter of MyActorA = {}", values.0);
        println!("counter of MyActorB = {}", values.1);
        println!();
    };
    for _ in 0..100 {
        let (handle_a, handle_b) = get_handles().await;
        handle_a.increment().await;
        handle_b.increment().await;
        print_values().await;
    }
    print_values().await;
    {
        // Increment only actor A so the two counters diverge.
        let actor_a = master.find(Name::MyActorA).await.unwrap();
        for _ in 0..10 {
            actor_a.increment().await;
        }
    }
    print_values().await;
}

所有示例都可以在 examples/ 目录中找到。

动机

我想在我的几个项目中使用一些结构和函数,包括 Houseflow。我认为这也许对其他项目也有用。

依赖项

~3.5–5.5MB
~92K SLoC