16个不稳定版本 (3个破坏性更新)
0.4.1 | 2022年3月29日 |
---|---|
0.4.0 | 2022年3月28日 |
0.3.6 | 2022年3月28日 |
0.3.3 | 2022年2月18日 |
0.1.3 | 2022年2月6日 |
#573 in 异步
33KB
625 代码行
acu
构建异步演员的实用工具crate。
在使用此crate之前,我建议您了解Rust中的actor模式,Alice Ryhl创建了一篇非常有用的博客文章。
入门
将crate添加到依赖项
使用 cargo-edit
cargo add acu
或者手动...
构建您的第一个Actor
use tokio::sync::oneshot;
#[derive(Debug)]
enum Message {
Increment,
Get { respond_to: oneshot::Sender<usize> },
}
impl acu::Message for Message {}
struct MyActor {
receiver: acu::Receiver<Message, &'static str>,
counter: usize,
}
impl MyActor {
async fn run(&mut self) {
while let Some(message) = self.receiver.recv().await {
match message {
Message::Increment => self.counter += 1,
Message::Get { respond_to } => respond_to.send(self.counter).unwrap(),
}
}
}
}
#[derive(Debug, Clone)]
struct MyActorHandle {
sender: acu::Sender<Message, &'static str>,
}
impl MyActorHandle {
pub fn new() -> Self {
let (sender, receiver) = acu::channel(8, "MyActor");
let mut actor = MyActor {
receiver,
counter: 0,
};
tokio::spawn(async move { actor.run().await });
Self { sender }
}
pub async fn increment(&self) {
self.sender.notify_with(|| Message::Increment).await
}
pub async fn get(&self) -> usize {
self.sender
.call_with(|respond_to| Message::Get { respond_to })
.await
}
}
#[tokio::main]
async fn main() {
let handle = MyActorHandle::new();
println!("initial counter: {}", handle.get().await);
for _ in 0..100 {
handle.increment().await;
}
println!("counter after 100 increments: {}", handle.get().await);
}
或者如果您想使用日志功能,您需要初始化 log
,例如使用 simple-log crate
// at the top of the main function
simple_log::quick!("debug");
然后actor上的每个调用/通知都将被记录。
主从模式
您需要启用crate的 master-slave
功能。
您需要做出的决定是,Actor消息是否实现了 Clone
trait,如果是的话,您可以使用 BroadcasterMasterHandle
,它允许您直接使用actor方法;如果不是,您将只能使用 MasterHandle
,您不能使用actor方法。
使用 BroadcasterMasterHandle
(Message: Clone)
use acu::BroadcasterMasterHandle;
use acu::MasterExt;
use tokio::sync::broadcast;
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
Master,
MyActorA,
MyActorB,
}
impl acu::MasterName for Name {
fn master_name() -> Self {
Self::Master
}
}
impl AsRef<str> for Name {
fn as_ref(&self) -> &str {
match self {
Name::Master => "master",
Name::MyActorA => "my-actor-a",
Name::MyActorB => "my-actor-b",
}
}
}
impl std::fmt::Display for Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s: &str = self.as_ref();
f.write_str(s)
}
}
#[derive(Debug, Clone)]
enum Message {
Increment,
Fetch {
respond_to: broadcast::Sender<usize>,
},
}
impl acu::Message for Message {}
struct MyActor {
receiver: acu::Receiver<Message, Name>,
counter: usize,
}
impl MyActor {
async fn run(&mut self) {
while let Some(message) = self.receiver.recv().await {
match message {
Message::Increment => self.counter += 1,
Message::Fetch { respond_to } => {
respond_to.send(self.counter).unwrap();
}
}
}
}
}
fn my_actor(name: Name) -> MyActorHandle {
let (sender, receiver) = acu::channel(name);
let mut actor = MyActor {
receiver,
counter: 0,
};
tokio::spawn(async move { actor.run().await });
MyActorHandle { sender }
}
type MyActorHandle = acu::Handle<Message, Name>;
use async_trait::async_trait;
#[async_trait]
trait MyActorExt {
async fn increment(&self);
async fn fetch(&self) -> Vec<usize>;
}
#[async_trait]
impl MyActorExt for MyActorHandle {
async fn increment(&self) {
self.sender.notify_with(|| Message::Increment).await
}
async fn fetch(&self) -> Vec<usize> {
self.sender
.call_many_with(|respond_to| Message::Fetch { respond_to }, 8)
.await
}
}
#[tokio::main]
async fn main() {
let handle_a = my_actor(Name::MyActorA);
let handle_b = my_actor(Name::MyActorB);
let master = {
let master = BroadcasterMasterHandle::new();
master.push(handle_a).await;
master.push(handle_b).await;
master
};
let get_values = || async {
let results = master.fetch().await;
assert_eq!(results.len(), 2);
(results[0], results[1])
};
let print_values = || async {
let values = get_values().await;
println!("counter of MyActorA = {}", values.0);
println!("counter of MyActorB = {}", values.1);
println!();
};
for _ in 0..100 {
master.increment().await;
print_values().await;
}
print_values().await;
{
let actor_a = master.find(Name::MyActorA).await.unwrap();
for _ in 0..10 {
actor_a.increment().await;
}
}
print_values().await;
}
使用 MasterHandle
(Message: ?Clone)
use acu::MasterHandle;
use acu::MasterExt;
use tokio::sync::oneshot;
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum Name {
Master,
MyActorA,
MyActorB,
}
impl acu::MasterName for Name {
fn master_name() -> Self {
Self::Master
}
}
impl AsRef<str> for Name {
fn as_ref(&self) -> &str {
match self {
Name::Master => "master",
Name::MyActorA => "my-actor-a",
Name::MyActorB => "my-actor-b",
}
}
}
impl std::fmt::Display for Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s: &str = self.as_ref();
f.write_str(s)
}
}
#[derive(Debug)]
enum Message {
Increment,
Fetch {
respond_to: oneshot::Sender<usize>,
},
}
impl acu::Message for Message {}
struct MyActor {
receiver: acu::Receiver<Message, Name>,
counter: usize,
}
impl MyActor {
async fn run(&mut self) {
while let Some(message) = self.receiver.recv().await {
match message {
Message::Increment => self.counter += 1,
Message::Fetch { respond_to } => {
respond_to.send(self.counter).unwrap();
}
}
}
}
}
fn my_actor(name: Name) -> MyActorHandle {
let (sender, receiver) = acu::channel(name);
let mut actor = MyActor {
receiver,
counter: 0,
};
tokio::spawn(async move { actor.run().await });
MyActorHandle { sender }
}
type MyActorHandle = acu::Handle<Message, Name>;
use async_trait::async_trait;
#[async_trait]
trait MyActorExt {
async fn increment(&self);
async fn fetch(&self) -> usize;
}
#[async_trait]
impl MyActorExt for MyActorHandle {
async fn increment(&self) {
self.sender.notify_with(|| Message::Increment).await
}
async fn fetch(&self) -> usize {
self.sender
.call_with(|respond_to| Message::Fetch { respond_to })
.await
}
}
#[tokio::main]
async fn main() {
let handle_a = my_actor(Name::MyActorA);
let handle_b = my_actor(Name::MyActorB);
let master = {
let master = MasterHandle::new();
master.push(handle_a).await;
master.push(handle_b).await;
master
};
let get_handles = || async {
let handle_a = master.find(Name::MyActorA).await.unwrap();
let handle_b = master.find(Name::MyActorA).await.unwrap();
(handle_a, handle_b)
};
let get_values = || async {
let (handle_a, handle_b) = get_handles().await;
(handle_a.fetch().await, handle_b.fetch().await)
};
let print_values = || async {
let values = get_values().await;
println!("counter of MyActorA = {}", values.0);
println!("counter of MyActorB = {}", values.1);
println!();
};
for _ in 0..100 {
let (handle_a, handle_b) = get_handles().await;
handle_a.increment().await;
handle_b.increment().await;
print_values().await;
}
print_values().await;
{
let actor_a = master.find(Name::MyActorA).await.unwrap();
for _ in 0..10 {
actor_a.increment().await;
}
}
print_values().await;
}
所有示例都可以在 examples/
目录中找到。
动机
我想在我的几个项目中使用一些结构和函数,包括 Houseflow。我认为这也许对其他项目也有用。
依赖项
~3.5–5.5MB
~92K SLoC