#buffer #message-bus #packet #bus #thread-local #pool

无 std bab

构建一个总线 - 为您的下一个消息总线构建的低级组件

2 个版本

0.0.1 2024 年 8 月 12 日
0.0.0 2024 年 8 月 12 日

#21#message-bus

Download history 213/week @ 2024-08-10

213 每月下载量

MIT 许可证

475KB
2.5K SLoC

Bab

免责声明:这个包是实验性的,使用了相当多的不安全代码块。它不适合生产环境。如果您发现错误 / UB / 竞态条件,请提交问题。Loom 测试是一个待办事项。

您是否曾想过构建自己的总线?没有?那么现在您可以做!

crab builds a bus

Bab,简称“构建总线”,是一个库,旨在为您应用程序的消息总线提供基础组件。其核心组件包括

  • 一个异步 BufferPool,允许在启动后快速分配和回收缓冲区,而不需要任何内存分配。根据当前的 API,通常不需要直接与 BufferPool 交互。
  • Packet,一个消息的处理句柄。数据包由缓冲区的一个区域支持,多个数据包可以由同一个缓冲区支持。缓冲区引用由系统管理 - 当缓冲区的所有引用都被丢弃时,它将自动释放回池中。
  • 对于创建和发送消息,使用 WriterWriteFlusher。编写者会在自动获取的缓冲区上为消息预留空间。由编写者服务的 WriteFlusher 将消息批量提供给调用者,例如在 TCP 套接字上刷新,批量和刷新节奏完全由您控制。每个写入的消息也会以 Packet 的形式返回给调用者,以便在需要时在本地应用程序中循环。
  • 对于接收传入的消息,使用 Framer,它允许您将字节写入到一个暂存区域,以便将其框架到 Packet 中供您的应用程序消费。

Bab 有一些偏见并且有点奇怪 - 它使用线程局部优化,并且通常假设您的应用程序是线程每核心。它的未来(例如,由 BufferPool::acquire 返回的)是 !Send,因此不能与像 Tokio 的多线程执行器这样的工作窃取执行器一起使用。例如,您可以使用 Tokio 的 LocalSet 和 futures-executor 的 LocalPool

数据包也可以是!发送。要在线程之间发送数据包,必须调用其上的数据包::send方法以获取一个发送数据包。然后在接收线程上调用发送数据包::receive将其转换回数据包

路线图

  • 添加LocalWriter - Writer!发送版本。
  • BufferPool / BufferPtr的线程安全接口?
  • Loom测试

为什么?

有两个与bab(批处理和线程局部优化)略有相关的问题,激励了bab的发展。

批处理

批处理是bab中的一个常见主题,但我最喜欢的例子是在Writer / WriteFlusher中。多个线程上的多个写入者可以写入相同的基本缓冲区,所有这些消息(可能跨越多个缓冲区)可以单个(相当昂贵的)O(1)操作发送到刷新器。请注意,这里批处理中发生的不仅仅是刷新 - 单个缓冲区可以包含多个消息,因此您可以非常自然地将多个消息打包到一个单个的输出数据包中(想象一下您正在使用UDP),这可以帮助您更好地利用网络的MTU(巨型帧,有人吗?)。

线程局部优化

在我的x86 System76 Darter Pro笔记本电脑上(这是免费的产品推广吗?),criterion表明,无争用的AtomicUsize::fetch_add花费约5.5纳秒,而一个非原子的usize::wrapping_add花费约280皮秒(1-2个时钟周期?),两者都设置了核心亲和力。因此,在我们的微基准测试中,原子增加比其非原子对应物昂贵一个数量级,即使受影响的缓存行仅从同一核心访问。并发数据结构肯定不是免费的。

bab通过使用线程局部优化来提供混合并发数据结构,以在API便利性和实现复杂性之间进行权衡,从而获得更好的性能。

bab线程局部优化的一个例子是,克隆数据包的成本与克隆Rc的成本相似(非常便宜),但您仍然可以选择将数据包发送到另一个线程(在这一点上产生类似于Arc::clone的成本)。

另一个例子是,BufferPool维护一个线程局部的缓冲区缓存,当缓存为空时批量重新填充它,当本地缓存太满时批量将缓冲区释放回共享池。此外,提供BufferPoolWriter的异步性的WaiterQueue结构,努力维护每个线程的单个线程安全等待者注册。给定线程上的所有其他等待者都注册在线程局部的列表中。

准则指出,创建一个容量为1033字节的缓冲区,并使用相应的释放操作,对于一个 Vec::<u8>::with_capacity 和从 bab::BufferPool 中获取和释放缓冲区操作,大约需要 ~32 纳秒。缓冲池还具有更多功能,例如具有固定内存使用量,并在缓冲区可用时通知等待缓冲区的任务(尽管在本次基准测试中未使用这些代码路径)。因此,从功能角度来看,可能更适合的对比是:1)获取信号量许可 2)分配缓冲区 3)释放缓冲区 4)释放信号量许可。

性能

有一些有限的基准测试 - 您可以尝试 examples/writer_benchmark.rs,它具有一些可配置的维度。您还可以查看 benches/,它为 FramerBufferPool 提供了基准测试。我将在未来更新本节并提供一些具体数字。但如果你以可扩展的方式(如果你追求的是原始吞吐量,不要所有线程都使用同一个 Writer)来构建你的总线,你应该没有问题,每核心每秒可以处理 1M - 10M 条消息。

许可证

MIT

依赖关系