7 个版本
0.1.6 | 2023 年 5 月 28 日 |
---|---|
0.1.5 | 2023 年 5 月 28 日 |
#102 在 缓存
每月 58 次下载
54KB
969 行
时间键流集
此库提供了一种混合内存/非内存数据结构,用于去重大多数有序的时间键流。
目标
有几个优先特性推动了该库的设计和开发。
- 去重 来自物联网/可穿戴设备前端以 .1Hz 到 10kHz 的离散采样频率发出的行的时间键流。
- 预期时间键将由微秒时间戳、整数用户_id、整数设备_id 和整数模式枚举组成。
- 限制提供去重功能的内存结构以允许单个节点具有 32GiB 到 256GiB 的 RAM 作为去重节点。
- 每日提供约 100B 行的去重
- 允许配置保留期,无需重建索引或过滤器
- 专门针对 u128 宽度的时间键
启发式动机
这个领域中有几个经验丰富的结构,如 B+ 树或日志结构合并树。作为一个专门用例,我们不需要承受这些结构在泛化过程中的一些权衡。将利用传入数据流的几个特性来优化平均情况。
- 从可穿戴/移动设备批量上传插入操作
- 标准情况下的批量大小为 ~5000 个连续的时间键
- 对于每个 user_id/device_id/source 对
- 时间戳将以 100s 到 1000000s 的时间间隔分开。
- 传入数据很少会倒退,几乎只在从开始读取保存的数据并再次向前推进时。
- 与时间键相关联的行数据不会与时间键流集一起存储。
- 流集资源的截断和回收可能发生在相对较少的时间间隔内,例如,每小时一次。
B+ 树非常适合这种场景,除了几个关键区别。首要原因是缺乏可行的 OOM 存储的 crate 实现,其次是缺乏对 u128 时间键的连续块插入的优化。LSM 树的类似缺乏支持促使开发以下算法的实现。
方法
- 时间间隔分区文件,段,在 LRU 风格中刷入磁盘,可能发生颠簸风险
- 时间键 属于一个且仅一个 段
- 用户可以选择时间间隔(每小时)
- 与回收间隔对齐,可简化截断
- 段是具有与页面大小的对齐节点,用于时间键的特殊化B+树。
- 支持插入接口,如下所示:
fn insert(batch: Vec<TimeKey>) -> Vec<bool>
- 支持高效的页面缓存使用,以实现基于memmap文件的持久性。
- 支持插入接口,如下所示:
- 内部节点与段“叶”节点分离持久化,并持续存储在RAM中。
- 时间键宽度为128位,包含64位的纪元微秒和64位的id的后半部分。
- 实现
From<&MyId> for TimeKey
,其中ts_us: i64
然后user_id: i32
和device_id: i32
作为正整数占据时间键的后半部分,将为相对较低的基数user_id和device_id的部署留下高度可压缩的前导零。
- 实现
段通过tsz然后zstd进行压缩,对于某些具有1_000x到200_000x压缩率的稀疏段,当刷新到磁盘时,某些数据集的数据速率保持一致。段以串行方式访问,插入率取决于底层索引,目前BTreeMap
的插入率约为~300_000个时间键每秒(单核CPU限制)。
在每次插入时,以下是插入的伪代码
- 计算每行的段起始桶时间戳
- 锁定段索引
- 查找插入桶时间戳范围内的现有段
- 创建索引中尚未存在的任何新段
- 锁定热段缓存
- 将新段插入索引和缓存
- 循环LRU逐出和段非/插入的水合,尊重请求的内存限制
- 释放热段缓存锁
- 锁定每个正在插入的段
- 释放段索引锁
- 将时间键插入其各自的段
- 释放每个段锁
- 更新新插入时间键的内存使用情况
接口
在Rust中,使用时间键流集TkStreamSet
,如下所示
use time_key_stream_set::prelude::*;
use std::error::Error;
async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
// Who knows, maybe you get this data on an HTTP server
unimplemented!()
}
async fn yolo() {
// Hope the database doesn't rollback this transaction
unimplemented!()
}
#[derive(IntoTimeKey)]
struct AdcRow {
ts_us: i64,
user_id: i32,
device_id: i32,
millivolts: i16,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Rehydrate a set of time keys from a known directory or a temperatory directory
// Configure a stream set via a builder with reasonable defaults for the data source
let stream_set = TkStreamSetBuilder::new()
.with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
.with_memory_limit(MemoryLimit::Low)
.build()
.await
.unwrap();
// Multiple threads could be running the same loop on this node
loop {
// I got some new data that may have duplicates
let batch: Vec<AdcRow> = frontend_receive().await?;
// We are going to keep this data, glad RAM doesn't blow up
let dups = stream_set.insert(batch.iter().map(|row| row.into()).collect::<Vec<_>>).await?;
let dedups = batch.iter()
.zip(dups)
.filter_map(|(row, keep)| if keep { Some(row) } else { None });
// Send to the data warehouse somewhere
loop {
match yolo().await {
Ok(_) => break,
Err(e) => todo!(),
}
}
}
}
依赖项
~13MB
~220K SLoC