10 stable releases

Version | Released |
---|---|
1.5.0 | Jul 12, 2024 |
1.4.3 | Jun 22, 2024 |
1.0.5 | May 4, 2024 |
1.0.1 | |
0.0.1 | |

#96 in Concurrency
93 downloads per month
79KB
1K SLoC
Gabriel2
Gabriel2: Indeed, an actor library based on Tokio, written in Rust
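Features
- Async message sending
- Async message processing inside the actor
- Send-and-forget messaging
- Send-and-wait-for-response messaging
- Mutable actor state
- Self reference to the actor through the context
- Actor lifecycle hooks (pre_start, pre_stop)
- Sink to actor
- Stream from actor
- Remote Actor
- Event Bus
- Load Balancer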
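The difference between the two messaging styles in the list above can be sketched with the standard library alone. This is not how Gabriel2 is implemented; it is a minimal illustration that assumes a thread-based worker, and the names `Envelope`, `spawn_counter`, `send` and `ask` are hypothetical. A message that carries a reply channel is an "ask"; one without it is "send and forget".

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical envelope: an optional reply channel distinguishes
// "ask" (Some) from "send and forget" (None).
enum Envelope {
    Ping { reply_to: Option<Sender<u32>> },
    Stop,
}

// Spawns a worker thread that owns the mutable counter (the actor state).
// The join handle yields the final counter once the worker stops.
fn spawn_counter() -> (Sender<Envelope>, thread::JoinHandle<u32>) {
    let (tx, rx) = channel::<Envelope>();
    let handle = thread::spawn(move || {
        let mut counter = 0u32;
        for envelope in rx {
            match envelope {
                Envelope::Ping { reply_to } => {
                    counter += 1;
                    if let Some(reply) = reply_to {
                        // The caller may have gone away; ignore that case.
                        let _ = reply.send(counter);
                    }
                }
                Envelope::Stop => break,
            }
        }
        counter
    });
    (tx, handle)
}

// Send and forget: enqueue the message and return immediately.
fn send(actor: &Sender<Envelope>) {
    actor
        .send(Envelope::Ping { reply_to: None })
        .expect("actor stopped");
}

// Send and wait: enqueue the message together with a one-off reply channel,
// then block until the worker answers.
fn ask(actor: &Sender<Envelope>) -> u32 {
    let (reply_tx, reply_rx) = channel();
    actor
        .send(Envelope::Ping { reply_to: Some(reply_tx) })
        .expect("actor stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}

fn main() {
    let (actor, handle) = spawn_counter();
    send(&actor);
    let pong = ask(&actor);
    println!("Got Pong {{ counter: {} }}", pong);
    actor.send(Envelope::Stop).unwrap();
    println!("final counter: {}", handle.join().unwrap());
}
```

Because the channel is first-in first-out for a single sender, the fire-and-forget Ping is processed before the ask, so the reply carries a counter of 2.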
Usage
Cargo.toml
[dependencies]
gabriel2 = { version = "1.5.0", features = ["remote", "sink-stream", "broadcast", "balancer"] }
echo.rs
use std::sync::Arc;
use gabriel2::*;
use bincode::{Decode, Encode};
use derive_more::{Display, Error};

#[derive(Debug)]
pub struct EchoActor;

#[derive(Debug)]
pub enum EchoMessage {
    Ping,
}

#[derive(Debug)]
pub enum EchoResponse {
    Pong { counter: u32 },
}

// Mutable state owned by the actor.
#[derive(Debug, Clone)]
pub struct EchoState {
    pub counter: u32,
}

#[derive(Debug, Display, Error)]
pub enum EchoError {
    #[display(fmt = "Unknown error")]
    Unknown,
}

impl From<std::io::Error> for EchoError {
    fn from(_err: std::io::Error) -> Self {
        EchoError::Unknown
    }
}

impl Handler for EchoActor {
    type Actor = EchoActor;
    type Message = EchoMessage;
    type State = EchoState;
    type Response = EchoResponse;
    type Error = EchoError;

    async fn receive(
        &self,
        ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>,
    ) -> Result<EchoResponse, EchoError> {
        // ctx.mgs carries the incoming message.
        match ctx.mgs {
            EchoMessage::Ping => {
                println!("Received Ping");
                // Lock the shared state before mutating it.
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                // Fail once more than 10 pings have been received.
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(EchoResponse::Pong { counter: state_lock.counter })
                }
            }
        }
    }
}
main.rs
// Assumes echo.rs above sits next to main.rs as the `echo` module.
mod echo;
use echo::*;
use gabriel2::*;

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };
    let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

    // Send and forget.
    println!("Sent Ping");
    echo_ref.send(EchoMessage::Ping).await?;

    // Send and wait for the response.
    println!("Sent Ping and ask response");
    let pong = echo_ref.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);

    _ = echo_ref.stop().await;
    Ok(())
}
Example output
Sent Ping
Sent Ping and ask response
Received Ping
Received Ping
Got Pong { counter: 2 }
Example sources: https://github.com/igumnoff/gabriel2/tree/main/test
Sink
// Brings `map` and `forward` into scope.
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_sink = ActorSink::sink(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    // Forward every message from the stream into the actor.
    _ = message_stream.forward(echo_sink).await;
    // Give the actor time to process the messages before exiting.
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}
Example output
Received Ping
Received Ping
Received Ping
Stream
// Brings `map`, `forward` and `for_each` into scope.
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    // Forward the messages into the actor, then read the responses back.
    _ = message_stream.forward(echo_sink).await;
    echo_stream.for_each(|message| async move {
        println!("Got {:?}", message.unwrap());
    }).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}
Example output
Received Ping
Received Ping
Received Ping
Got Pong { counter: 1 }
Got Pong { counter: 2 }
Got Pong { counter: 3 }
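The sink-and-stream pairing shown above can be pictured as two channels around a worker: one carries messages in, the other carries responses out. The following is a std-only sketch of that idea, not Gabriel2's implementation; `sink_stream` and `run` are hypothetical helpers and the worker is a plain thread rather than a Tokio task.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Message {
    Ping,
}

#[derive(Debug, PartialEq)]
enum Response {
    Pong { counter: u32 },
}

// Returns an input end (the "sink") and an output end (the "stream").
fn sink_stream() -> (Sender<Message>, Receiver<Response>) {
    let (sink, inbox) = channel::<Message>();
    let (outbox, stream) = channel::<Response>();
    thread::spawn(move || {
        let mut counter = 0u32;
        // The loop ends once every sink handle has been dropped.
        for message in inbox {
            match message {
                Message::Ping => {
                    counter += 1;
                    // Stop early if nobody reads the responses any more.
                    if outbox.send(Response::Pong { counter }).is_err() {
                        break;
                    }
                }
            }
        }
    });
    (sink, stream)
}

// Pushes all messages into the sink, then drains the response stream.
fn run(messages: Vec<Message>) -> Vec<Response> {
    let (sink, stream) = sink_stream();
    for message in messages {
        sink.send(message).expect("worker stopped");
    }
    // Dropping the sink closes the input so the worker can finish,
    // which in turn ends the response stream.
    drop(sink);
    stream.into_iter().collect()
}

fn main() {
    for response in run(vec![Message::Ping; 3]) {
        println!("Got {:?}", response);
    }
}
```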
Remote
Preparations for the remote version
Add Encode and Decode from "bincode" to derive(..) for EchoActor, EchoMessage, EchoResponse, EchoState and EchoError
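The derives are needed because a remote message has to be turned into bytes on one side of the connection and rebuilt on the other. The sketch below shows that round trip with a hand-written format. It is an illustration only: the layout (one tag byte, then a little-endian payload) and the four function names are hypothetical and are not bincode's actual wire format, and the two enums are redeclared here so the block is self-contained.

```rust
#[derive(Debug, PartialEq)]
enum EchoMessage {
    Ping,
}

#[derive(Debug, PartialEq)]
enum EchoResponse {
    Pong { counter: u32 },
}

// Hypothetical format: a single tag byte identifies the variant.
fn encode_message(message: &EchoMessage) -> Vec<u8> {
    match message {
        EchoMessage::Ping => vec![0],
    }
}

fn decode_message(bytes: &[u8]) -> Option<EchoMessage> {
    match bytes {
        [0] => Some(EchoMessage::Ping),
        _ => None,
    }
}

// Tag byte followed by the counter as four little-endian bytes.
fn encode_response(response: &EchoResponse) -> Vec<u8> {
    match response {
        EchoResponse::Pong { counter } => {
            let mut out = vec![0u8];
            out.extend_from_slice(&counter.to_le_bytes());
            out
        }
    }
}

// Rejects unknown tags and payloads of the wrong length.
fn decode_response(bytes: &[u8]) -> Option<EchoResponse> {
    match bytes {
        [0, a, b, c, d] => Some(EchoResponse::Pong {
            counter: u32::from_le_bytes([*a, *b, *c, *d]),
        }),
        _ => None,
    }
}

fn main() {
    let request = encode_message(&EchoMessage::Ping);
    println!("request bytes: {:?}", request);
    println!("decoded request: {:?}", decode_message(&request));

    let reply = encode_response(&EchoResponse::Pong { counter: 2 });
    println!("reply bytes: {:?}", reply);
    println!("decoded reply: {:?}", decode_response(&reply));
}
```

With the derives in place, the library can produce such conversions for every type that crosses the network, which is why all five types need them.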
Remote version
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };
    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_server = ActorServer::new("echo_server", "127.0.0.1", 9001, echo_ref).await?;
    let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        ActorClient::new("echo_client", "127.0.0.1", 9001).await?;
    println!("Sent Ping");
    echo_client.send(EchoMessage::Ping).await?;
    println!("Sent Ping and ask response");
    let pong = echo_client.ask(EchoMessage::Ping).await?;
    println!("Got {:?}", pong);
    _ = echo_client.stop().await;
    _ = echo_server.stop().await;
    Ok(())
}
Event Bus
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water
    }

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());
    let subscriber_id = event_bus.subscribe(move |event: EventElement| {
        // Clone the handle per call so the closure can run more than once.
        let echo_ref = echo_ref.clone();
        async move {
            match event {
                // Only Fire events are forwarded to the actor.
                EventElement::Fire => {
                    let _ = echo_ref.send(EchoMessage::Ping).await;
                    ()
                },
                _ => ()
            }
        }
    }).await;
    event_bus.publish(EventElement::Fire).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    event_bus.unsubscribe(subscriber_id).await;
    Ok(())
}
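The subscribe, publish and unsubscribe cycle used above can be sketched without any async machinery. This is a simplified, single-threaded stand-in and not the library's `EventBus`: the struct below is a hypothetical re-implementation that stores callbacks under numeric ids, which is assumed to be roughly what the returned `subscriber_id` stands for.

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::rc::Rc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum EventElement {
    Fire,
    Water,
}

// Hypothetical synchronous event bus: callbacks keyed by subscriber id.
struct EventBus<E> {
    next_id: usize,
    subscribers: HashMap<usize, Box<dyn FnMut(E)>>,
}

impl<E: Copy> EventBus<E> {
    fn new() -> Self {
        EventBus { next_id: 0, subscribers: HashMap::new() }
    }

    // Registers a callback and returns the id needed to remove it later.
    fn subscribe(&mut self, callback: impl FnMut(E) + 'static) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.subscribers.insert(id, Box::new(callback));
        id
    }

    // Returns true if a subscriber with this id existed.
    fn unsubscribe(&mut self, id: usize) -> bool {
        self.subscribers.remove(&id).is_some()
    }

    // Delivers a copy of the event to every current subscriber.
    fn publish(&mut self, event: E) {
        for callback in self.subscribers.values_mut() {
            callback(event);
        }
    }
}

fn main() {
    let pings = Rc::new(Cell::new(0u32));
    let seen = Rc::clone(&pings);
    let mut bus = EventBus::new();
    let id = bus.subscribe(move |event: EventElement| {
        // React to Fire only, mirroring the example above.
        if event == EventElement::Fire {
            seen.set(seen.get() + 1);
        }
    });
    bus.publish(EventElement::Fire);
    bus.publish(EventElement::Water);
    bus.unsubscribe(id);
    bus.publish(EventElement::Fire); // no subscribers left
    println!("pings: {}", pings.get());
}
```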
Load Balancer
#[tokio::main]
async fn main() -> Result<(), EchoError> {
    // Create a balancer backed by 10 actors; the closure builds each one.
    let echo_load_balancer: Arc<LoadBalancer<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>> =
        LoadBalancer::new("echo_load_balancer", 10, |id: usize| {
            Box::pin(async move {
                let user: Arc<
                    ActorRef<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError>,
                > = ActorRef::new(
                    format!("echo-{}", id),
                    EchoActor {},
                    EchoState { counter: 0 },
                    10000,
                )
                .await?;
                Ok(user)
            })
        })
        .await
        .unwrap();

    // Send 30 messages through the balancer.
    for _ in 0..30 {
        echo_load_balancer.send(EchoMessage::Ping).await?;
    }
    Ok(())
}
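The source does not say which strategy the balancer uses to pick an actor. Assuming plain round-robin, which is only a guess, the 30 messages above would be spread evenly over the 10 actors. The sketch below models that assumption with counters in place of actors; `RoundRobin` is a hypothetical type, not part of the library.

```rust
// Hypothetical round-robin balancer: each slot counts the messages
// delivered to one worker.
struct RoundRobin {
    workers: Vec<u32>,
    next: usize,
}

impl RoundRobin {
    fn new(worker_count: usize) -> Self {
        assert!(worker_count > 0, "need at least one worker");
        RoundRobin { workers: vec![0; worker_count], next: 0 }
    }

    // Delivers one message and returns the index of the worker that got it.
    fn send(&mut self) -> usize {
        let index = self.next;
        self.workers[index] += 1;
        // Wrap around after the last worker.
        self.next = (self.next + 1) % self.workers.len();
        index
    }
}

fn main() {
    let mut balancer = RoundRobin::new(10);
    for _ in 0..30 {
        balancer.send();
    }
    // Under round-robin every worker ends up with 3 messages.
    println!("{:?}", balancer.workers);
}
```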
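Contributing
I would love to see contributions from the community. If you run into a bug, feel free to open an issue. If you would like to implement a new feature or a bug fix, please follow these steps:
- Read the "Contributor License Agreement (CLA)"
- Contact me via telegram @ievkz or discord @igumnovnsk
- Confirm the e-mail invitation to the repository
- Run "git clone" (you do not need to fork!)
- Create a branch for the issue assigned to you
- Open a pull request against the main branch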
Dependencies
~1.1–9.5MB
~94K SLoC