7 个版本

0.1.6 2023 年 5 月 28 日
0.1.5 2023 年 5 月 28 日

#102缓存

Download history 7/week @ 2024-03-09 3/week @ 2024-03-30 128/week @ 2024-04-06 5/week @ 2024-04-13

每月 58 次下载

MIT/Apache

54KB
969

时间键流集

此库提供了一种混合内存/非内存数据结构,用于去重大多数有序的时间键流。

目标

有几个优先特性推动了该库的设计和开发。

  • 去重 来自物联网/可穿戴设备前端以 .1Hz 到 10kHz 的离散采样频率发出的行的时间键流。
    • 预期时间键将由微秒时间戳、整数用户_id、整数设备_id 和整数模式枚举组成。
  • 限制提供去重功能的内存结构以允许单个节点具有 32GiB 到 256GiB 的 RAM 作为去重节点。
  • 每日提供约 100B 行的去重
  • 允许配置保留期,无需重建索引或过滤器
  • 专门针对 u128 宽度的时间键

启发式动机

这个领域中有几个经验丰富的结构,如 B+ 树或日志结构合并树。作为一个专门用例,我们不需要承受这些结构在泛化过程中的一些权衡。将利用传入数据流的几个特性来优化平均情况。

  1. 从可穿戴/移动设备批量上传插入操作
    1. 标准情况下的批量大小为 ~5000 个连续的时间键
  2. 对于每个 user_id/device_id/source 对
    1. 时间戳将以 100s 到 1000000s 的时间间隔分开。
    2. 传入数据很少会倒退,几乎只在从开始读取保存的数据并再次向前推进时。
  3. 与时间键相关联的行数据不会与时间键流集一起存储。
  4. 流集资源的截断和回收可能发生在相对较少的时间间隔内,例如,每小时一次。

B+ 树非常适合这种场景,除了几个关键区别。首要原因是缺乏可行的 OOM 存储的 crate 实现,其次是缺乏对 u128 时间键的连续块插入的优化。LSM 树的类似缺乏支持促使开发以下算法的实现。

方法

  • 时间间隔分区文件,,在 LRU 风格中刷入磁盘,可能发生颠簸风险
    • 时间键 属于一个且仅一个
    • 用户可以选择时间间隔(每小时)
      • 与回收间隔对齐,可简化截断
  • 段是具有与页面大小的对齐节点,用于时间键的特殊化B+树。
    • 支持插入接口,如下所示:fn insert(batch: Vec<TimeKey>) -> Vec<bool>
    • 支持高效的页面缓存使用,以实现基于memmap文件的持久性。
  • 内部节点与段“叶”节点分离持久化,并持续存储在RAM中。
  • 时间键宽度为128位,包含64位的纪元微秒和64位的id的后半部分。
    • 实现From<&MyId> for TimeKey,其中ts_us: i64然后user_id: i32device_id: i32作为正整数占据时间键的后半部分,将为相对较低的基数user_id和device_id的部署留下高度可压缩的前导零。

段通过tsz然后zstd进行压缩,对于某些具有1_000x到200_000x压缩率的稀疏段,当刷新到磁盘时,某些数据集的数据速率保持一致。段以串行方式访问,插入率取决于底层索引,目前BTreeMap的插入率约为~300_000个时间键每秒(单核CPU限制)。

在每次插入时,以下是插入的伪代码

  1. 计算每行的段起始桶时间戳
  2. 锁定段索引
  3. 查找插入桶时间戳范围内的现有段
  4. 创建索引中尚未存在的任何新段
  5. 锁定热段缓存
  6. 将新段插入索引和缓存
  7. 循环LRU逐出和段非/插入的水合,尊重请求的内存限制
  8. 释放热段缓存锁
  9. 锁定每个正在插入的段
  10. 释放段索引锁
  11. 将时间键插入其各自的段
  12. 释放每个段锁
  13. 更新新插入时间键的内存使用情况

接口

在Rust中,使用时间键流集TkStreamSet,如下所示

use time_key_stream_set::prelude::*;
use std::error::Error;

async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
    // Who knows, maybe you get this data on an HTTP server
    unimplemented!()
}

async fn yolo() {
    // Hope the database doesn't rollback this transaction
    unimplemented!()
}

#[derive(IntoTimeKey)]
struct AdcRow {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
    millivolts: i16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Rehydrate a set of time keys from a known directory or a temperatory directory
    // Configure a stream set via a builder with reasonable defaults for the data source
    let stream_set = TkStreamSetBuilder::new()
        .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
        .with_memory_limit(MemoryLimit::Low)
        .build()
        .await
        .unwrap();


    // Multiple threads could be running the same loop on this node
    loop {
        // I got some new data that may have duplicates
        let batch: Vec<AdcRow> = frontend_receive().await?;

        // We are going to keep this data, glad RAM doesn't blow up
        let dups = stream_set.insert(batch.iter().map(|row| row.into()).collect::<Vec<_>>).await?;
        let dedups = batch.iter()
            .zip(dups)
            .filter_map(|(row, keep)| if keep { Some(row) } else { None });

        // Send to the data warehouse somewhere
        loop {
            match yolo().await {
                Ok(_) => break,
                Err(e) => todo!(),
            }
        }
    }
}

依赖项

~13MB
~220K SLoC