#分布式系统 #actor #redis #事件溯源 #actor 框架 #快照 #actor 模型

coerce-redis

Coerce 的 Redis Actor 持久化提供者。支持事件溯源和快照功能

8 个版本

0.4.4 2023 年 10 月 16 日
0.4.3 2023 年 5 月 27 日
0.4.1 2023 年 4 月 22 日
0.3.0 2022 年 12 月 4 日
0.1.0 2022 年 7 月 1 日

#202 in 异步

每月 33 次下载

Apache-2.0

1MB
21K SLoC

Coerce-rs coerce-rs

Coerce-rs 是一个异步(async/await)Actor 运行时和分布式系统框架,用于 Rust。它允许以极其简单而强大的方式开发基于 Actor 的分布式系统。通过最小化代码,您可以构建一个高度可扩展、容错性强的现代 Actor 驱动型应用程序。

用途 最新版本
coerce Coerce 的主要运行时和框架 crates.io
coerce-redis Redis Actor 持久化提供者。允许从 Redis 读取和写入事件源和快照。 crates.io
coerce-macros 有用的宏,允许快速实现快照、可 JSON 序列化的远程消息等。 crates.io
coerce-k8s Kubernetes 发现提供者,根据可配置的 pod 选择标签自动发现托管在 Kubernetes 中的集群对等体。 crates.io

在自己的项目中使用 Coerce

在项目中使用 Coerce 的第一步是添加 coerce crate 依赖,这可以通过在您的 Cargo.toml 中添加以下内容来完成

[dependencies]
coerce = { version = "0.8", features = ["full"] }

可选:启用 tracing/valuable

Coerce 提供了对 tracing/valuable 的支持,它可以用来丰富日志中的 Actor 上下文信息。这目前是一个不稳定的功能,可以通过添加 coerce/tracing-unstable 功能和以下部分到您的 .cargo/config.toml 文件来启用

[build]
rustflags = ["--cfg", "tracing_unstable"]

注意:如果您的项目已经依赖于 tracing crate,您还需要启用 valuable 功能!

功能

Actor

  • 类型安全的 Actor
  • 监督/子进程创建
  • 位置透明的 ActorRef<A> 类型(ActorRef 可能包含 LocalActorRef<A>RemoteActorRef<A>
  • 开箱即用的指标

远程通信

  • 在集群的任何地方与演员进行通信
  • 演员可以本地部署或部署到其他远程节点
  • Protobuf 网络协议
  • 演员驱动的网络层

分布式分片

  • 演员 ID 可以解析为特定的分片,这些分片可以分布在 Coerce 节点的集群中
  • 自动负载均衡,分片将在集群中公平分配
  • 节点丢失时自动恢复,演员可以在其他健康节点上自动重启

持久化

  • 日志记录/事件源
  • 快照
  • 可插拔的存储提供商(内存和 Redis 立即可用,MySQL 计划中)

分布式 Pub/Sub

  • 演员可以从集群的任何地方订阅可编程主题
  • 系统级主题提供以接收更新的系统状态(例如新节点加入,节点丢失等。)

HTTP API

  • 易于访问的指标和信息,有助于诊断

构建和测试 Coerce 库

构建 Coerce 很简单。您只需要安装最新的 Rust 稳定或夜间版本,以及 Cargo。

# Clone the repository
git clone https://github.com/leonhartley/coerce-rs && cd coerce-rs

## run Cargo build to build the entire workspace, including the examples and the tests
cargo build

## Alternatively, if you'd like to build the library, dependencies and run the tests
cargo test --all-features

如何运行示例

分片聊天示例

ActorSystem

每个演员都属于一个 ActorSystem。

async/await 演员们

演员只是计算单元的另一个名字。它可以有可变状态,它可以接收消息并执行操作。但有一个缺点...它一次只能做一件事。这很有用,因为它可以减少线程同步的需要,通常通过锁定(使用 MutexRwLock 等)来实现。

在 Coerce 中是如何实现的?

Coerce 使用 Tokio 的 MPSC 通道(tokio::sync::mpsc::channel),每个创建的演员都会启动一个任务,该任务监听来自 Receiver 的消息,处理并等待消息的结果。每个引用(ActorRef<A: Actor>)都持有一个 Sender<M> where A: Handler<M>,它可以被克隆。

演员可以被停止,并且可以从应用程序的任何地方通过 ID 获取演员引用。ID 是 String,但如果在创建时没有提供 ID,将生成一个新的 Uuid。匿名演员在所有引用丢失时(和 Stopped)自动删除。跟踪演员(使用全局函数 new_actor)必须停止。

基本的 ActorSystem + EchoActor 示例

示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
    type Result = String;
}

#[async_trait]
impl Handler<EchoMessage> for EchoActor {
    async fn handle(
        &mut self,
        message: EchoMessage,
        _ctx: &mut ActorContext,
    ) -> String {
        message.0.clone()
    }
}

pub async fn run() {
    let mut actor = new_actor(EchoActor {}).await.unwrap();

    let hello_world = "hello, world".to_string();
    let result = actor.send(EchoMessage(hello_world.clone())).await;

    assert_eq!(result, Ok(hello_world));
}

计时器示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
    type Result = String;
}

pub struct PrintTimer(String);

impl TimerTick for PrintTimer {}

#[async_trait]
impl Handler<PrintTimer> for EchoActor {
    async fn handle(&mut self, msg: PrintTimer, _ctx: &mut ActorContext) {
        println!("{}", msg.0);
    }
}

pub async fn run() {
    let mut actor = new_actor(EchoActor {}).await.unwrap();
    let hello_world = "hello world!".to_string();

    // print "hello world!" every 5 seconds
    let timer = Timer::start(actor.clone(), Duration::from_secs(5), TimerTick(hello_world));

    // timer is stopped when handle is out of scope or can be stopped manually by calling `.stop()`
    sleep(Duration::from_secs(20)).await;
    timer.stop();
}

依赖关系

~10–23MB
~314K SLoC