2 个版本
0.1.1 | 2021 年 3 月 27 日 |
---|---|
0.1.0 | 2021 年 3 月 24 日 |
#852 在 异步
17KB
136 行
cooper
一个简单的 Rust 进程内 Actor 库。
这是一个最小化的库,用于创建在单个进程内异步或线程执行上下文中运行的 Actor。它不是(也永远不会是)用于在不同应用程序或不同网络主机之间通信的方法。
这个库在某种程度上是独特的,因为发送给 Actor 的消息是 函数 和/或 闭包,指示它如何修改或查询内部状态。因此,传递给 Actor 的消息是执行 Actor 的函数本身。具体用户定义的类型可以利用这个核心轻松构建公共 API,而不必暴露消息传递基础设施。对外部程序而言,Actor 就像任何其他对象一样,尽管它的 API 按定义是高性能的、线程安全的和异步的。
该库提供了两种类型的实现
Actor<T>
兼容 async/await 并需要运行时,例如tokio
、async-std
或smol
ThreadedActor<T>
使用每个 Actor 的操作系统线程,但不需要额外的运行时。
如果你的应用程序需要使用数千个 Actor,频繁地启动和停止它们,或者将它们用于大量的 I/O 密集型操作,那么异步 Actor 将非常高效。但是,如果应用程序只需要几个 Actor,没有使用异步运行时,或者将执行大量的 CPU 密集型操作,那么线程 Actor 可能很有用。
根据需要,不同的 Actor 类型也可以在应用程序内混合使用。
开发状态
这是一个非常早期的概念验证,目前尚未准备就绪。
该项目正在积极开发中,以实现一个可以在生产环境中使用的最小可行产品(MVP)。在此期间,预期会有很多重大更改。
出于历史原因,这个初始实现使用 smol
库进行通道和执行器,但在发布时,它将要么对运行时不可知,或者允许您在 tokio
、async-std
或 smol
之间进行选择。
巨大的感谢
这个项目在 GitHub 上闲置了三年,直到我等待异步/await 稳定下来,然后试图理解它。经过几次过于复杂的失败尝试……比如编写一个完整的串行执行器……我意识到我只想有一个简单的异步循环来执行发送给它的闭包。如果我能想出如何排队这些闭包……
向@Darksonn表示衷心的感谢,他教我如何在通道中发送异步闭包。
https://users.rust-lang.org/t/sending-futures-through-a-channel/57229
请查看她的博客,关于简单Actor的内容,这与本库的核心内容在精神上非常相似。
贡献
我们非常欢迎任何贡献,包括提示、技巧、窍门或其他建议。
请将任何拉取请求针对develop
分支,因为我们将会用它进行集成和测试。在初期开发期间,我们将努力保持master
分支的相对稳定。
基础
Actor是一个包含内部状态和私有执行上下文的对象,可以用来更新该状态。应用程序和其他Actor通过向它发送消息与之通信,Actor按顺序顺序处理这些消息。由于状态仅对内部执行上下文可用,因此不存在数据竞争或争用,也不需要锁。
本库借鉴了Erlang和Elixir语言的一些想法和术语
-
cast
是一种异步消息,将其排入Actor的队列,没有返回值。它纯粹是“发射和忘记”。cast仅仅将请求放入Actor的队列中,并不等待其执行。 -
call
是一种同步操作,它等待请求执行并返回一个结果。调用者将请求放入actor的队列中,然后阻塞当前任务,直到请求执行并返回一个值。这可以用来查询actor的状态,在不需要返回结果的情况下也很有用,甚至可以提供背压。
请注意,这里的术语在“同步”和“阻塞”调用方面有些混淆。整个库都在Rust的async
环境中运行。但是,await一个cast
操作仅仅是等待请求被排队,而await一个call
操作则是等待请求在actor的任务中执行并返回结果。
该库的Actor
是围绕内部状态类型S
的一个泛型结构体。Actor实例接受一个初始状态值,或使用S::default()
,并将其发送到一个生成的任务以等待请求。实际的Actor
只是一个围绕可以用于使用cast
和call
函数将请求排队到内部任务的通道Sender
的包装器。Actor可以根据需要克隆并发送到其他任务和线程。当最后一个离开作用域时,内部任务退出,并丢弃状态对象。
可以很容易地围绕Actor
构建具体的actor类型,通过提供状态类型、值和操作它的函数/闭包。
以下是一个示例,这是一个可以在不同任务之间共享的键/值映射
/// The internal state type for the Actor
type State = HashMap<String, String>;
/// An actor that can act as a shared key/value store of strings.
#[derive(Clone)]
pub struct SharedMap {
actor: Actor<State>,
}
impl SharedMap {
/// Create a new actor to share a key/value map of string.
pub fn new() -> Self {
Self { actor: Actor::new() }
}
/// Insert a value into the shared map.
pub async fn insert(&self, key: String, val: String) {
self.actor.cast(|state| Box::pin(async move {
state.insert(key, val);
})).await
}
/// Gets the value, if any, from the shared map that is
/// associated with the key.
pub async fn get(&self, key: String) -> Option<String> {
self.actor.call(|state| Box::pin(async move {
state.get(&key).map(|v| v.to_string())
})).await
}
}
闭包的state
参数是State
类型的对象的可变引用。所以,明确地说
self.actor.cast(|state: &mut State| ... ).await
因此,闭包被赋予了在每次cast/call操作中对actor状态对象的可变引用。每个闭包都拥有对状态对象的独家、可变访问权限,并保证按照接收的顺序运行到完成,因此在状态上是原子的。
继续以上面的例子,共享的map可以被应用程序或其他actor在异步块中使用,如下所示:
async {
let map = SharedMap::new();
map.insert("city", "Boston").await;
assert_eq!(
map.get("city").await,
Some("Boston".to_string())
);
});
依赖项
~5–17MB
~176K SLoC