6个版本 (稳定版)
1.0.4 | 2023年12月2日 |
---|---|
1.0.3 | 2023年12月1日 |
1.0.0 | 2023年11月29日 |
0.1.0 | 2023年11月19日 |
#386 在 编码
每月下载量31次
40KB
634 行
TCP消息I/O
一个简单的tokio
TCP客户端/服务器实现。该库包含两个抽象级别
- 高级接口:允许使用
serde
交换Rust类型。 - 低级接口:允许交换
Vec<u8>
消息。
本页描述了高级抽象,对于低级抽象,请查看 [raw
] 子模块。
目标与非目标
- 隐藏监听和接受TCP连接,以及将TCP流转换为请求/响应流的复杂性。
- 将消息序列化和反序列化为Rust类型。
- 可定制的序列化和压缩。
- 使用
zstd
的透明压缩。 - 灵活性:使用便利功能或原始接口。
该库故意将大多数错误处理留给用户。
Cargo features
默认情况下,没有启用功能。可用功能
我们建议同时启用这两个功能,以实现最大程度的简单性。
请注意,客户端和服务器必须使用相同的功能,否则它们将无法理解彼此的消息。
客户端
客户端的使用非常简单
use tcp_message_io::TCPClient;
use tokio;
use serde::{Deserialize, Serialize};
// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
Hello,
}
// This type represents the responses from the server.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum Response {
World,
}
#[tokio::main]
async fn main() {
// We need to specify the response type so that tcp_message_io
// knows what object to use for response deserialization.
let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap();
let response = client.send(Request::Hello).await.unwrap();
assert_eq!(response, Some(Response::World));
}
服务器
创建服务器也非常简单
use anyhow::Result;
use tcp_message_io::{TCPServer, TCPResponse};
use tokio;
use serde::{Deserialize, Serialize};
// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
Hello,
}
// This type represents the responses from the server.
#[derive(Serialize, Deserialize)]
enum Response {
World,
}
// We need a request handler: in this case we implement a simple
// "Hello, World" handler.
async fn hello_world_handler(request: Request) -> Result<TCPResponse<Response>> {
match request {
Request::Hello => Ok(TCPResponse::Message(Response::World))
// Handle additional request types here!
}
}
#[tokio::main]
async fn main() {
TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await;
}
TCPResponse
可以是以下之一
TCPResponse::Message(response)
发送响应消息。TCPResponse::CloseConnection
关闭与客户端的连接。这将向客户端发送一个空响应。TCPResponse::StopServer
用于关闭服务器。这也会向客户端发送一个空响应并关闭连接。
该库依赖于 anyhow
来处理错误,使得处理器可以返回任何类型的错误。
如果处理器返回错误,客户端将收到一个空消息,一个 tracing
错误消息将被记录,并且服务器继续监听来自同一客户端的新消息。此机制旨在处理未处理的错误,并避免客户端挂起等待响应。
如果需要,用户负责在转换之上构建错误报告机制。例如,可以通过确保处理器总是返回 Ok(...)
来实现这一点,并将错误作为枚举变体发送回来。
超时后停止TCP服务器
此外,此crate支持在一段时间内无活动后停止服务器(无活动超时)
TCPServer::new("127.0.0.1", 12345, echo_handler)
.with_inactivity_timeout(60) // Seconds
.listen()
.await;
此功能在构建类似工作节点的东西时很有用:节点可能由于许多原因而成为孤儿(网络问题、主节点崩溃等)。使用此功能,您可以实现一个清理机制,使工作节点自动关闭。
选择在不良请求的情况下执行的操作
此crate假定客户端和服务器共享请求/响应类型,并使用相同的序列化格式和压缩设置(两者都启用或禁用)。负载的版本由用户负责。
这可能导致客户端使用不同的类型或压缩设置,而服务器无法反序列化请求(不良请求)。默认情况下,此crate向客户端返回一个空消息,并记录一个 tracing
错误。
您可以通过使用 TCPServer::with_bad_request_handler
方法来设置该情况的处理程序来自定义行为。
#[derive(Serialize, Deserialize)]
enum Response {
World,
BadRequest,
}
TCPServer::new("127.0.0.1", 12345, echo_handler)
// This will be called when a bad request happens.
// In this example we return a BadRequest message to the client.
.with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest))
.listen()
.await;
自定义序列化
tcp_message_io
通过使用合理的默认值来尽可能简化启动过程:启用 postcard
功能,任何 Serialize
和 Deserialize
类型都将作为请求或响应工作,启用 zstd
以实现负载的透明压缩。
如果您想自定义序列化方法,可以禁用 postcard
功能,并为您的消息类型实现 SerializeMessage
特性。
#[derive(Serialize, Deserialize)]
enum Response {
World,
}
impl SerializeMessage for Response {
fn serialize(&self) -> Result<Vec<u8>> {
// Implement serialization to bytes
}
fn deserialize(message: &[u8]) -> Result<Self> {
// Implement deserialization from bytes
}
}
如果您想使用其他压缩方法,禁用 zstd
并在上述的 serialize
和 deserialize
方法中实现您的压缩方法。
线格式
库使用的线格式是消息加上一个内部8字节头部,该头部编码了每个消息的长度。
依赖关系
~3–12MB
~125K SLoC