2 versions
0.0.2 | Jul 20, 2023 |
---|---|
0.0.1 | Jul 20, 2023 |
#29 in #rpc-framework
42KB
817 lines
TcpCommunicator
This crate provides an implementation of an RPC (remote procedure call) framework, intended for learning purposes.
TcpCommunicator
TcpCommunicator implements the Communicator trait and is intended for use with TCP streams. It is generic over the types T and S, where T must be Send, Serialize, and DeserializeOwned, and S must be a Stream of TcpMessage<T> items.
The key methods provided by TcpCommunicator are:
/// Creates a new TcpCommunicator instance with the given TcpStream.
/// This can be useful when you already have a TcpStream instance and want to create a TcpCommunicator instance from it.
pub fn new_with_stream(stream: TcpStream) -> TcpCommunicator;
/// Connects to a TcpCommunicatorHub at the given address, establishing a TCP connection.
/// It returns a new instance of TcpCommunicator or a CommunicatorError if the connection fails.
pub async fn connect(address: String) -> Result<TcpCommunicator, CommunicatorError>;
/// Serializes the given value into a TcpMessage and sends it over the TCP connection.
/// Similar to `send_message`, it returns a Uuid that uniquely identifies the sent message.
async fn send_data(&mut self, value: T) -> Result<Uuid, CommunicatorError>;
/// Sends the given value as a message and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
async fn send_and_wait(&mut self, value: T) -> Result<TcpMessage, CommunicatorError>;
/// Sends a response message to a previously received message with the given Uuid.
/// This method is useful when implementing servers that need to respond to incoming messages.
async fn answer_to(&mut self, id: &Uuid, answer: T) -> Result<(), CommunicatorError>;
/// Attempts to retrieve an incoming message.
/// If there is no message, this method returns `None`.
/// It's important to note that this method does not block, so it can return `None` even if there are messages that are about to arrive.
async fn try_take(&mut self) -> Option<TcpMessage>;
/// Returns a stream of incoming messages.
/// This is a convenient way of continuously receiving new messages.
fn incoming(&self) -> TcpCommunicatorStream;
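The relationship between `send_and_wait` and `answer_to` can be illustrated with a small request-reply sketch. It uses only the standard library (threads and channels instead of TCP), and every name in it (`Envelope`, `Client`, `reply_to`, `round_trip`) is hypothetical: the crate's actual `TcpMessage` layout and matching logic are not shown in this document, and a `u64` stands in for the `Uuid`. The idea it shows is only that a reply is matched to its request by id rather than by arrival order.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for `TcpMessage<T>`: a payload plus correlation ids.
#[derive(Debug, Clone, PartialEq)]
struct Envelope<T> {
    id: u64,               // unique id of this message (the crate uses a Uuid)
    reply_to: Option<u64>, // id of the request this message answers, if any
    value: T,
}

/// Hypothetical client side: sends requests and matches replies by id.
struct Client<T> {
    next_id: u64,
    outbox: Sender<Envelope<T>>,
    inbox: Receiver<Envelope<T>>,
    unmatched: VecDeque<Envelope<T>>, // messages that were not the awaited reply
}

impl<T> Client<T> {
    /// Sends `value` and blocks until a message with a matching `reply_to` arrives.
    fn send_and_wait(&mut self, value: T) -> Option<Envelope<T>> {
        let id = self.next_id;
        self.next_id += 1;
        self.outbox.send(Envelope { id, reply_to: None, value }).ok()?;
        loop {
            let msg = self.inbox.recv().ok()?; // None if the peer hung up
            if msg.reply_to == Some(id) {
                return Some(msg);
            }
            self.unmatched.push_back(msg); // keep unrelated messages for later
        }
    }
}

/// Runs one request-reply round trip against a toy server thread.
fn round_trip(request: u32) -> (Option<Envelope<u32>>, usize) {
    let (to_server, server_inbox) = channel::<Envelope<u32>>();
    let (to_client, client_inbox) = channel::<Envelope<u32>>();

    let server = thread::spawn(move || {
        let req = server_inbox.recv().unwrap();
        // An unrelated message first, so the reply is not simply "the next message".
        to_client
            .send(Envelope { id: 100, reply_to: None, value: 0 })
            .unwrap();
        // The actual answer, tagged with the request id (the role of `answer_to`).
        to_client
            .send(Envelope { id: 101, reply_to: Some(req.id), value: req.value + 1 })
            .unwrap();
    });

    let mut client = Client {
        next_id: 1,
        outbox: to_server,
        inbox: client_inbox,
        unmatched: VecDeque::new(),
    };
    let reply = client.send_and_wait(request);
    server.join().unwrap();
    (reply, client.unmatched.len())
}

fn main() {
    let (reply, unmatched) = round_trip(41);
    println!("reply = {:?}, unmatched = {}", reply, unmatched);
}
```

Buffering unmatched messages instead of dropping them is one possible design choice; it keeps messages that arrive during the wait available to a later `try_take`-style call.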
TcpCommunicatorHub
TcpCommunicatorHub is a hub that manages multiple TcpCommunicator client instances. It is generic over the message type T, which must be Send, Serialize, and DeserializeOwned.
The key methods provided by TcpCommunicatorHub are:
/// Starts a TcpCommunicatorHub instance which listens for connections on the provided address.
/// This is usually the first method to be called when setting up a new TcpCommunicatorHub.
pub async fn start(address: String) -> Result<TcpCommunicatorHub, CommunicatorError>;
/// Tries to take a message from any of the connected Communicators.
/// This function will return a tuple consisting of the CommunicatorId and the TcpMessage if there is a message available.
/// If there are no messages available, it will return `None`.
/// This method is non-blocking and may return `None` even if there are messages about to arrive.
pub async fn try_take(&mut self) -> Option<(CommunicatorId, TcpMessage<T>)>;
/// Sends data to a specific Communicator identified by the provided CommunicatorId.
/// The data is serialized and sent as a TcpMessage.
/// This method returns a Uuid which is the unique identifier for the sent message.
pub async fn send_data(&mut self, communicator_id: CommunicatorId, data: T) -> Result<Uuid, CommunicatorError>;
/// Sends data to a specific Communicator and waits for a response message.
/// This method is useful for request-reply scenarios where you want to send a message and wait for a response.
pub async fn send_and_wait(&mut self, communicator_id: CommunicatorId, data: T) -> Result<TcpMessage<T>, CommunicatorError>;
/// Returns a stream of incoming messages from all connected Communicators.
/// This is a convenient way of continuously receiving new messages.
pub fn incoming(&mut self) -> TcpCommunicatorHubStream<T>;
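The hub's `try_take` behaviour (one inbox fed by many clients, each message tagged with the sender's id, `None` when nothing is queued) can be sketched with standard-library channels. This is an illustration under assumptions, not the crate's implementation: `Hub`, `ClientHandle`, `accept`, and the `usize` alias `ClientId` are hypothetical names, and no TCP is involved.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the crate's `CommunicatorId`.
type ClientId = usize;

/// Toy hub: every client handle sends into one shared inbox, tagged with its id.
struct Hub<T> {
    inbox: Receiver<(ClientId, T)>,
    sender: Sender<(ClientId, T)>,
    next_client: ClientId,
}

/// Toy client handle that can only send to the hub.
struct ClientHandle<T> {
    id: ClientId,
    to_hub: Sender<(ClientId, T)>,
}

impl<T> Hub<T> {
    fn new() -> Self {
        let (sender, inbox) = channel();
        Hub { inbox, sender, next_client: 0 }
    }

    /// Registers a new client and returns its handle.
    fn accept(&mut self) -> ClientHandle<T> {
        let id = self.next_client;
        self.next_client += 1;
        ClientHandle { id, to_hub: self.sender.clone() }
    }

    /// Non-blocking: returns `None` when nothing is queued right now.
    fn try_take(&mut self) -> Option<(ClientId, T)> {
        self.inbox.try_recv().ok()
    }
}

impl<T> ClientHandle<T> {
    fn send(&self, value: T) {
        // Sending only fails if the hub (the receiver) has been dropped.
        let _ = self.to_hub.send((self.id, value));
    }
}

fn main() {
    let mut hub: Hub<&str> = Hub::new();
    let alice = hub.accept();
    let bob = hub.accept();

    println!("{:?}", hub.try_take()); // nothing queued yet
    bob.send("hello from bob");
    alice.send("hello from alice");
    while let Some((id, text)) = hub.try_take() {
        println!("client {}: {}", id, text);
    }
}
```

Because `try_take` never waits, a caller that polls it in a loop has to decide how to idle between polls; a stream such as the one returned by `incoming` avoids that decision.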
Example
use std::pin::pin;
use futures::{pin_mut, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::join;
use crate::{Communicator, CommunicatorMessage, TcpCommunicator};
use crate::error::CommunicatorError;
use crate::tcp::TcpCommunicatorHub;
use crate::tests::tcp_message;

#[tokio::main]
async fn main() -> Result<(), CommunicatorError> {
    // The port must fit in 16 bits (at most 65535).
    let url = "127.0.0.1:55066".to_string();

    // Set up the communicator hub. It listens for incoming connections
    // and can send messages to all connected clients.
    let hub_url = url.clone();
    let hub_task = tokio::spawn(async move {
        let mut hub = TcpCommunicatorHub::<MyCommunicatorMessage>::start(hub_url).await.unwrap();
        let mut stream = hub.incoming();
        while let Some((communicator_id, message)) = stream.next().await {
            let value = message.value();
            match value {
                MyCommunicatorMessage::SetUserName(name) => {
                    println!("Set username to: {}", name);
                },
                MyCommunicatorMessage::GetUserAge() => {
                    let result = 42;
                    println!("return user age to client: {}", result);
                    hub.answer_to(communicator_id, message.id(), MyCommunicatorMessage::GetUserAgeResponse(result)).await.unwrap();
                },
                MyCommunicatorMessage::Exit => {
                    println!("{}: {}", message.id(), "Exit");
                    for client in hub.get_client_list().await {
                        // Inform all clients that the server is going to shut down.
                        hub.answer_to(client.id(),
                                      message.id(),
                                      MyCommunicatorMessage::GoingToShutdown).await.unwrap();
                    }
                    return;
                },
                _ => { println!("{}: {}", message.id(), "Unknown message"); }
            }
        }
    });

    // Set up the client. It connects to the server, sends messages to it, and waits for responses.
    let client_url = url.clone();
    let client_task = tokio::spawn(async move {
        let mut client = TcpCommunicator::<MyCommunicatorMessage>::connect(client_url).await.unwrap();

        // Fire-and-forget example: send a message without waiting for a reply.
        client.send_data(MyCommunicatorMessage::SetUserName("John".to_string())).await.unwrap();

        // Send a message and wait for the response to it.
        let response = client.send_and_wait(MyCommunicatorMessage::GetUserAge()).await.unwrap();
        let response_value = response.value();
        match response_value {
            MyCommunicatorMessage::GetUserAgeResponse(age) => { println!("User age is: {}", age); },
            MyCommunicatorMessage::GoingToShutdown => { println!("{}: {}", response.id(), "Server is going to shutdown"); },
            _ => { println!("{}: {}", response.id(), "Unknown message"); }
        }

        client.send_data(MyCommunicatorMessage::Exit).await.unwrap();
    });

    join!(hub_task, client_task);
    Ok(())
}

#[derive(Serialize, Deserialize)]
enum MyCommunicatorMessage {
    SetUserName(String),
    GetUserAge(),
    GetUserAgeResponse(u32),
    Exit,
    GoingToShutdown
}
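Both `start` and `connect` take the address as a `String`, so a mistake such as an out-of-range port only surfaces at run time. A minimal sketch of checking an address up front with the standard library's `SocketAddr` parser follows. The helper `check_address` is hypothetical and not part of the crate, and the sketch assumes addresses are literal `ip:port` pairs; how the crate itself resolves addresses is not stated here, and it may accept host names that this parser rejects.

```rust
use std::net::SocketAddr;

/// Returns the parsed address, or a readable error if the string
/// is not a valid literal `ip:port` pair.
fn check_address(address: &str) -> Result<SocketAddr, String> {
    address
        .parse::<SocketAddr>()
        .map_err(|e| format!("invalid address {:?}: {}", address, e))
}

fn main() {
    // The second candidate has a port above 65535; the third uses a host name,
    // which `SocketAddr` parsing does not resolve.
    for candidate in ["127.0.0.1:55066", "127.0.0.1:550066", "localhost:8080"] {
        match check_address(candidate) {
            Ok(addr) => println!("ok: {}", addr),
            Err(msg) => println!("{}", msg),
        }
    }
}
```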
Dependencies
~4–11MB
~113K SLoC