#actors #future

cooper

一个简单的、进程内、异步 Actor 库,适用于 Rust

2 个版本

0.1.1 2021 年 3 月 27 日
0.1.0 2021 年 3 月 24 日

#852异步

MIT 许可证

17KB
136

cooper

一个简单的 Rust 进程内 Actor 库。

这是一个最小化的库,用于创建在单个进程内异步或线程执行上下文中运行的 Actor。它不是(也永远不会是)用于在不同应用程序或不同网络主机之间通信的方法。

这个库在某种程度上是独特的,因为发送给 Actor 的消息是 函数 和/或 闭包,指示它如何修改或查询内部状态。因此,传递给 Actor 的消息是执行 Actor 的函数本身。具体用户定义的类型可以利用这个核心轻松构建公共 API,而不必暴露消息传递基础设施。对外部程序而言,Actor 就像任何其他对象一样,尽管它的 API 按定义是高性能的、线程安全的和异步的。

该库提供了两种类型的实现

  • Actor<T> 兼容 async/await 并需要运行时,例如 tokioasync-stdsmol
  • ThreadedActor<T> 使用每个 Actor 的操作系统线程,但不需要额外的运行时。

如果你的应用程序需要使用数千个 Actor,频繁地启动和停止它们,或者将它们用于大量的 I/O 密集型操作,那么异步 Actor 将非常高效。但是,如果应用程序只需要几个 Actor,没有使用异步运行时,或者将执行大量的 CPU 密集型操作,那么线程 Actor 可能很有用。

根据需要,不同的 Actor 类型也可以在应用程序内混合使用。

开发状态

这是一个非常早期的概念验证,目前尚未准备就绪。

该项目正在积极开发中,以实现一个可以在生产环境中使用的最小可行产品(MVP)。在此期间,预期会有很多重大更改。

出于历史原因,这个初始实现使用 smol 库进行通道和执行器,但在发布时,它将要么对运行时不可知,或者允许您在 tokioasync-stdsmol 之间进行选择。

巨大的感谢

这个项目在 GitHub 上闲置了三年,直到我等待异步/await 稳定下来,然后试图理解它。经过几次过于复杂的失败尝试……比如编写一个完整的串行执行器……我意识到我只想有一个简单的异步循环来执行发送给它的闭包。如果我能想出如何排队这些闭包……

向@Darksonn表示衷心的感谢,他教我如何在通道中发送异步闭包。

https://users.rust-lang.org/t/sending-futures-through-a-channel/57229

请查看她的博客,关于简单Actor的内容,这与本库的核心内容在精神上非常相似。

使用Tokio的Actor

贡献

我们非常欢迎任何贡献,包括提示、技巧、窍门或其他建议。

请将任何拉取请求针对develop分支,因为我们将会用它进行集成和测试。在初期开发期间,我们将努力保持master分支的相对稳定。

基础

Actor是一个包含内部状态和私有执行上下文的对象,可以用来更新该状态。应用程序和其他Actor通过向它发送消息与之通信,Actor按顺序顺序处理这些消息。由于状态仅对内部执行上下文可用,因此不存在数据竞争或争用,也不需要锁。

本库借鉴了Erlang和Elixir语言的一些想法和术语

  • cast是一种异步消息,将其排入Actor的队列,没有返回值。它纯粹是“发射和忘记”。cast仅仅将请求放入Actor的队列中,并不等待其执行。

  • call是一种同步操作,它等待请求执行并返回一个结果。调用者将请求放入actor的队列中,然后阻塞当前任务,直到请求执行并返回一个值。这可以用来查询actor的状态,在不需要返回结果的情况下也很有用,甚至可以提供背压。

请注意,这里的术语在“同步”和“阻塞”调用方面有些混淆。整个库都在Rust的async环境中运行。但是,await一个cast操作仅仅是等待请求被排队,而await一个call操作则是等待请求在actor的任务中执行并返回结果。

该库的Actor是围绕内部状态类型S的一个泛型结构体。Actor实例接受一个初始状态值,或使用S::default(),并将其发送到一个生成的任务以等待请求。实际的Actor只是一个围绕可以用于使用castcall函数将请求排队到内部任务的通道Sender的包装器。Actor可以根据需要克隆并发送到其他任务和线程。当最后一个离开作用域时,内部任务退出,并丢弃状态对象。

可以很容易地围绕Actor构建具体的actor类型,通过提供状态类型、值和操作它的函数/闭包。

以下是一个示例,这是一个可以在不同任务之间共享的键/值映射

/// The internal state type for the Actor
type State = HashMap<String, String>;

/// An actor that can act as a shared key/value store of strings.
#[derive(Clone)]
pub struct SharedMap {
    actor: Actor<State>,
}

impl SharedMap {
    /// Create a new actor to share a key/value map of string.
    pub fn new() -> Self {
        Self { actor: Actor::new() }
    }

    /// Insert a value into the shared map.
    pub async fn insert(&self, key: String, val: String) {
        self.actor.cast(|state| Box::pin(async move {
            state.insert(key, val);
        })).await
    }


    /// Gets the value, if any, from the shared map that is
    /// associated with the key.
    pub async fn get(&self, key: String) -> Option<String> {
        self.actor.call(|state| Box::pin(async move {
            state.get(&key).map(|v| v.to_string())
        })).await
    }
}

闭包的state参数是State类型的对象的可变引用。所以,明确地说

self.actor.cast(|state: &mut State| ... ).await

因此,闭包被赋予了在每次cast/call操作中对actor状态对象的可变引用。每个闭包都拥有对状态对象的独家、可变访问权限,并保证按照接收的顺序运行到完成,因此在状态上是原子的。

继续以上面的例子,共享的map可以被应用程序或其他actor在异步块中使用,如下所示:


    async {
        let map = SharedMap::new();

        map.insert("city", "Boston").await;

        assert_eq!(
            map.get("city").await,
            Some("Boston".to_string())
        );
    });

依赖项

~5–17MB
~176K SLoC