24 个版本

0.8.11 2023年10月16日
0.8.10 2023年5月27日
0.8.8 2023年4月22日
0.8.5 2023年2月8日
0.0.0 2019年12月16日

#12#事件驱动架构

Download history 1051/week @ 2024-03-13 1404/week @ 2024-03-20 1171/week @ 2024-03-27 1422/week @ 2024-04-03 865/week @ 2024-04-10 542/week @ 2024-04-17 827/week @ 2024-04-24 856/week @ 2024-05-01 388/week @ 2024-05-08 1906/week @ 2024-05-15 550/week @ 2024-05-22 810/week @ 2024-05-29 441/week @ 2024-06-05 1123/week @ 2024-06-12 805/week @ 2024-06-19 248/week @ 2024-06-26

2,690 每月下载量
用于 11 个 crates (4 直接)

Apache-2.0

1MB
20K SLoC

Coerce-rs coerce-rs

Coerce-rs 是一个为 Rust 提供的异步 (async/await) Actor 运行时和分布式系统框架。它允许您用最少的代码构建一个高度可扩展、容错性强的现代 Actor 驱动型应用程序。

功能

Actors

  • 类型安全的 Actors
  • 监督 / 子进程生成
  • 位置透明的 ActorRef<A> 类型(ActorRef 可能包含一个 LocalActorRef<A> 或一个 RemoteActorRef<A>
  • 开箱即用的指标

远程通信

  • 在集群中的任何位置与 Actor 通信
  • Actors 可以部署在本地或远程节点上
  • Protobuf 网络协议
  • Actor 驱动的网络层

分布式 Sharding

  • Actor IDs 可以解析到特定的 shards,这些 shards 可以分布在一个 Coerce 节点的集群中
  • 自动负载均衡,shards 将在集群中公平分配
  • 节点丢失时的自我恢复,Actors 可以在健康的节点上自动重启

持久化

  • 日志记录 / 事件源
  • 快照
  • 可插拔的存储提供商(内存中和 Redis 可用,MySQL 计划中)

分布式 PubSub

  • Actors 可以在任何位置订阅可编程的主题
  • 系统级主题,用于接收更新的系统状态(例如新节点加入、节点丢失等)

HTTP API

  • 易于访问的指标和信息,有助于诊断

如何构建

构建 Coerce 很简单。您只需要安装最新的 Rust 稳定版或 nightly 版本,以及 Cargo。

# Clone the repository
git clone https://github.com/leonhartley/coerce-rs && cd coerce-rs

## run Cargo build to build the entire workspace, including the examples and the tests
cargo build

## Alternatively, if you'd like to build the library, dependencies and run the tests
cargo test

如何运行示例

Sharded Chat 示例

ActorSystem

每个 Actor 都属于一个 ActorSystem。

async/await Actors

“actor”是计算单元的另一种说法。它可以拥有可变状态,可以接收消息并执行动作。不过有一个限制……它一次只能做一件事。这很有用,因为它可以减少线程同步的需求,通常通过锁定(使用MutexRwLock等)来实现。

在Coerce中是如何实现的呢?

Coerce使用Tokio的MPSC通道(tokio::sync::mpsc::channel),每个创建的actor都会启动一个任务,监听来自Receiver的消息,处理并等待消息的结果。每个引用(ActorRef<A: Actor>)都持有Sender<M> where A: Handler<M>,它可以被克隆。

actor可以被停止,并且可以从应用程序的任何地方通过ID检索actor引用。ID是String,但如果创建时没有提供ID,则会生成一个新的Uuid。匿名actor在所有引用都被释放时自动丢弃(并标记为Stopped)。使用全局函数new_actor跟踪的actor必须停止。

基本ActorSystem + EchoActor示例

示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
  type Result = String;
}

#[async_trait]
impl Handler<EchoMessage> for EchoActor {
  async fn handle(
    &mut self,
    message: EchoMessage,
    _ctx: &mut ActorContext,
  ) -> String {
    message.0.clone()
  }
}

pub async fn run() {
  let mut actor = new_actor(EchoActor {}).await.unwrap();

  let hello_world = "hello, world".to_string();
  let result = actor.send(EchoMessage(hello_world.clone())).await;

  assert_eq!(result, Ok(hello_world));
}

定时器示例

pub struct EchoActor {}

#[async_trait]
impl Actor for EchoActor {}

pub struct EchoMessage(String);

impl Message for EchoMessage {
  type Result = String;
}

pub struct PrintTimer(String);

impl TimerTick for PrintTimer {}

#[async_trait]
impl Handler<PrintTimer> for EchoActor {
  async fn handle(&mut self, msg: PrintTimer, _ctx: &mut ActorContext) {
    println!("{}", msg.0);
  }
}

pub async fn run() {
  let mut actor = new_actor(EchoActor {}).await.unwrap();
  let hello_world = "hello world!".to_string();

  // print "hello world!" every 5 seconds
  let timer = Timer::start(actor.clone(), Duration::from_secs(5), TimerTick(hello_world));

  // timer is stopped when handle is out of scope or can be stopped manually by calling `.stop()`
  sleep(Duration::from_secs(20)).await;
  timer.stop();
}

远程ActorSystem

依赖关系

~5–19MB
~238K SLoC