#tcp #rpc #request-response #rpc-framework #send-message #async #communication

bin+lib sshu-communicator

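This is just another RPC communication framework. It aims to provide a request/response pattern for asynchronous code over the WebSocket (ws) and TCP protocols. This is a first implementation, intended for learning purposes only.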

2 releases

0.0.2 Jul 20, 2023
0.0.1 Jul 20, 2023

#29 in #rpc-framework

MIT license

42KB
817

TcpCommunicator

This crate provides an implementation of an RPC (Remote Procedure Call) framework, intended for learning purposes.

TcpCommunicator

TcpCommunicator implements the Communicator trait and is designed to work with TCP streams. It is generic over the types T and S, where T must be Send + Serialize + DeserializeOwned, and S must be a Stream of TcpMessage<T> items.

The key methods provided by TcpCommunicator are:

/// Creates a new TcpCommunicator instance with the given TcpStream.
/// This can be useful when you already have a TcpStream instance and want to create a TcpCommunicator instance from it.
pub fn new_with_stream(stream: TcpStream) -> TcpCommunicator;

/// Connects to a TcpCommunicatorHub at the given address, establishing a TCP connection.
/// It returns a new instance of TcpCommunicator or a CommunicatorError if the connection fails.
pub async fn connect(address: String) -> Result<TcpCommunicator, CommunicatorError>;

/// Serializes the given value into a TcpMessage and sends it over the TCP connection.
/// Similar to `send_message`, it returns a Uuid that uniquely identifies the sent message.
async fn send_data(&mut self, value: T) -> Result<Uuid, CommunicatorError>;

/// Sends the given value as a message and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
async fn send_and_wait(&mut self, value: T) -> Result<TcpMessage, CommunicatorError>;

/// Sends a response message to a previously received message with the given Uuid.
/// This method is useful when implementing servers that need to respond to incoming messages.
async fn answer_to(&mut self, id: &Uuid, answer: T) -> Result<(), CommunicatorError>;

/// Attempts to retrieve an incoming message.
/// If there is no message, this method returns `None`.
/// It's important to note that this method does not block, so it can return `None` even if there are messages that are about to arrive.
async fn try_take(&mut self) -> Option<TcpMessage>;

/// Returns a stream of incoming messages.
/// This is a convenient way of continuously receiving new messages.
fn incoming(&self) -> TcpCommunicatorStream;
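
The request-reply methods above (send_and_wait, answer_to) rely on matching a reply to the id of the message it answers. The following is a minimal, std-only sketch of that correlation idea, not the crate's actual implementation. The names Envelope, Pending, register, and dispatch are hypothetical, and a u64 counter stands in for the Uuid the crate uses so that the sketch needs no external dependencies.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical envelope: a message id plus a payload.
/// (The crate identifies messages by Uuid; u64 keeps this sketch std-only.)
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope<T> {
    pub id: u64,
    pub value: T,
}

/// Hypothetical table of requests that are still waiting for a reply.
pub struct Pending<T> {
    next_id: u64,
    waiting: HashMap<u64, Sender<Envelope<T>>>,
}

impl<T> Pending<T> {
    pub fn new() -> Self {
        Self { next_id: 0, waiting: HashMap::new() }
    }

    /// Registers a new outgoing request.
    /// Returns its id and a receiver on which the reply will be delivered.
    pub fn register(&mut self) -> (u64, Receiver<Envelope<T>>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes an incoming message.
    /// If it answers a pending request, it is delivered to the waiter and
    /// `None` is returned. Otherwise the message is handed back to the caller
    /// as an unsolicited message.
    pub fn dispatch(&mut self, msg: Envelope<T>) -> Option<Envelope<T>> {
        match self.waiting.remove(&msg.id) {
            Some(tx) => match tx.send(msg) {
                Ok(()) => None,
                // The waiter dropped its receiver; give the message back.
                Err(err) => Some(err.0),
            },
            None => Some(msg),
        }
    }
}
```

In this sketch, a caller would call register before writing the request to the socket, then wait on the returned receiver, while the read loop passes every incoming message through dispatch. Messages that dispatch hands back are the ones a method like try_take or incoming would surface.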

TcpCommunicatorHub

TcpCommunicatorHub is a hub that manages multiple TcpCommunicator client instances. It is generic over the message type T, which must be Send, Serialize, and DeserializeOwned.

The key methods provided by TcpCommunicatorHub are:

/// Starts a TcpCommunicatorHub instance which listens for connections on the provided address.
/// This is usually the first method to be called when setting up a new TcpCommunicatorHub.
pub async fn start(address: String) -> Result<TcpCommunicatorHub, CommunicatorError>;

/// Tries to take a message from any of the connected Communicators.
/// This function will return a tuple consisting of the CommunicatorId and the TcpMessage if there is a message available.
/// If there are no messages available, it will return `None`. 
/// This method is non-blocking and may return `None` even if there are messages about to arrive.
pub async fn try_take(&mut self) -> Option<(CommunicatorId, TcpMessage<T>)>;

/// Sends data to a specific Communicator identified by the provided CommunicatorId.
/// The data is serialized and sent as a TcpMessage.
/// This method returns a Uuid which is the unique identifier for the sent message.
pub async fn send_data(&mut self, communicator_id: CommunicatorId, data: T) -> Result<Uuid, CommunicatorError>;

/// Sends data to a specific Communicator and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
pub async fn send_and_wait(&mut self, communicator_id: CommunicatorId, data: T) -> Result<TcpMessage<T>, CommunicatorError>;

/// Returns a stream of incoming messages from all connected Communicators.
/// This is a convenient way of continuously receiving new messages.
pub fn incoming(&mut self) -> TcpCommunicatorHubStream<T>;
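
The hub's try_take is described above as non-blocking and as returning the sender's id together with the message. A rough, std-only sketch of those semantics follows. It is an illustration under assumptions, not the crate's code: Hub, ClientId, add_client, and client_count are hypothetical names, and std::sync::mpsc channels stand in for the per-client TCP connections.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};

/// Hypothetical stand-in for the crate's CommunicatorId.
pub type ClientId = usize;

/// Hypothetical hub that keeps one inbox per connected client.
pub struct Hub<T> {
    next_id: ClientId,
    inboxes: HashMap<ClientId, Receiver<T>>,
}

impl<T> Hub<T> {
    pub fn new() -> Self {
        Self { next_id: 0, inboxes: HashMap::new() }
    }

    /// Registers a client. The returned sender plays the role of the
    /// client's side of the connection.
    pub fn add_client(&mut self) -> (ClientId, Sender<T>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.inboxes.insert(id, rx);
        (id, tx)
    }

    /// Non-blocking poll over all clients.
    /// Returns the first available message with its sender's id, or `None`
    /// if nothing is ready right now. Clients whose connection has closed
    /// are removed along the way.
    pub fn try_take(&mut self) -> Option<(ClientId, T)> {
        let mut disconnected = Vec::new();
        let mut found = None;
        for (&id, inbox) in &self.inboxes {
            match inbox.try_recv() {
                Ok(msg) => {
                    found = Some((id, msg));
                    break;
                }
                Err(TryRecvError::Empty) => {}
                Err(TryRecvError::Disconnected) => disconnected.push(id),
            }
        }
        for id in disconnected {
            self.inboxes.remove(&id);
        }
        found
    }

    /// Number of clients the hub still tracks.
    pub fn client_count(&self) -> usize {
        self.inboxes.len()
    }
}
```

Because the poll never waits, a `None` result only means that no message was ready at that instant. A caller that wants to wait for messages would typically use a stream such as the one returned by incoming instead of calling try_take in a tight loop.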

Example

use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::join;
use crate::{Communicator, CommunicatorMessage, TcpCommunicator};
use crate::error::CommunicatorError;
use crate::tcp::TcpCommunicatorHub;

#[tokio::main]
async fn main() -> Result<(), CommunicatorError> {
    let url = "127.0.0.1:55066".to_string();
    
    // setup communicator hub. It will listen for incoming connections
    // and will be able to send messages to all connected clients
    let hub_url = url.clone();
    let hub_task = tokio::spawn(async move {
        let mut hub = TcpCommunicatorHub::<MyCommunicatorMessage>::start(hub_url).await.unwrap();
        let mut stream = hub.incoming();

        while let Some((communicator_id, message)) = stream.next().await {
            let value = message.value();
            match value {
                MyCommunicatorMessage::SetUserName(name) => {
                    println!("Set username to: {}", name);
                },
                MyCommunicatorMessage::GetUserAge() => {
                    let result = 42; 
                    println!("return user age to client: {}", result);
                    hub.answer_to(communicator_id,  message.id(), MyCommunicatorMessage::GetUserAgeResponse(result)).await.unwrap();
                },
                MyCommunicatorMessage::Exit => {
                    println!("{}: {}", message.id(), "Exit");
                    for client in hub.get_client_list().await 
                    {
                        // inform all clients that the server is about to shut down
                        hub.answer_to(client.id(),  
                                      message.id(), 
                                      MyCommunicatorMessage::GoingToShutdown).await.unwrap();
                    }
                    return;
                },
                _ => { println!("{}: {}", message.id(), "Unknown message"); }
            }
        }
    });
    
    // setup client. It will connect to server, send messages to it and wait for responses
    let client_url = url.clone();
    let client_task = tokio::spawn(async move {
        let mut client = TcpCommunicator::<MyCommunicatorMessage>::connect(client_url).await.unwrap();
        
        // fire-and-forget example: send a message without waiting for a reply
        client.send_data(MyCommunicatorMessage::SetUserName("John".to_string())).await.unwrap();
        
        
        // send a message and wait for the response to it
        let response = client.send_and_wait(MyCommunicatorMessage::GetUserAge()).await.unwrap();
        let response_value = response.value();
        
        match response_value {
            MyCommunicatorMessage::GetUserAgeResponse(age) => { println!("User age is: {}", age); },
            MyCommunicatorMessage::GoingToShutdown => { println!("{}: {}", response.id(), "Server is going to shutdown"); },
            _ => { println!("{}: {}", response.id(), "Unknown message"); }
        }
        client.send_data(MyCommunicatorMessage::Exit).await.unwrap();
    });

    join!(hub_task, client_task);
    return Ok(());
}

#[derive(Serialize, Deserialize)]
enum MyCommunicatorMessage {
    SetUserName(String),
    GetUserAge(),
    GetUserAgeResponse(u32),
    Exit,
    GoingToShutdown
}
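
The hub's match arms in the example decide which requests get a reply and which are fire-and-forget. One possible way to make that decision testable without any network setup is to factor it into a pure function, sketched below. The helper reply_for is hypothetical and not part of the crate, the age value 42 is the placeholder used in the example, and the serde derives are left out so the sketch compiles with the standard library alone.

```rust
/// Same variants as the example's message enum.
/// The Serialize/Deserialize derives are omitted to keep this std-only.
#[derive(Debug, Clone, PartialEq)]
pub enum MyCommunicatorMessage {
    SetUserName(String),
    GetUserAge(),
    GetUserAgeResponse(u32),
    Exit,
    GoingToShutdown,
}

/// Hypothetical helper: returns the reply a request deserves, or `None`
/// for fire-and-forget messages and for messages the hub does not handle.
pub fn reply_for(request: &MyCommunicatorMessage) -> Option<MyCommunicatorMessage> {
    match request {
        // Fire-and-forget: the client does not wait for an answer.
        MyCommunicatorMessage::SetUserName(_) => None,
        // Request-reply: 42 is the placeholder value from the example.
        MyCommunicatorMessage::GetUserAge() => {
            Some(MyCommunicatorMessage::GetUserAgeResponse(42))
        }
        // The example notifies clients before the hub stops.
        MyCommunicatorMessage::Exit => Some(MyCommunicatorMessage::GoingToShutdown),
        // Response variants are not requests, so the hub sends nothing back.
        MyCommunicatorMessage::GetUserAgeResponse(_)
        | MyCommunicatorMessage::GoingToShutdown => None,
    }
}
```

With such a helper, the hub loop would only need to call answer_to when reply_for returns `Some`, and the protocol rules could be checked with plain unit tests.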

Dependencies

~4–11MB
~113K SLoC