#udp #ipc #networking #gamedev

tachyon-networking

一个基于Nack的可靠UDP库,适用于游戏和IPC

7个版本

0.1.7 2022年1月22日
0.1.6 2022年1月22日

#1261 in 游戏开发

MIT许可证

150KB
3.5K SLoC

Tachyon

Tachyon是一个高性能且高度并行的可靠UDP库,采用基于Nack的模型。

  • 高度可靠
  • 可靠分片
  • 有序和无序通道
  • 身份管理
  • 适用于高吞吐量IPC和游戏。

Tachyon专门设计用于具有许多连接客户端和高吞吐量本地IPC的垂直扩展环境。为了在单个帧内处理大量数据,接收并行化是必要的。并且需要更大的接收窗口来处理IPC量同时保持高度可靠性。

可靠性

由于想要支持更高的消息量和更大的消息,Tachyon采用了一种与大多数不同的模型。在可靠性方法中有一个核心的权衡,即越空间高效,它们能支持的可靠性窗口越小。在游戏中一个流行的方法是Glen Fiedler的方法,将ack编码为位标志。它的正常用法是在每个外出消息中编码ack。但这意味着如果你每帧发送33条消息,你将在一个帧内用完整个窗口。它被设计成与高级消息聚合一起使用,例如每帧发送1-4个数据包。

Nack模型可以在33个槽位中乐观地覆盖更大的窗口,因为我们只覆盖丢失的数据包。Tachyon通过一种可以覆盖非常大的窗口的方法进一步扩展了这一点,默认情况下每个通道有512个槽位。

接收窗口具有可配置的最大值。它从收到的最后一个有序序列开始,到收到的最后一个序列结束。我们每帧一次从后向前遍历这个窗口,并为每个覆盖33个槽位的窗口创建nack消息。然后,将这些消息打包成一个单独的varint编码的网络包。

但是,该消息本身可能会丢失,导致延迟。因此,我们还支持以循环方式将这些相同的nack插入到外出消息中,最多可重复TachyonConfig.nack_redundancy次。这里的想法是外出消息中的nack将覆盖每帧组合的最近丢失的nack。每帧的nack提供了更大的窗口覆盖。冗余的成本是外出消息的头部大小从4字节增加到10字节。

需要注意的一点是,nack模型需要恒定的消息流才能知道缺少了什么。因此,如果你有只有偶尔消息的通道,你应该定期发送一个头部加1大小的消息。Tachyon应该在这里添加一个内部消息,如果其他消息没有发送,则自动在所有通道上发送,但这还没有实现。

我们还有逻辑来过期发送缓冲区中持续时间太长的消息。比如偶尔的大消息,它们有自己的通道。发送缓冲区大小为1024,是默认接收窗口大小的两倍。

通道

顺序是按通道进行的。每个连接的地址(连接)都有自己的通道集。

系统自动配置了两个通道:通道1按顺序,通道2无序。你可以添加更多,但必须在绑定/连接之前添加。因为它们是按地址分配的,在服务器端,我们会在收到新地址的接收时懒加载创建通道。

分片

分片是单独可靠的。每个分片都作为单独的有序消息发送。因此,较大的消息工作得相当好,只是不要太大,以免占用太多的接收窗口。

有序与无序

这里的差异很简单,正如你所期望的那样。有序消息只按顺序交付。无序消息一到达就交付。两者都是可靠的。

连接管理

在UDP中使用“连接”这个词比较尴尬。因为尽管它们不像TCP那样连接,但UDP API确实有一个连接的概念。因此,将连接定义为具有应用级功能的东西,实际上只会使事情更加混乱。

因此,为了使事情清楚,Tachyon连接反映了UDP连接。然后我们添加一个可以链接到连接的Identity抽象。Identity是一个由应用程序创建的整数id和会话id。你在服务器上设置id/session对,并通过https等通道将这些信息告诉客户端。如果配置为使用身份,客户端将在连接后自动尝试链接其身份。如果客户端IP更改,它需要请求再次链接。服务器在首次链接时将删除之前链接的任何地址。启用身份后,两端将阻止常规消息,直到建立身份。

并发

最好的并发是没有并发。Tachyon是并行的,但内部不使用并发原语。相反,它通过简单地运行多个Tachyon实例(每个端口一个实例)利用Udp的性质。它提供了一个Pool抽象来管理这些实例,并并行运行它们的接收流。

支持并发发送但不支持可靠发送。对于可靠发送,你必须一次从单个线程发送。UDP套接字在OS级别上是原子的,因此不可靠就是直接通往那里的路径。UnreliableSender是一个可以用于其他线程的结构体。这是Rust的东西,它并不是为了实际的线程安全,只是为了让Rust知道它是安全的。

并行接收确实有额外的开销。它为接收到的消息分配字节,然后最终将所有这些推送到单个消费者队列。设计是一个并发队列的非并发队列。Tachyon实例将并发地从常规队列中解包,接收进入那个队列,然后将那个队列重新入队到并发队列。批量级别的原子操作,没有细粒度。

性能

大部分CPU时间都在udp系统调用中。但严重的丢包也可以增加CPU时间,因为较大的接收窗口使得Tachyon本身需要做更多的工作,因为它必须迭代那些窗口。

不可靠

不可靠的消息存在一个热路径,其中几乎不进行任何处理。我们必须无论如何缓冲可靠的消息,所以您只需要处理收发的消息体。对于不可靠的消息,您需要发送一个包含消息加上1个字节的字节数组。接收到的消息也将包含1个字节的头部。您不需要接触头部,也无法弄乱它,因为Tachyon会在发送时写入。但您必须对此进行推理。另一种选择是在发送和接收时使用memcpy。

待办事项列表

整合池以使其更加无缝。主要是一点对公共API的调整和几个额外的辅助方法。您实际上不需要考虑并行性,只需选择一个级别即可。

有一个完整的C#集成层,尚未推送到github。应该很快就会推出。

使用方法

目前文档不多,但有大量单元测试。ffi.rs封装了大部分API。tachyon_tests.rs包含一些压力测试单元测试。API主要设计用于ffi消费,因为我从.NET服务器中使用它。

update()必须每帧调用一次。在那里发送nacks和响应nacks的重新发送。此外还有一些日常维护和片段过期。发送将被立即处理。

基本使用。

let config = TachyonConfig::default();
let server = Tachyon::create(config);
let client = Tachyon::create(config);

let address = NetworkAddress { a: 127, b: 0, c: 0, d: 0, port: 8001};
server.bind(address);
client.connect(address);

let send_buffer: Vec<u8> = vec![0;1024];
let receive_buffer: Vec<u8> = vec![0;4096];

client.send_reliable(1, NetworkAddress::default(), &mut send_buffer, 32);
let receive_result = server.receive_loop(&mut receive_buffer);
if receive_result.length > 0 && receive_result.error == 0 {
    server.send_reliable(1, receive_result.address, &mut send_buffer, 32);
}

client.update();
server.update();

池的使用

let mut pool = Pool::create();
let config = TachyonConfig::default();

// create_server also binds the port
pool.create_server(config, NetworkAddress::localhost(8001));
pool.create_server(config, NetworkAddress::localhost(8002));
pool.create_server(config, NetworkAddress::localhost(8003));

// using built in test client which removes some boilerplate
let mut client1 = TachyonTestClient::create(NetworkAddress::localhost(8001));
let mut client2 = TachyonTestClient::create(NetworkAddress::localhost(8002));
let mut client3 = TachyonTestClient::create(NetworkAddress::localhost(8003));
client1.connect();
client2.connect();
client3.connect();

let count = 20000;
let msg_len = 64;

for _ in 0..count {
    client1.client_send_reliable(1, msg_len);
    client2.client_send_reliable(1, msg_len);
    client3.client_send_reliable(1, msg_len);
}
// non blocking receive
let receiving = pool.receive();

// call this to finish/wait on the receive.
let res = pool.finish_receive();

// or alternative call pool.receive_blocking() which doesn't need a finish_receive().

依赖项

~4.5MB
~86K SLoC