1 个不稳定版本
0.1.0 | 2021 年 12 月 19 日 |
---|
#1210 在 并发
29KB
599 行(不含注释)
批处理队列
一个在生产者和消费者之间实现智能批处理的库。换句话说,这是一个尝试在休眠之前让任一侧运行一段时间(以利用 L1 缓存)的单个生产者单个消费者队列。
该队列的一个优点是所有桶都是预先分配的,除非使用 recv_batch
或 try_recv_batch
方法,否则在发送和接收项时不会进行分配。
示例
# tokio_test::block_on(async {
use batch_queue::{batch_queue, pipe};
let (tx, mut rx) = batch_queue::<u32, 128>(20);
const N: u32 = 10_000_000;
tokio::spawn(async move {
let stream = futures::stream::iter(0..N);
pipe(stream, tx).await;
});
let mut x = 0;
while let Ok(iter) = rx.recv().await {
for y in iter {
assert_eq!(y, x);
x += 1;
}
}
assert_eq!(x, N);
# })
在这里,由 recv()
返回的迭代器允许您在不进行分配的情况下消费桶内容。在我的 AMD Hetzner 箱子上,每项大约需要 6 纳秒。
工作原理
队列被建模为一个桶的箱切片,其中每个桶都是一个适当大小的 MaybeUninit
初始化项的数组和一个填充级别。读取器和写入器各自维护一个当前位置,包括桶索引以及一个循环计数器(以检测写入器是否正好领先于读取器一个完整循环)。
插入项
- 检查写入者的桶是否可用于写入(即它不是上一个循环中读取者的桶)
- 如果是这样,则在桶中的下一个槽位中插入;如果已满,则移动写入桶位置
声明当前批次的结束
- 检查当前写入桶是否有元素 ⇒ 移动写入桶位置
取桶
- 检查写入者的桶是否领先于读取者的桶 ⇒ 取走它并移动读取桶位置
依赖关系
~1–1.7MB
~34K SLoC