#actor #tokio #messaging #remote #message #load-balancer #event-bus

gabriel2

Gabriel2: 确实是一个基于 Tokio 的 Actor 库,用 Rust 编写

10 个稳定版本

1.5.0 2024 年 7 月 12 日
1.4.3 2024 年 6 月 22 日
1.0.5 2024 年 5 月 4 日
1.0.1 2024 年 3 月 19 日
0.0.1 2024 年 3 月 19 日

在"并发"类别中排名第 96

Download history 295/week @ 2024-04-29 15/week @ 2024-05-06 12/week @ 2024-05-20 516/week @ 2024-06-10 532/week @ 2024-06-17 42/week @ 2024-06-24 4/week @ 2024-07-01 114/week @ 2024-07-08 5/week @ 2024-07-15 78/week @ 2024-07-29

93 每月下载次数

自定义许可证

79KB
1K SLoC

Gabriel2

Gabriel2

Gabriel2: 确实是一个基于 Tokio 的 Actor 库,用 Rust 编写

功能

  • 异步发送消息
  • Actor 中异步处理消息
  • 支持"发送即忘"(send and forget)式的消息发送
  • 支持发送并等待响应的消息
  • Actor 的可变状态
  • Actor 从上下文中的自引用
  • Actor 生命周期(pre_start, pre_stop)
  • 向 Actor 写入消息的 Sink
  • 从 Actor 接收流
  • 远程 Actor
  • 事件总线
  • 负载均衡器

用法

Cargo.toml

[dependencies]
gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }

echo.rs

use std::sync::Arc;
use gabriel2::*;

use bincode::{Decode, Encode};
use derive_more::{Display, Error};


/// Actor type used throughout the echo examples.
///
/// It is a unit struct with no fields; the `Handler` implementation for it
/// names `EchoState` as the associated state type.
#[derive(Debug)]
pub struct EchoActor;

/// Messages that can be delivered to `EchoActor`.
#[derive(Debug)]
pub enum EchoMessage {
    /// Asks the actor to bump its counter and answer with `EchoResponse::Pong`.
    Ping,
}

/// Replies produced by `EchoActor` when it handles a message successfully.
#[derive(Debug)]
pub enum EchoResponse {
    /// Answer to `EchoMessage::Ping`; `counter` is the state counter value
    /// after the ping was counted.
    Pong {counter: u32},
}

/// Mutable state associated with `EchoActor`.
#[derive(Debug,Clone)]
pub struct EchoState {
    /// Incremented once for every `EchoMessage::Ping` the handler processes.
    pub counter: u32,
}

/// Error type shared by the echo examples.
#[derive(Debug, Display, Error)]
pub enum EchoError {
    /// Catch-all failure: returned by the handler once the counter exceeds 10,
    /// and used as the conversion target for `std::io::Error`.
    #[display(fmt = "Unknown error")]
    Unknown,
}

impl From<std::io::Error> for EchoError {
    fn from(_err: std::io::Error) -> Self {
        EchoError::Unknown
    }
}

impl Handler for EchoActor {
    type Actor = EchoActor;
    type Message = EchoMessage;
    type State = EchoState;
    type Response = EchoResponse;
    type Error = EchoError;

    async fn receive(&self, ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>) -> Result<EchoResponse, EchoError> {
        match ctx.mgs {
            EchoMessage::Ping => {
                println!("Received Ping");
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(EchoResponse::Pong{counter: state_lock.counter})
                }
            }
        }
    }
}

main.rs

/// Basic example: create an echo actor, send one fire-and-forget `Ping`,
/// then send a second `Ping` and print the reply.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    // The counter starts at zero, so the second ping reports `counter: 2`.
    let initial_state = EchoState { counter: 0 };

    // NOTE(review): the last argument is presumably the mailbox capacity — confirm.
    let echo_ref = ActorRef::new("echo", EchoActor {}, initial_state, 100000).await?;

    println!("Sent Ping");
    echo_ref.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let reply = echo_ref.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", reply);

    // The outcome of stopping the actor is deliberately ignored.
    let _ = echo_ref.stop().await;
    Ok(())
}

示例输出

Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }

示例代码: https://github.com/igumnoff/gabriel2/tree/main/test

Sink

/// Sink example: forward a finite stream of three `Ping` messages into the
/// actor through an `ActorSink`.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let initial_state = EchoState { counter: 0 };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, initial_state, 100000).await?;
    let sink = ActorSink::sink(echo_ref.clone());

    // Wrap each message in `Ok` because `forward` consumes a stream of results.
    let pings = vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping];
    let _ = futures::stream::iter(pings).map(Ok).forward(sink).await;

    // Pause for a second before returning, as the original example does
    // (presumably so the actor can print its output — confirm).
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}

示例输出

Received Ping
Received Ping
Received Ping

/// Sink-and-stream example: push three `Ping` messages into the actor and
/// print every response read back from the paired stream.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let initial_state = EchoState { counter: 0 };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, initial_state, 100000).await?;
    let (sink, responses) = ActorSink::sink_stream(echo_ref.clone());

    // Wrap each message in `Ok` because `forward` consumes a stream of results.
    let pings = vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping];
    let _ = futures::stream::iter(pings).map(Ok).forward(sink).await;

    // Each stream item is unwrapped before printing, so a failed item panics here.
    responses
        .for_each(|item| async move {
            println!("Got {:?}", item.unwrap());
        })
        .await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}

示例输出

Received Ping
Received Ping
Received Ping
Got Pong { counter: 1 }
Got Pong { counter: 2 }
Got Pong { counter: 3 }

远程

远程准备

在 EchoActor、EchoMessage、EchoResponse、EchoState 和 EchoError 的 derive(..) 中添加来自 "bincode" 的 Encode 和 Decode

远程版本

/// Remote example: expose the echo actor through an `ActorServer` on
/// 127.0.0.1:9001 and talk to it through an `ActorClient`.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let initial_state = EchoState { counter: 0 };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, initial_state, 100000).await?;

    // Server and client use the same host and port.
    let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
    let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        ActorClient::new("echo_client", "127.0.0.1", 9001).await?;

    println!("Sent Ping");
    echo_client.send(EchoMessage::Ping).await?;

    println!("Sent Ping and ask response");
    let reply = echo_client.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", reply);

    // Stop the client first, then the server; both results are ignored.
    let _ = echo_client.stop().await;
    let _ = echo_server.stop().await;
    Ok(())
}

事件总线

/// Event-bus example: subscribe a handler that pings the echo actor whenever
/// a `Fire` event is published, publish one `Fire`, then unsubscribe.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    /// Events carried by the example bus.
    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water
    }

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

    let subscriber_id = event_bus
        .subscribe(move |event: EventElement| {
            // Clone the handle for each invocation so the closure keeps its
            // own copy and can be called for more than one event.
            let echo_ref = echo_ref.clone();
            async move {
                // Exhaustive match: adding a new variant later forces a
                // decision here instead of silently falling through.
                match event {
                    EventElement::Fire => {
                        // Best-effort delivery: a failed send is ignored.
                        let _ = echo_ref.send(EchoMessage::Ping).await;
                    }
                    EventElement::Water => {}
                }
            }
        })
        .await;

    event_bus.publish(EventElement::Fire).await;

    // Pause for a second before unsubscribing, as the original example does
    // (presumably so the subscriber can run — confirm).
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    event_bus.unsubscribe(subscriber_id).await;

    Ok(())
}

负载均衡器

/// Load-balancer example: build a balancer over echo actors created by a
/// factory closure, then send 30 `Ping` messages through it.
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    // NOTE(review): `10` is presumably the number of actors the balancer
    // creates through the factory — confirm.
    let balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
            Box::pin(async move {
                // Each actor gets a name derived from its id and a fresh counter.
                let worker: Arc<ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
                    ActorRef::new(format!("echo-{}", id), EchoActor {}, EchoState { counter: 0 }, 10000)
                        .await?;
                Ok(worker)
            })
        })
        .await
        // Panics if the balancer cannot be constructed.
        .unwrap();

    for _ in 0..30 {
        balancer.send(EchoMessage::Ping).await?;
    }

    Ok(())
}

贡献

我非常希望看到社区的贡献。如果你遇到错误,请随时打开一个问题。如果你想要实现一个新功能或错误修复,请按照以下步骤操作

  1. 阅读 "贡献者许可协议 (CLA)"
  2. 通过 telegram @ievkz 或 discord @igumnovnsk 联系我
  3. 在仓库中确认电子邮件邀请
  4. 执行 "git clone"(你不需要 fork!)
  5. 为分配给你的 issue 创建分支
  6. 向主分支创建拉取请求

依赖项

~1.1–9.5MB
~94K SLoC