1 个不稳定版本
0.1.0 | 2022年10月6日 |
---|
#1671 在 解析器实现
120KB
3K SLoC
Rio
受 Orleans 启发的分布式有状态服务
该软件包提供了一种基于对象间消息传递的可扩展、分布式和有状态服务的框架
应用
您的大部分应用程序代码将以 ServiceObjects
和 Messages
的形式编写
use async_trait::async_trait;
use rio_rs::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(TypeName, Message, Deserialize, Serialize)]
pub struct HelloMessage {
pub name: String
}
#[derive(TypeName, Message, Deserialize, Serialize)]
pub struct HelloResponse {}
#[derive(TypeName, FromId, Default)]
pub struct HelloWorldService {
pub id: String,
}
#[async_trait]
impl Handler<HelloMessage> for HelloWorldService {
type Returns = HelloResponse;
async fn handle(
&mut self,
message: HelloMessage,
app_data: Arc<AppData>,
) -> Result<Self::Returns, HandlerError> {
println!("Hello world");
Ok(HelloResponse {})
}
}
运行服务器
要运行您的应用程序,您需要启动服务器,即 Server
use rio_rs::prelude::*;
use rio_rs::cluster::storage::sql::{SqlMembersStorage};
use rio_rs::object_placement::sql::SqlObjectPlacementProvider;
# // Copied from the snippet above
# use async_trait::async_trait;
# use serde::{Deserialize, Serialize};
# use std::sync::Arc;
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloMessage {
# pub name: String
# }
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloResponse {}
#
# #[derive(TypeName, FromId, Default)]
# pub struct HelloWorldService {
# pub id: String,
# }
#
# #[async_trait]
# impl Handler<HelloMessage> for HelloWorldService{
# type Returns = HelloResponse;
# async fn handle(
# &mut self,
# message: HelloMessage,
# app_data: Arc<AppData>,
# ) -> Result<Self::Returns, HandlerError> {
# println!("Hello world");
# Ok(HelloResponse {})
# }
# }
#[tokio::main]
async fn main() {
let addr = "0.0.0.0:5000";
// Configure types on the server's registry
let mut registry = Registry::new();
registry.add_static_fn::<HelloWorldService, String, _>(FromId::from_id);
registry.add_handler::<HelloWorldService, HelloMessage>();
// Configure the Cluster Membership provider
let pool = SqlMembersStorage::pool()
.connect("sqlite::memory:")
.await
.expect("Membership database connection failure");
let members_storage = SqlMembersStorage::new(pool);
members_storage.migrate().await;
let membership_provider_config = PeerToPeerClusterConfig::default();
let membership_provider =
PeerToPeerClusterProvider::new(members_storage, membership_provider_config);
// Configure the object placement
let pool = SqlMembersStorage::pool()
.connect("sqlite::memory:")
.await
.expect("Object placement database connection failure");
let object_placement_provider = SqlObjectPlacementProvider::new(pool);
object_placement_provider.migrate().await;
// Create the server object
let mut server = Server::new(
addr.to_string(),
registry,
membership_provider,
object_placement_provider,
);
// Run the server
// server.serve().await;
}
客户端
与集群通信只需通过 TCP 发送序列化的已知消息。该 client
模块提供了一种简单的方法来实现这一点
use rio_rs::prelude::*;
use rio_rs::cluster::storage::sql::{SqlMembersStorage};
# // Copied from the snippet above
# use async_trait::async_trait;
# use serde::{Deserialize, Serialize};
# use std::sync::Arc;
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloMessage {
# pub name: String
# }
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloResponse {}
#
# #[derive(TypeName, FromId, Default)]
# pub struct HelloWorldService {
# pub id: String,
# }
#
# #[async_trait]
# impl Handler<HelloMessage> for HelloWorldService {
# type Returns = HelloResponse;
# async fn handle(
# &mut self,
# message: HelloMessage,
# app_data: Arc<AppData>,
# ) -> Result<Self::Returns, HandlerError> {
# println!("Hello world");
# Ok(HelloResponse {})
# }
# }
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Member storage configuration (Rendezvous)
let pool = SqlMembersStorage::pool()
.connect("sqlite::memory:")
.await?;
let members_storage = SqlMembersStorage::new(pool);
# members_storage.migrate().await;
// Create the client
let mut client = ClientBuilder::new()
.members_storage(members_storage)
.build()?;
let payload = HelloMessage { name: "Client".to_string() };
let response: HelloResponse = client
.send(
"HelloWorldService".to_string(),
"any-string-id".to_string(),
&payload,
).await?;
// response is a `HelloResponse {}`
Ok(())
}
路线图
在 v0.1.0 之前必须完成几件事情
- 原始服务器/客户端协议
- 基本集群支持
- 基本放置支持
- 对象自关闭
- 原始对象持久化
- 公开 API 重命名
- 减少 Boxed 对象
- 创建 Server 构建器
- 强化网络(仅实现 happy path)
- 为客户端使用 tower
- 从客户端和服务器服务中移除 unwrap
- 改进
upsert
性能 - 为客户端/服务器集成添加更多测试
- 客户端/服务器保持活跃
- 减少静态生命周期
- 增加公共 API 测试覆盖率
- 公共 API 100% 文档化
- 发布/订阅
- 放置策略
- 容器化示例
- 监督
- 临时对象(也称为常规演员)
- 行为准则
- 移除魔术数字
- 对象 TTL
- 支持服务后台任务
依赖项
~34–66MB
~1M SLoC