2 个版本
新 0.0.1 | 2024 年 8 月 12 日 |
---|---|
0.0.0 | 2024 年 8 月 12 日 |
#21 在 #message-bus
213 每月下载量
475KB
2.5K SLoC
Bab
免责声明:这个包是实验性的,使用了相当多的不安全代码块。它不适合生产环境。如果您发现错误 / UB / 竞态条件,请提交问题。Loom 测试是一个待办事项。
您是否曾想过构建自己的总线?没有?那么现在您可以做!
Bab,简称“构建总线”,是一个库,旨在为您应用程序的消息总线提供基础组件。其核心组件包括
- 一个异步
BufferPool
,允许在启动后快速分配和回收缓冲区,而不需要任何内存分配。根据当前的 API,通常不需要直接与BufferPool
交互。 Packet
,一个消息的处理句柄。数据包由缓冲区的一个区域支持,多个数据包可以由同一个缓冲区支持。缓冲区引用由系统管理 - 当缓冲区的所有引用都被丢弃时,它将自动释放回池中。- 对于创建和发送消息,使用
Writer
和WriteFlusher
。编写者会在自动获取的缓冲区上为消息预留空间。由编写者服务的WriteFlusher
将消息批量提供给调用者,例如在 TCP 套接字上刷新,批量和刷新节奏完全由您控制。每个写入的消息也会以Packet
的形式返回给调用者,以便在需要时在本地应用程序中循环。 - 对于接收传入的消息,使用
Framer
,它允许您将字节写入到一个暂存区域,以便将其框架到Packet
中供您的应用程序消费。
Bab 有一些偏见并且有点奇怪 - 它使用线程局部优化,并且通常假设您的应用程序是线程每核心。它的未来(例如,由 BufferPool::acquire
返回的)是 !Send
,因此不能与像 Tokio 的多线程执行器这样的工作窃取执行器一起使用。例如,您可以使用 Tokio 的 LocalSet
和 futures-executor 的 LocalPool
。
数据包
也可以是!发送
。要在线程之间发送数据包,必须调用其上的数据包::send
方法以获取一个发送数据包
。然后在接收线程上调用发送数据包::receive
将其转换回数据包
。
路线图
- 添加
LocalWriter
-Writer
的!发送
版本。 - 对
BufferPool
/BufferPtr
的线程安全接口? - Loom测试
为什么?
有两个与bab(批处理和线程局部优化)略有相关的问题,激励了bab的发展。
批处理
批处理是bab中的一个常见主题,但我最喜欢的例子是在Writer
/ WriteFlusher
中。多个线程上的多个写入者可以写入相同的基本缓冲区,所有这些消息(可能跨越多个缓冲区)可以单个(相当昂贵的)O(1)操作发送到刷新器。请注意,这里批处理中发生的不仅仅是刷新 - 单个缓冲区可以包含多个消息,因此您可以非常自然地将多个消息打包到一个单个的输出数据包中(想象一下您正在使用UDP),这可以帮助您更好地利用网络的MTU(巨型帧,有人吗?)。
线程局部优化
在我的x86 System76 Darter Pro笔记本电脑上(这是免费的产品推广吗?),criterion表明,无争用的AtomicUsize::fetch_add
花费约5.5纳秒,而一个非原子的usize::wrapping_add
花费约280皮秒(1-2个时钟周期?),两者都设置了核心亲和力。因此,在我们的微基准测试中,原子增加比其非原子对应物昂贵一个数量级,即使受影响的缓存行仅从同一核心访问。并发数据结构肯定不是免费的。
bab通过使用线程局部优化来提供混合并发数据结构,以在API便利性和实现复杂性之间进行权衡,从而获得更好的性能。
bab线程局部优化的一个例子是,克隆数据包
的成本与克隆Rc
的成本相似(非常便宜),但您仍然可以选择将数据包发送到另一个线程(在这一点上产生类似于Arc::clone
的成本)。
另一个例子是,BufferPool
维护一个线程局部的缓冲区缓存,当缓存为空时批量重新填充它,当本地缓存太满时批量将缓冲区释放回共享池。此外,提供BufferPool
和Writer
的异步性的WaiterQueue
结构,努力维护每个线程的单个线程安全等待者注册。给定线程上的所有其他等待者都注册在线程局部的列表中。
准则指出,创建一个容量为1033字节的缓冲区,并使用相应的释放操作,对于一个 Vec::<u8>::with_capacity
和从 bab::BufferPool
中获取和释放缓冲区操作,大约需要 ~32 纳秒。缓冲池还具有更多功能,例如具有固定内存使用量,并在缓冲区可用时通知等待缓冲区的任务(尽管在本次基准测试中未使用这些代码路径)。因此,从功能角度来看,可能更适合的对比是:1)获取信号量许可 2)分配缓冲区 3)释放缓冲区 4)释放信号量许可。
性能
有一些有限的基准测试 - 您可以尝试 examples/writer_benchmark.rs
,它具有一些可配置的维度。您还可以查看 benches/
,它为 Framer
和 BufferPool
提供了基准测试。我将在未来更新本节并提供一些具体数字。但如果你以可扩展的方式(如果你追求的是原始吞吐量,不要所有线程都使用同一个 Writer
)来构建你的总线,你应该没有问题,每核心每秒可以处理 1M - 10M 条消息。
许可证
MIT