1 个不稳定版本

0.1.0 2022年10月6日

#1671解析器实现

自定义许可

120KB
3K SLoC

Rio

受 Orleans 启发的分布式有状态服务

该软件包提供了一种基于对象间消息传递的可扩展、分布式和有状态服务的框架

应用

您的大部分应用程序代码将以 ServiceObjectsMessages 的形式编写

use async_trait::async_trait;
use rio_rs::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(TypeName, Message, Deserialize, Serialize)]
pub struct HelloMessage {
    pub name: String
}

#[derive(TypeName, Message, Deserialize, Serialize)]
pub struct HelloResponse {}

#[derive(TypeName, FromId, Default)]
pub struct HelloWorldService {
    pub id: String,
}

#[async_trait]
impl Handler<HelloMessage> for HelloWorldService {
    type Returns = HelloResponse;
    async fn handle(
        &mut self,
        message: HelloMessage,
        app_data: Arc<AppData>,
    ) -> Result<Self::Returns, HandlerError> {
        println!("Hello world");
        Ok(HelloResponse {})
    }
}

运行服务器

要运行您的应用程序,您需要启动服务器,即 Server

use rio_rs::prelude::*;
use rio_rs::cluster::storage::sql::{SqlMembersStorage};
use rio_rs::object_placement::sql::SqlObjectPlacementProvider;

# // Copied from the snippet above
# use async_trait::async_trait;
# use serde::{Deserialize, Serialize};
# use std::sync::Arc;
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloMessage {
#     pub name: String
# }
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloResponse {}
#
# #[derive(TypeName, FromId, Default)]
# pub struct HelloWorldService {
#     pub id: String,
# }
#
# #[async_trait]
# impl Handler<HelloMessage> for HelloWorldService{
#     type Returns = HelloResponse;
#     async fn handle(
#         &mut self,
#         message: HelloMessage,
#         app_data: Arc<AppData>,
#     ) -> Result<Self::Returns, HandlerError> {
#         println!("Hello world");
#         Ok(HelloResponse {})
#     }
# }

#[tokio::main]
async fn main() {
    let addr = "0.0.0.0:5000";

    // Configure types on the server's registry
    let mut registry = Registry::new();
    registry.add_static_fn::<HelloWorldService, String, _>(FromId::from_id);
    registry.add_handler::<HelloWorldService, HelloMessage>();

    // Configure the Cluster Membership provider
    let pool = SqlMembersStorage::pool()
        .connect("sqlite::memory:")
        .await
        .expect("Membership database connection failure");
    let members_storage = SqlMembersStorage::new(pool);
    members_storage.migrate().await;

    let membership_provider_config = PeerToPeerClusterConfig::default();
    let membership_provider =
        PeerToPeerClusterProvider::new(members_storage, membership_provider_config);

    // Configure the object placement
    let pool = SqlMembersStorage::pool()
        .connect("sqlite::memory:")
        .await
        .expect("Object placement database connection failure");
    let object_placement_provider = SqlObjectPlacementProvider::new(pool);
    object_placement_provider.migrate().await;

    // Create the server object
    let mut server = Server::new(
        addr.to_string(),
        registry,
        membership_provider,
        object_placement_provider,
    );

    // Run the server
    // server.serve().await;
}

客户端

与集群通信只需通过 TCP 发送序列化的已知消息。该 client 模块提供了一种简单的方法来实现这一点

use rio_rs::prelude::*;
use rio_rs::cluster::storage::sql::{SqlMembersStorage};

# // Copied from the snippet above
# use async_trait::async_trait;
# use serde::{Deserialize, Serialize};
# use std::sync::Arc;
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloMessage {
#     pub name: String
# }
#
# #[derive(TypeName, Message, Deserialize, Serialize)]
# pub struct HelloResponse {}
#
# #[derive(TypeName, FromId, Default)]
# pub struct HelloWorldService {
#     pub id: String,
# }
#
# #[async_trait]
# impl Handler<HelloMessage> for HelloWorldService {
#     type Returns = HelloResponse;
#     async fn handle(
#         &mut self,
#         message: HelloMessage,
#         app_data: Arc<AppData>,
#     ) -> Result<Self::Returns, HandlerError> {
#         println!("Hello world");
#         Ok(HelloResponse {})
#     }
# }

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Member storage configuration (Rendezvous)
    let pool = SqlMembersStorage::pool()
        .connect("sqlite::memory:")
        .await?;
    let members_storage = SqlMembersStorage::new(pool);
    # members_storage.migrate().await;

    // Create the client
    let mut client = ClientBuilder::new()
        .members_storage(members_storage)
        .build()?;

    let payload = HelloMessage { name: "Client".to_string() };
    let response: HelloResponse = client
        .send(
            "HelloWorldService".to_string(),
            "any-string-id".to_string(),
            &payload,
        ).await?;

    // response is a `HelloResponse {}`
    Ok(())
}

路线图

在 v0.1.0 之前必须完成几件事情

  • 原始服务器/客户端协议
  • 基本集群支持
  • 基本放置支持
  • 对象自关闭
  • 原始对象持久化
  • 公开 API 重命名
  • 减少 Boxed 对象
  • 创建 Server 构建器
  • 强化网络(仅实现 happy path)
    • 为客户端使用 tower
    • 从客户端和服务器服务中移除 unwrap
    • 改进 upsert 性能
    • 为客户端/服务器集成添加更多测试
  • 客户端/服务器保持活跃
  • 减少静态生命周期
  • 增加公共 API 测试覆盖率
  • 公共 API 100% 文档化
  • 发布/订阅
  • 放置策略
  • 容器化示例
  • 监督
  • 临时对象(也称为常规演员)
  • 行为准则
  • 移除魔术数字
  • 对象 TTL
  • 支持服务后台任务

依赖项

~34–66MB
~1M SLoC