#mio #serialization #message #tcp-connection #networking #data-structures #send-receive

middleman

为 mio::TcpStream 提供一个包装器,以发送和接收任意大小的可序列化 Structs(阻塞或非阻塞)

5 个不稳定版本

使用旧的 Rust 2015

0.3.1 2018 年 5 月 5 日
0.3.0 2018 年 4 月 29 日
0.2.1 2018 年 4 月 23 日
0.2.0 2018 年 4 月 22 日
0.1.0 2018 年 4 月 19 日

#26 in #send-receive

自定义许可证

47KB
826

Middleman

Middleman 是一个库,用于通过 TCP 连接发送和接收可序列化的数据结构,抽象出原始字节。该项目从较旧的库 wire 中汲取灵感,但旨在与 mio 轮询系统良好协作。

 struct M: Message 
    ↑       ╷
    ┆       ┆

    Middleman (⌐■_)
       TCP
    ~~~~~~~~~
    ▲       ▼    
    ▲ bytes ▼    
    ▲       ▼
    ~~~~~~~~~    
       TCP
    Middleman (⌐■_)

    ┆       ┆
    ╵       ↓
 struct M: Message 

使用它

如果您想构建某种基于 Tcp 的网络程序,您需要做一些事情。其中许多与 mio 相同,但让我们从某个地方开始。在我们的例子中,我将考虑设置单个客户端-服务器连接作为基准。

在我们开始之前,这是您在通信链路两端 概念上 想要的核心内容

  • 一个暴露非阻塞函数 send(&T)recv() -> Option<T>Middleman,其中 T 是您希望通过网络发送的消息结构(s)的类型。很简单。

旧版本的 middleman 就在这里停止。这始终存在一个问题:何时调用 recv?简单的解决方案是不断调用它。幸运的是,mio 正好可以帮助解决这个问题。它依赖于轮询来延迟执行工作,并在可以潜在地取得进展时取消阻塞。因此,我们来看看如何使这一切顺利协同工作

  1. 设置您的消息
    1. 定义您希望通过网络发送的消息类型(在上述描述中称为 'T')。
    2. 使用 serde 使这些结构可序列化。我建议依赖 serde_derive 中的宏。
    3. 为您的消息实现标记特质 middleman::Message。总体而言,它可能看起来像这样
    #[derive(Serialize, Deserialize)]
    enum MyMsg {
        // some examples of variants. All irrelevant to `Middleman`
        SayHello,
        SendGameState(Box<GameState>),
        Coordinate(f32, f32),
        ...
    }
    impl middleman::Message for MyMsg {}
    
  2. 设置您的 mio 循环
    1. 对于每个参与者,以某种方式获取一个与相关对等方连接的 mio::net::TcpStream 对象。这不是 middleman 独有的东西。
    2. 将每个 tcp 流包装在 Middleman 中。
    3. 使用各自的 Poll 对象注册您的 middleman(就像注册 mio::TcpStream 本身一样)。
    4. 在 mio poll 循环内部,在适当的时候调用 Middleman::recv 的某个变体。您的任务是确保您 始终清除所有等待的消息recv 从不会阻塞,所以您可以随意尝试接收一些内容。
    5. 根据需要使用您的 middleman 来 send

这就完了。流程与典型的 Tcp 设置没有太大不同。大部分工作涉及到让 Poll、Middleman 和 TcpStream 对象都很好地协同工作。有关更详细的示例,请参阅 测试

从 Mio 到 Middleman 的过渡

在实现高级算法时,人们喜欢不思考 字节数据包,而是考虑离散的 消息。枚举和结构体比字节序列更简洁地映射到这些理论结构。Middleman 的目标是隐藏所有字节级别的细节,但不隐藏更多内容。

熟悉使用 mio 通过 select-loop-like 构造来轮询一个或多个 Evented 结构体进度的用户会看到使用 middleman 并没有太大变化。

从高层来看,您的代码可能看起来像这样

let poll = ...
... // setup other mio stuff
let mut mm = Middleman::new(tcp_stream);
poll.register(&mm, MIDDLEMAN_TOK, ...).unwrap();

loop {
    poll.poll(&mut events, ... ).unwrap();
    for event in events.iter() {
        match event.token() {
            MIDDLEMAN_TOK => {
                if mm.recv_all_map<_, MyType>(|mm_ref, msg| {
                    // do something with `msg`
                }).1.is_err() {
                    // handle errors
                }
            },
            ...
            _ => unreachable!(),
        }
    }
}

有方法来精确地获取消息,何时反序列化它们以及接下来要做什么,但这是关键:当您从 poll 获得通知时,您尝试读取所有等待的消息并处理它们。就是这样。在任何时候,您都可以使用 mm.send::<MyType>(& msg) 将消息发送回对方。不需要额外的线程。不需要忙等待(多亏了 mio::Poll)。

特殊案例 recv_blocking

mio 本身是异步和非阻塞的。然而,有时阻塞接收更符合某些情况,例如预期恰好有一条消息。函数 recv_blockingrecv_blocking_solo 作为暂时篡改轮询循环流程的紧凑手段,直到有消息准备好。有关更多详细信息,请参阅文档,并查看 测试 中的示例。

关于消息大小的说明

这个库专注于灵活性。同一类型的消息可以在运行时以不同的大小表示(例如:空的 hashmap 比满的 hashmap 少占字节)。就消息的字节大小而言,您不必太担心,但仍需注意一些病态情况可能对内存大小产生的影响。

#[derive(Serialize, Deserialize)]
enum Large {
    A([u64; 32]),
    B(bool),
}
impl Message for Large {}

fn test() {
    let packed = PackedMessage::new(& Large::B(true)).unwrap();
    println!("packed bytes {:?}", packed.byte_len());
    println!("memory bytes {:?}", ::std::mem::size_of::<Large>());
}

调用 test 可能会打印

packed bytes 9
memory bytes 264

依赖关系

~1–1.6MB
~29K SLoC