#tcp-server #client-server #tcp-client #serialization #message #exchange #high-level

tcp_message_io

一个简单的TCP服务器和客户端实现,用于交换消息

6个版本 (稳定版)

1.0.4 2023年12月2日
1.0.3 2023年12月1日
1.0.0 2023年11月29日
0.1.0 2023年11月19日

#386编码

每月下载量31次

MIT 许可证

40KB
634

TCP消息I/O

一个简单的tokio TCP客户端/服务器实现。该库包含两个抽象级别

  • 高级接口:允许使用 serde 交换Rust类型。
  • 低级接口:允许交换 Vec<u8> 消息。

本页描述了高级抽象,对于低级抽象,请查看 [raw] 子模块。

目标与非目标

  • 隐藏监听和接受TCP连接,以及将TCP流转换为请求/响应流的复杂性。
  • 将消息序列化和反序列化为Rust类型。
  • 可定制的序列化和压缩。
  • 使用 zstd 的透明压缩。
  • 灵活性:使用便利功能或原始接口。

该库故意将大多数错误处理留给用户。

Cargo features

默认情况下,没有启用功能。可用功能

  • postcard - 启用使用 postcard(一种快速有效的序列化格式)的自动序列化。
  • zstd - 启用使用 zstd 的消息透明压缩。

我们建议同时启用这两个功能,以实现最大程度的简单性。

请注意,客户端和服务器必须使用相同的功能,否则它们将无法理解彼此的消息。

客户端

客户端的使用非常简单

use tcp_message_io::TCPClient;
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
  Hello,
}

// This type represents the responses from the server.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum Response {
  World,
}

#[tokio::main]
async fn main() {
  // We need to specify the response type so that tcp_message_io
  // knows what object to use for response deserialization.
  let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap();
  let response = client.send(Request::Hello).await.unwrap();
  assert_eq!(response, Some(Response::World));
}

服务器

创建服务器也非常简单

use anyhow::Result;
use tcp_message_io::{TCPServer, TCPResponse};
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
  Hello,
}

// This type represents the responses from the server.
#[derive(Serialize, Deserialize)]
enum Response {
  World,
}

// We need a request handler: in this case we implement a simple
// "Hello, World" handler.
async fn hello_world_handler(request: Request) -> Result<TCPResponse<Response>> {
  match request {
    Request::Hello => Ok(TCPResponse::Message(Response::World))
    // Handle additional request types here!
  }
}

#[tokio::main]
async fn main() {
    TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await;
}

TCPResponse 可以是以下之一

  • TCPResponse::Message(response) 发送响应消息。
  • TCPResponse::CloseConnection 关闭与客户端的连接。这将向客户端发送一个空响应。
  • TCPResponse::StopServer 用于关闭服务器。这也会向客户端发送一个空响应并关闭连接。

该库依赖于 anyhow 来处理错误,使得处理器可以返回任何类型的错误。

如果处理器返回错误,客户端将收到一个空消息,一个 tracing 错误消息将被记录,并且服务器继续监听来自同一客户端的新消息。此机制旨在处理未处理的错误,并避免客户端挂起等待响应。

如果需要,用户负责在转换之上构建错误报告机制。例如,可以通过确保处理器总是返回 Ok(...) 来实现这一点,并将错误作为枚举变体发送回来。

超时后停止TCP服务器

此外,此crate支持在一段时间内无活动后停止服务器(无活动超时)

TCPServer::new("127.0.0.1", 12345, echo_handler)
    .with_inactivity_timeout(60)  // Seconds 
    .listen()
    .await;

此功能在构建类似工作节点的东西时很有用:节点可能由于许多原因而成为孤儿(网络问题、主节点崩溃等)。使用此功能,您可以实现一个清理机制,使工作节点自动关闭。

选择在不良请求的情况下执行的操作

此crate假定客户端和服务器共享请求/响应类型,并使用相同的序列化格式和压缩设置(两者都启用或禁用)。负载的版本由用户负责。

这可能导致客户端使用不同的类型或压缩设置,而服务器无法反序列化请求(不良请求)。默认情况下,此crate向客户端返回一个空消息,并记录一个 tracing 错误。

您可以通过使用 TCPServer::with_bad_request_handler 方法来设置该情况的处理程序来自定义行为。

#[derive(Serialize, Deserialize)]
enum Response {
  World,
  BadRequest,
}

TCPServer::new("127.0.0.1", 12345, echo_handler)
    // This will be called when a bad request happens.
    // In this example we return a BadRequest message to the client.
    .with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest)) 
    .listen()
    .await;

自定义序列化

tcp_message_io 通过使用合理的默认值来尽可能简化启动过程:启用 postcard 功能,任何 SerializeDeserialize 类型都将作为请求或响应工作,启用 zstd 以实现负载的透明压缩。

如果您想自定义序列化方法,可以禁用 postcard 功能,并为您的消息类型实现 SerializeMessage 特性。

#[derive(Serialize, Deserialize)]
enum Response {
  World,
}

impl SerializeMessage for Response {
  fn serialize(&self) -> Result<Vec<u8>> {
    // Implement serialization to bytes
  }
  fn deserialize(message: &[u8]) -> Result<Self> {
    // Implement deserialization from bytes
  }
}

如果您想使用其他压缩方法,禁用 zstd 并在上述的 serializedeserialize 方法中实现您的压缩方法。

线格式

库使用的线格式是消息加上一个内部8字节头部,该头部编码了每个消息的长度。

依赖关系

~3–12MB
~125K SLoC