#actor #macro #key-value-store #macro-derive #thread

bin derive-aktor

使用 derive 宏在 Rust 中创建 actor

6 个版本

0.1.6 2020 年 10 月 14 日
0.1.4 2020 年 6 月 10 日
0.1.3 2020 年 3 月 29 日

#1502过程宏


3 个 Crates 中使用(通过 sqs-lambda

MIT 许可证

28KB
465

derive_aktor

一个生成 Rust actor 的宏

derive_aktor 导出 derive_actor 宏。这个宏可以接受一个结构的 impl 并为其生成一个 actor,其中 actor 具有与 impl 相当的、类型化的 API。

这使得编写类型化、名义化、异步 API 变得容易。

这与许多 actor 实现(如)形成对比,这些实现要么

a) 不强制消息的类型安全 b) 仅暴露一个用于与 actor 通信的 API,如 "send",并强制你构造消息

示例

这里有一个简单的 "KeyValueStore" 示例。我们可以异步地与之交互,跨线程共享它,

pub struct KeyValueStore<U>
    where U: Hash + Eq + Send + 'static
{
    inner_store: HashMap<U, String>,
    self_actor: Option<KeyValueStoreActor<U>>,
}

impl<U: Hash + Eq + Send + 'static> KeyValueStore<U> {
    pub fn new() -> Self {
        Self {
            inner_store: HashMap::new(),
            self_actor: None,
        }
    }
}

// All methods in this block form our Actor's API
#[derive_actor]
impl<U: Hash + Eq + Send + 'static> KeyValueStore<U> {
    pub fn query(&self, key: U, f: Box<dyn Fn(Option<String>) + Send + 'static>) {
        println!("query");
        f(self.inner_store.get(&key).map(String::from))
    }

    pub fn set(&mut self, key: U, value: String) {
        println!("set");
        self.inner_store.insert(key, value);
    }
}


#[tokio::main]
async fn main() {

    let (kv_store, handle) = KeyValueStoreActor::new(KeyValueStore::new()).await;
    
    // We can use an async API that's typed and nominal
    kv_store.query("foo", Box::new(|value| println!("before {:?}", value))).await;
    kv_store.set("foo", "bar".to_owned()).await;
    kv_store.query("foo", Box::new(|value| println!("after {:?}", value))).await;

    // We must drop any references to kv_store before we await the handle, or it will leak!

    drop(kv_store);
    handle.await;
}

示例 - Actor 通信


struct Logger {}

#[derive_actor]
impl Logger {
    pub fn log(&self, data: String) {info!("{}", data)}
}



struct Simple{}

#[derive_actor]
impl Simple {
    pub fn takes_actor(&self, log_actor: LoggerActor) {
        info!("Logging up here");
        log_actor.log("and logging over here".to_owned()).await;
    }
}

#[tokio::main]
async fn main() {

    let (logger, l_handle) = LoggerActor::new(Logger{}).await;
    let (simple, s_handle) = SimpleActor::new(Simple{}).await;

    simple.takes_actor(logger.clone());
    
    drop(logger);
    drop(simple);

    l_handle.await;
    s_handle.await;  // you could also join!(l_handle, s_handle);
}

什么是 Actor?

Actor 是一种并发原语,类似于线程,您通过消息传递与之通信。

实现

术语

  • Actor - 这是生成的 Actor 结构体
  • ActorImpl - 这是您编写的具有 impl 的结构体

Actor

Actor 实际上只是一个围绕 mpsc Sender 的包装器。Actor 提供了一个基于您附加 derive_actor 宏的 impl 块的 API。

例如,

#[derive_actor]
impl MyStruct {
    pub fn my_method(&mut self) {}
}

会生成一个 Actor

struct MyStructActor {
    // ..
}

impl MyStructActor {
    // ...
    async pub fn my_method(&self) {
      // package the message, place it on the internal queue, pass it along
    }
}

Actor 生命周期管理

Actor 在内部是引用计数的。

当 Actor 被释放时

  • Actor 只被自己引用
  • Actor 的队列中没有消息

由于 Rust 缺少异步 drop,这意味着在某些情况下您必须显式地 drop actor。

此外,为了确保 actor 在程序终止前完全处理所有消息,actor 构造返回一个 handle,您可以等待它。这类似于线程 API。如果您不需要依赖于 actor 完成或在其他地方发出完成信号,您可以 drop handle。

错误处理

如果 ActorImpl 发生 panic,错误目前被 吞没

跟踪

目前所有 actor 方法都带有跟踪 instrument 注释,该注释将按其唯一标识符记录 actor。请注意,每个 Actor 都有一个新的身份,即使是 Actor 的克隆也有一个唯一身份。

状态

我对proc宏不是很擅长,欢迎贡献。以下是一些开放的问题

[] 实现块中的泛型不能使用where子句 [] Actor上的泛型是出现在你的actor结构体和方法的泛型之和,这是不必要的。可以生成一个只提升与方法参数实际对应的泛型的Actor。 [] 即使你从未引用self_actor,它仍然存在,这也是不必要的。

依赖项

~7–13MB
~135K SLoC