2 个不稳定版本
0.2.0 | 2022年11月27日 |
---|---|
0.1.0 | 2022年7月1日 |
7 #coerce
在 coerce 中使用
10KB
189 行
Coerce-rs
Coerce-rs 是一个异步(async/await)Actor 运行时和分布式系统框架,用于 Rust。它允许以极其简单但强大的方式开发基于 Actor 的分布式系统。通过最少的代码,你可以构建一个高度可扩展、容错性强的现代 Actor 驱动型应用。
包
包 | 用途 | 最新版本 |
---|---|---|
coerce | Coerce 的主要运行时和框架 | |
coerce-redis | Redis 的 Actor 持久化提供程序。允许从 Redis 读取和写入事件源和快照。 | |
coerce-macros | 有用的宏,允许快速实现快照、可 JSON 序列化的远程消息等。 | |
coerce-k8s | Kubernetes 发现提供程序,基于可配置的 pod 选择标签自动发现托管在 Kubernetes 中的集群对等体。 |
在您的项目中使用 Coerce
在您的项目中使用 Coerce 的第一步是添加 coerce 包依赖项,这可以通过向您的 Cargo.toml 添加以下内容来完成
[dependencies]
coerce = { version = "0.8", features = ["full"] }
可选:启用 tracing / valuable
Coerce 提供了对 tracing / valuable
的支持,这可以用于丰富日志中关于 Actor 上下文的信息。这目前是一个不稳定的功能,可以通过添加 coerce / tracing-unstable
功能和以下部分到您的 .cargo/config.toml
文件来启用
[build]
rustflags = ["--cfg", "tracing_unstable"]
注意:如果您的项目已经依赖于 tracing
包,您还需要启用 valuable
功能!
特性
Actor
- 类型安全的 Actor
- 监督 / 子进程创建
- 位置透明的
ActorRef<A>
类型(ActorRef 可能包含LocalActorRef<A>
或RemoteActorRef<A>
) - 开箱即用的度量指标
远程通信
- 从集群中的任何位置与演员进行通信
- 演员可以本地部署或部署到其他远程节点
- Protobuf 网络协议
- 演员驱动的网络层
分布式分片
- 演员 ID 可以解析到特定的分片中,这些分片可以分布在整个 Coerce 节点集群中
- 自动负载均衡,分片将在集群中公平分配
- 节点丢失时自动恢复,演员可以在其他健康节点上自动重启
持久性
- 日志记录/事件溯源
- 快照
- 可插拔的存储提供者(内存和 redis 现成可用,MySQL 计划中)
分布式 Pub/Sub
- 演员可以从集群中的任何位置订阅可编程主题
- 提供系统级主题以接收更新后的系统状态(例如,新节点加入,节点丢失等)
HTTP API
- 易于访问的指标和信息,有助于诊断
构建和测试 Coerce 库
构建 Coerce 很简单。您只需要安装最新版本的 Rust 稳定版或夜间版,以及 Cargo。
# Clone the repository
git clone https://github.com/leonhartley/coerce-rs && cd coerce-rs
## run Cargo build to build the entire workspace, including the examples and the tests
cargo build
## Alternatively, if you'd like to build the library, dependencies and run the tests
cargo test --all-features
如何运行示例
分片聊天示例
ActorSystem
每个演员都属于一个 ActorSystem。
async/await 演员
演员只是计算单元的另一种说法。它可以有可变状态,可以接收消息并执行操作。但是有一个限制...它一次只能做一件事。这很有用,因为它可以减少对线程同步的需求,通常通过锁定(使用 Mutex
、RwLock
等)来实现。
在 Coerce 中是如何实现的呢?
Coerce 使用 Tokio 的 MPSC 通道(tokio::sync::mpsc::channel),每个创建的演员都会启动一个任务,该任务监听来自 Receiver
的消息,处理并等待消息的结果。每个引用(ActorRef<A: Actor>
)都持有 Sender<M> where A: Handler<M>
),可以被克隆。
演员可以被停止,并且可以从应用程序的任何位置通过 ID 获取演员引用。ID 是 String
,但如果在创建时未提供 ID,则将生成一个新的 Uuid
。匿名演员在所有引用都丢失时(和 Stopped
)自动删除。使用全局 fn new_actor
的跟踪演员必须停止。
基本 ActorSystem + EchoActor 示例
示例
pub struct EchoActor {}
#[async_trait]
impl Actor for EchoActor {}
pub struct EchoMessage(String);
impl Message for EchoMessage {
type Result = String;
}
#[async_trait]
impl Handler<EchoMessage> for EchoActor {
async fn handle(
&mut self,
message: EchoMessage,
_ctx: &mut ActorContext,
) -> String {
message.0.clone()
}
}
pub async fn run() {
let mut actor = new_actor(EchoActor {}).await.unwrap();
let hello_world = "hello, world".to_string();
let result = actor.send(EchoMessage(hello_world.clone())).await;
assert_eq!(result, Ok(hello_world));
}
计时器示例
pub struct EchoActor {}
#[async_trait]
impl Actor for EchoActor {}
pub struct EchoMessage(String);
impl Message for EchoMessage {
type Result = String;
}
pub struct PrintTimer(String);
impl TimerTick for PrintTimer {}
#[async_trait]
impl Handler<PrintTimer> for EchoActor {
async fn handle(&mut self, msg: PrintTimer, _ctx: &mut ActorContext) {
println!("{}", msg.0);
}
}
pub async fn run() {
let mut actor = new_actor(EchoActor {}).await.unwrap();
let hello_world = "hello world!".to_string();
// print "hello world!" every 5 seconds
let timer = Timer::start(actor.clone(), Duration::from_secs(5), TimerTick(hello_world));
// timer is stopped when handle is out of scope or can be stopped manually by calling `.stop()`
sleep(Duration::from_secs(20)).await;
timer.stop();
}
依赖项
~1.5MB
~35K SLoC