38 个版本

0.10.3 2024年5月24日
0.9.8 2024年5月2日
0.9.7 2024年3月16日
0.9.3 2023年10月13日
0.7.5 2023年3月24日

#22 in 异步

Download history 876/week @ 2024-05-02 892/week @ 2024-05-09 1449/week @ 2024-05-16 1465/week @ 2024-05-23 1513/week @ 2024-05-30 1435/week @ 2024-06-06 1237/week @ 2024-06-13 1461/week @ 2024-06-20 1442/week @ 2024-06-27 1497/week @ 2024-07-04 1340/week @ 2024-07-11 1108/week @ 2024-07-18 1004/week @ 2024-07-25 1439/week @ 2024-08-01 1248/week @ 2024-08-08 1354/week @ 2024-08-15

5,209 每月下载量
5 crates 中使用

MIT 许可证

540KB
11K SLoC

ractor

发音为 ract-er

一个纯 Rust actor 框架。受 Erlang 的 gen_server 启发,结合了 Rust 的速度和性能!

  • github
  • crates.io
  • docs.rs
  • docs.rs
  • CI/main
  • codecov
  • ractor: ractor 下载
  • ractor_cluster: ractor_cluster 下载

网站 Ractor 有一个配套网站,提供更详细的入门指南和一些最佳实践,并定期更新。Api 文档仍可在 docs.rs 上找到,但这是一个补充网站。试试看! https://slawlor.github.io/ractor/

关于

ractor 尝试解决在 Rust 中构建和维护类似 Erlang 的 actor 框架的问题。它提供了一套通用原语,并帮助自动化监督树和 actor 的管理,以及传统的 actor 消息处理逻辑。最初它被设计为使用 tokio 运行时,但现在也支持 async-std 运行时。

ractor 是一个用 100% Rust 编写的现代 actor 框架。

此外,ractor 还有一个配套库,ractor_cluster,这是在分布式(类似集群)场景中部署 ractor 所必需的。ractor_cluster 不应被视为生产就绪,但它相对稳定,我们期待您的反馈!

为什么选择 ractor?

除了Rust语言编写的其他actor框架(如Actixriker,或Tokio中的actor)之外,还有一些类似列表的框架被编译在这篇Reddit帖子中。

Ractor试图通过更接近纯Erlang的gen_server进行建模,这意味着每个actor也可以简单地成为一个其他actor的supervisor,而无需额外的成本(只需将它们连接在一起!)此外,我们旨在保持与Erlang模式紧密的逻辑联系,因为它们在实际应用中工作得相当好,并且得到了很好的利用。

此外,我们没有依赖“Runtime”或“System”这样的东西来构建ractor。actor可以独立运行,与其他基本的tokio运行时一起使用,几乎没有额外的开销。

我们目前完全支持以下功能:

  1. 单线程消息处理
  2. actor监督树
  3. rpc模块中对actor进行远程过程调用
  4. time模块中的定时器
  5. 命名actor注册表(registry模块)来自Erlang的Registered processes
  6. 进程组(ractor::pg模块)来自Erlang的pg模块

在我们的路线图上,我们计划添加更多Erlang功能,包括可能的一个分布式actor集群。

性能

ractor中的actor通常相当轻量,并且您可以在自己的主机系统上运行基准测试

cargo bench -p ractor

安装

通过将以下内容添加到Cargo.toml依赖项中安装ractor

[dependencies]
ractor = "0.10"

ractor的最低支持Rust版本(MSRV)是1.64。但是,为了利用特性和特质的本地async fn支持,而不是依赖于async-trait crate的语法糖功能,您需要使用Rust版本>= 1.75。特性async fn在特质中的稳定化最近已经添加

特性

ractor公开以下特性

  1. cluster,它公开了设置和管理通过网络链路设置actor集群所需的各项功能。这是一个正在进行中的项目,并在#16中跟踪。
  2. async-std,它启用了使用async-std的异步运行时而不是tokio运行时的功能。但是,带有sync功能的tokio仍然是一个依赖项,因为我们利用了来自tokio的消息同步原语,无论运行时如何,因为它们并不特定于tokio运行时。这项工作在#173中跟踪。您可以删除默认特性以“最小化”tokio依赖项,仅限于同步原语。

与actor一起工作

ractor中的参与者非常轻量级,可以被视为线程安全的。每个参与者一次只会调用其处理函数中的一个,它们永远不会并行执行。遵循参与者模型可以导致具有良好定义的状态和处理逻辑的微服务。

以下是一个示例 ping-pong 参与者:

use ractor::{cast, Actor, ActorProcessingErr, ActorRef};

/// [PingPong] is a basic actor that will print
/// ping..pong.. repeatedly until some exit
/// condition is met (a counter hits 10). Then
/// it will exit
pub struct PingPong;

/// This is the types of message [PingPong] supports
#[derive(Debug, Clone)]
pub enum Message {
    Ping,
    Pong,
}

impl Message {
    // retrieve the next message in the sequence
    fn next(&self) -> Self {
        match self {
            Self::Ping => Self::Pong,
            Self::Pong => Self::Ping,
        }
    }
    // print out this message
    fn print(&self) {
        match self {
            Self::Ping => print!("ping.."),
            Self::Pong => print!("pong.."),
        }
    }
}

// the implementation of our actor's "logic"
impl Actor for PingPong {
    // An actor has a message type
    type Msg = Message;
    // and (optionally) internal state
    type State = u8;
    // Startup initialization args
    type Arguments = ();

    // Initially we need to create our state, and potentially
    // start some internal processing (by posting a message for
    // example)
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        _: (),
    ) -> Result<Self::State, ActorProcessingErr> {
        // startup the event processing
        cast!(myself, Message::Ping)?;
        // create the initial state
        Ok(0u8)
    }

    // This is our main message handler
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if *state < 10u8 {
            message.print();
            cast!(myself, message.next())?;
            *state += 1;
        } else {
            println!();
            myself.stop(None);
            // don't send another message, rather stop the agent after 10 iterations
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let (_actor, handle) = Actor::spawn(None, PingPong, ())
        .await
        .expect("Failed to start ping-pong actor");
    handle
        .await
        .expect("Ping-pong actor failed to exit properly");
}

这将输出

$ cargo run
ping..pong..ping..pong..ping..pong..ping..pong..ping..pong..
$ 

消息传递参与者

参与者之间通信的方式是它们相互传递消息。开发人员可以定义任何类型的消息,这些类型是 Send + 'static,并且将被 ractor 支持。有 4 种并发消息类型,它们按优先级监听。它们是

  1. 信号:信号是所有信号中优先级最高的,将在任何当前处理的地方中断参与者(这包括终止异步工作)。目前只有一个信号,即 Signal::Kill,它将立即终止所有工作。这包括消息处理或监督事件处理。
  2. 停止:还有一个预定义的停止信号。如果您想,您可以提供“停止原因”,但这不是必需的。停止是一个优雅的退出,意味着当前执行的异步工作将完成,并且在下一次消息处理迭代中,停止将比未来的监督事件或常规消息具有优先级。它将 不会 终止当前执行的工作,无论提供的理由如何。
  3. 监督事件:监督事件是子参与者在其启动、死亡和/或未处理的恐慌事件中发送给其监督者(父参与者)的消息。监督事件是参与者监督者(父参与者)或对等方如何通知其子参与者/对等方的事件,并可以为他们处理生命周期事件。如果您在 Cargo.toml 中设置 panic = 'abort',则恐慌 导致程序终止,并且在监督流程中不会被捕获。
  4. 消息:常规的、用户定义的消息是传递给参与者的最后一条通信渠道。它们是 4 种消息类型中优先级最低的,表示一般的参与者工作。前 3 种消息类型(信号、停止、监督)通常很安静,除非是参与者的生命周期事件,但这个渠道是“工作”渠道,执行参与者想要执行的操作!

分布式集群中的Ractor

Ractor 参与者还可以用于构建分布式参与者池,类似于 Erlang 的 EPMD,它管理节点间连接 + 节点名。在我们的实现中,我们有 ractor_cluster 以便于分布式 ractor 参与者。

ractor_cluster 中有一个主要类型,即 NodeServer,它表示一个 node() 进程的主机。它还包含一些宏和过程宏,以方便开发人员在构建分布式参与者时提高效率。负责的 NodeServer

  1. 管理所有传入和传出的 NodeSession 参与者,这些参与者代表连接到此主机的外部节点。
  2. 管理 TcpListener,它承载服务器套接字以接受传入的会话请求。

然而,节点间连接的大部分逻辑都包含在 NodeSession 中,它管理

  1. 底层 TCP 连接,管理流中的读写操作。
  2. 本节点与对等连接之间的身份验证
  3. 管理远程系统上创建的演员的生命周期。
  4. 在节点之间传输所有演员之间的消息。
  5. 管理PG组同步

等等。

NodeSession 通过创建 RemoteActor 来在远程系统上使本地演员可用,RemoteActor 本质上是未类型化的演员,仅处理序列化消息,将消息反序列化留给原始系统。它还跟踪挂起的RPC请求,以便在回复时匹配请求和响应。在 ractor 中有特殊的扩展点,专门用于支持不一定在标准模式外使用的 RemoteActor

Actor::spawn(Some("name".to_string()), MyActor).await

设计远程支持的演员

注意并非所有演员都是平等的。演员需要支持通过网络链路发送其消息类型。这是通过覆盖所有消息都需要支持的 ractor::Message 特质的特定方法来实现的。由于 Rust 中缺少专业化支持,如果您选择使用 ractor_cluster,您需要为您的包中的所有消息类型派生 ractor::Message 特质。但是为了支持这一点,我们有一些过程宏来使这个过程更加顺畅。

为仅进程内演员派生基本消息特质

许多演员将是仅本地且不需要通过网络链路发送消息。这是最基本的情况,在这种情况下,默认的 ractor::Message 特质实现是合适的。您可以使用以下方法快速派生它。

这将为您实现默认的 ractor::Message 特质,而不必手动编写它。

use ractor_cluster::RactorMessage;
use ractor::RpcReplyPort;

#[derive(RactorMessage)]
enum MyBasicMessageType {
    Cast1(String, u64),
    Call1(u8, i64, RpcReplyPort<Vec<String>>),
}

派生远程演员的网络可序列化消息特质

如果您希望您的演员 支持 远程连接,那么您应该使用不同的派生语句,即

这为实现添加了大量的底层样板代码(您可以使用 cargo expand 查看它)。

use ractor_cluster::RactorClusterMessage;
use ractor::RpcReplyPort;

#[derive(RactorClusterMessage)]
enum MyBasicMessageType {
    Cast1(String, u64),
    #[rpc]
    Call1(u8, i64, RpcReplyPort<Vec<String>>),
}

但简短的回答是,每个枚举变体都需要序列化到一个字节数组参数中,一个变体名称,如果是 RPC,则给出一个接收字节数组的端口,并反序列化回复。参数类型或回复类型内部的每个类型都需要实现 ractor_cluster::BytesConvertable 特质,该特质只是说这个值可以被写入字节数组并从字节数组解码。如果您使用 prost 为您的消息类型定义(protobuf),我们有一个宏可以自动为您实现这一点。

ractor_cluster::derive_serialization_for_prost_type! {MyProtobufType}

除此之外,只需像往常一样编写您的演员。该演员将存在于您定义的位置,并且能够从其他集群接收通过网络链路发送的消息!

演员的 "状态" 与 self 之间的区别

演员可以(但不需要!)具有内部状态。为了促进这一点,ractor 为实现 Actor 特质的实现者提供了定义演员状态类型的能力。演员的 pre_start 程序是初始化和设置该状态的程序。您可以想象做如下事情:

  1. 打开网络套接字 + 将 TcpListener 存储在状态中
  2. 设置数据库连接 + 验证对数据库的身份
  3. 初始化基本状态变量(计数器、统计数据等)

由于这个原因以及一些操作可能失败的可能性,pre_start方法在初始化过程中捕获了panic,并将其返回给Actor::spawn的调用者。

在设计ractor时,我们明确决定为actor创建一个单独的状态类型,而不是传递可变的self引用。这样做的原因是,如果我们使用可变的&mut self引用,Self结构体的创建和实例化将超出actor的规范(即在pre_start之外),从而可能失去其提供的安全性,导致调用者在可能不应该发生的情况下崩溃。

最后,我们需要更改一些ractor当前基于的所有权属性,以便在每个调用中传递一个拥有的self,返回一个Self引用,这在当前上下文中似乎有些笨拙。

在当前实现中,actor的self作为只读引用传递,理想情况下不应包含状态信息,但可以包含配置/启动信息。然而,每个Actor也有Arguments,允许将拥有的值传递给actor的状态。在理想世界中,所有actor结构体都应该是空的,没有任何存储值。

贡献者

ractor的原始作者是Sean Lawlor (@slawlor)。有关为ractor做出贡献的更多信息,请参阅CONTRIBUTING.md

许可协议

本项目的许可协议为MIT

依赖关系

~4–18MB
~206K SLoC