4 个版本

0.1.3 2020年12月31日
0.1.2 2020年12月30日
0.1.1 2020年12月30日
0.1.0 2020年12月30日

#197日期和时间

MIT/Apache

160KB
2K SLoC

TSXLIB-RS:Rust通用时间序列容器

Build Status codecov rust docs

项目概述

TSXLIB-RS/TSXLIB是一个处于beta阶段的开源项目。

我们仍在迭代和改进这个crate。由于容器和方法现在相对通用,我们将尝试确保在版本之间演变时保持向后兼容性。然而,就像任何beta阶段的项目一样,仍然可能发生破坏性变更。

目标和非目标

本项目目标是提供一个具有强大编译时可见性的通用容器,您可以使用它来1.)收集时间序列数据,2.)对数据进行高效映射操作,目前这可能会以查找性能为代价。

我们故意对您将放入容器中的数据类型做了(非常少的)假设。也就是说,它在数据和键方面都是通用的。这是为了让您能够放入任何自定义时间结构以及您想要的数据。

在待办事项列表中添加一些基本原语特殊化,例如,有一个diff()方法,而不是在f64的跳转操作符上每次都放入UDF来实现相同的功能。

相反,这并不是一个通用的类似dataframe的库。

快速入门

要么从这个repo分叉,要么将以下内容添加到您的项目中的Cargo.toml

[dependencies]
tsxlib = { version = "^0.1.0", features = ["parq","json"] }

兼容性说明

如果您启用了parquet IO功能,即使用"parq"特性,则需要使用nightly Rust。

所有其他功能在稳定版Rust上也能工作。

CI在稳定版(带有json功能)、beta版(带有json功能)和nightly版(带有json和parquet功能)上运行。

测试在Rust >=1.48上

一旦项目稳定,将努力保持与先前rust编译器版本的兼容性。

示例

使用此库,您可以
从时间序列中提取点

use tsxlib::timeseries::TimeSeries;
use chrono::{NaiveDateTime};

let index = vec![NaiveDateTime::from_timestamp(1,0), NaiveDateTime::from_timestamp(5,0), NaiveDateTime::from_timestamp(10,0)];
let data = vec![1.0, 2.0, 3.0];
let ts = TimeSeries::from_vecs(index, data).unwrap();

assert_eq!(ts.at(NaiveDateTime::from_timestamp(0,0)), None);
assert_eq!(ts.at(NaiveDateTime::from_timestamp(1,0)), Some(1.0));

也可以索引到第一个点或指定位置

assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(0,0)), None);
assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(1,0)), Some(1.0));
assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(4,0)), Some(1.0));

还可以在指定位置操作

assert_eq!(ts.at_idx_of(1), Some(TimeSeriesDataPoint::new(NaiveDateTime::from_timestamp(5,0), 2.0)));

该库还可以高效地对时间序列应用函数

let result = ts.map(|x| x * 2.0);

但是,您也可以将其用作迭代器,注意:collect将检查顺序并在需要时重新排序,但名为"unchecked"的方法则不会。

let result: TimeSeries<NaiveDateTime,f64> = ts.into_iter().map(|x| TimeSeriesDataPoint::new(x.timestamp,x.value * 2.0)).collect_from_unchecked_iter();

这意味着您可以使用它与其他作为迭代器扩展的crate一起使用,例如rayon,在需要多线程处理负载时非常有用。

let result: TimeSeries<NaiveDateTime,f64> = TimeSeries::from_tsdatapoints(ts.into_iter().par_bridge().map(|x| TimeSeriesDataPoint::new(x.timestamp,x.value * 2.0)).collect::<Vec<TimeSeriesDataPoint<NaiveDateTime, f64>>>()).unwrap();

这还意味着您可以使用本地迭代器方法来计算累积总和等操作

//as a total
let total = ts.into_iter().fold(0.0,|acc,x| acc + x.value);
// as a timeseries
let mut acc = 0.0;
let result: TimeSeries<NaiveDateTime, f64> = ts.into_iter().map(|x| {acc = acc + x.value; TimeSeriesDataPoint::new(x.timestamp,acc) }).collect();

实现了连接/交叉应用操作,我们有了交叉应用内部

use tsxlib::timeseries::TimeSeries;
use tsxlib::data_elements::TimeSeriesDataPoint;

let values : Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let values2 : Vec<f64> = vec![1.0, 2.0, 4.0];
let index: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let index2: Vec<i32> = (0..values2.len()).map(|i| i as i32).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();
let ts1 = TimeSeries::from_vecs(index2, values2).unwrap();
let tsres = ts.cross_apply_inner(&ts1,|a,b| (*a,*b));

let expected = vec![
    TimeSeriesDataPoint { timestamp: 0, value: (1.00, 1.00) },
    TimeSeriesDataPoint { timestamp: 1, value: (2.00, 2.00) },
    TimeSeriesDataPoint { timestamp: 2, value: (3.00, 4.00) },
];
let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();

assert_eq!(ts_expected, tsres)

我们还有交叉应用左

use tsxlib::timeseries::TimeSeries;
use tsxlib::data_elements::TimeSeriesDataPoint;

let values : Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let values2 : Vec<f64> = vec![1.0, 2.0, 4.0];
let index: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let index2: Vec<i32> = (0..values2.len()).map(|i| i as i32).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();
let ts1 = TimeSeries::from_vecs(index2, values2).unwrap();
let tsres = ts.cross_apply_left(&ts1,|a,b| (*a, match b { Some(v) => Some(*v), _ => None }));

let expected = vec![
    TimeSeriesDataPoint { timestamp: 0, value: (1.00, Some(1.00)) },
    TimeSeriesDataPoint { timestamp: 1, value: (2.00, Some(2.00)) },
    TimeSeriesDataPoint { timestamp: 2, value: (3.00, Some(4.0)) },
    TimeSeriesDataPoint { timestamp: 3, value: (4.00, None) },
    TimeSeriesDataPoint { timestamp: 4, value: (5.00, None) },
];

let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();

assert_eq!(ts_expected, tsres)

鉴于这是一个以时间序列为重点的库,我们还有“至”应用

let values = vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
let index = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];    
let ts = TimeSeries::from_vecs(index.iter().map(|x| NaiveDateTime::from_timestamp(*x,0)).collect(), values).unwrap();
let values2 = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
let index2 = vec![2, 4, 5, 7, 8, 10];    
let ts_join = TimeSeries::from_vecs(index2.iter().map(|x| NaiveDateTime::from_timestamp(*x,0)).collect(), values2).unwrap();

let result = ts.merge_apply_asof(&ts_join,Some(chrono_utils::merge_asof_prior(Duration::seconds(1))),|a,b| (*a, match b {
    Some(x) => Some(*x),
    None => None
}), MergeAsofMode::RollPrior);

let expected = vec![
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(1,0), value: (1.00, None) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(2,0), value: (1.00, Some(1.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(3,0), value: (1.00, Some(1.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(4,0), value: (1.00, Some(2.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(5,0), value: (1.00, Some(3.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(6,0), value: (1.00, Some(3.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(7,0), value: (1.00, Some(4.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(8,0), value: (1.00, Some(5.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(9,0), value: (1.00, Some(5.00)) },
    TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(10,0), value: (1.00, Some(6.00)) },
];

let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();

assert_eq!(result, ts_expected);

最后,您也可以轻松地将多个序列连接在一起

let ts = TimeSeries::from_vecs(index.clone(), values).unwrap();
let ts1 = TimeSeries::from_vecs(index.clone(), values2).unwrap();
let ts2 = TimeSeries::from_vecs(index.clone(), values3).unwrap();
let ts3 = TimeSeries::from_vecs(index, values4).unwrap();
let tsres = n_inner_join!(ts,&ts1,&ts2,&ts3);

各种时间序列功能通常实现为迭代器。例如,移位...

let tslag: TimeSeries<NaiveDateTime,f64> = ts.shift(-1).collect();

API的语言设计旨在通用,所以上面的意思是“滞后”,并且

let tsfwd: TimeSeries<NaiveDateTime,f64> = ts.shift(1).collect();

表示“向前滚动”

TSXLIB-RS还有一个“跳过”运算符。即如果您想实现差异,可以编写

fn change_func(prior: &f64, curr: &f64) -> f64{
    curr - prior
};
let ts_diff: TimeSeries<NaiveDateTime,f64> = ts.skip_apply(1, change_func).collect();

相反,如果您想实现百分比变化,可以编写

fn change_func(prior: &f64, curr: &f64) -> f64{
    (curr - prior)/prior
};
let ts_perc_ch: TimeSeries<NaiveDateTime,f64> = ts.skip_apply(1, change_func).collect();

TSXLIB-RS还具有滚动窗口操作。它们可以使用缓冲区实现

let values = vec![1.0, 1.0, 1.0, 1.0, 1.0];
let index = (0..values.len()).map(|i| NaiveDateTime::from_timestamp(60 * i as i64,0)).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();

fn roll_func(buffer: &Vec<f64>) -> f64{
    buffer.iter().sum()
};

let tsrolled: TimeSeries<NaiveDateTime,f64> = ts.apply_rolling(2, roll_func).collect();

或通过更新函数实现。请注意,这将更有效率,因为您不需要在内存中保留缓冲区

fn update(prior: Option<f64>, next: &f64) -> Option<f64>{
    let v =  match prior.is_some(){
        true => prior.unwrap(),
        false => 0.0
    };
    Some(v + next)
};

fn decrement(next: Option<f64>, prior: &f64) -> Option<f64>{
    let v =  match next.is_some(){
        true => next.unwrap(),
        false => 0.0
    };
    Some(v - prior)
};

let tsrolled: TimeSeries<NaiveDateTime,f64> = ts.apply_updating_rolling(2, update, decrement).collect();

TSXLIB-RS还支持在索引上的聚合

let result = ts.resample_and_agg(Duration::minutes(15), |dt,dur| timeutils::round_up_to_nearest_duration(dt, dur), |x| *x.last().unwrap().value);

有关更全面/可运行的示例,请查看测试和示例!

基准性能

我们在这里运行的基准包括生成一系列999,997个双精度浮点数,使用chrono NaiveDateTime作为毫秒级精度的键。然后对这些数据进行滞后处理和连接。然后将其四舍五入到条。在两个基准测试中,都取了最后一个值(但您可以使用任何UDF来生成此聚合)。最后,我们将数据转换为一个简单的结构体,其中包含以下字段

#[derive(Clone,Copy,Serialize,Default)]
struct SimpleStruct{
    pub timestamp: i64,
    pub floatvalueother: f64,
    pub floatvalue: f64
};

在最后两个基准测试中,我们将上述结构体转换为

    #[derive(Clone,Copy,Serialize,Default)]
    struct OtherStruct{
        pub timestamp: i64,
        pub ratio: f64
    };

使用以下UDF

fn complicated(x: &SimpleStruct) -> OtherStruct{
    if x.timestamp & 1 == 0 {
        OtherStruct {timestamp:x.timestamp,ratio:x.floatvalue}
    }
    else{
        OtherStruct {timestamp:x.timestamp,ratio:x.floatvalue/x.floatvalueother}
    }
}

这是为了模拟该库的相对现实的应用场景。这绝对不是它所能完成的事情的渐近极限。


系统规格

处理器 Intel Core i7-9750H @ 2.60GHz
RAM 16 gb DDR4

结果

根据Cargo.toml中的发布设置进行编译。

要在您的机器上运行,请编译

cargobuild --examples --release --features "parq"

然后运行 benchmark.exe

基准 # 次数 总时间 (s) 平均值 (ms) 最小值 (ms) 最大值 (ms)
通过值迭代器映射浮点数 100 1.72 17.20 15.88 26.41
通过引用迭代器映射浮点数 100 1.20 11.96 11.21 14.54
通过本地方法映射浮点数 100 0.53 5.30 4.91 6.60
滞后1 100 1.88 18.78 17.4 26.16
交叉应用(内连接) 100 7.88 78.78 75.9 94.27
交叉应用(内连接)不同长度 100 8.02 80.24 75.7 91.19
交叉应用(左连接)不同长度 100 8.32 83.18 78.7 99.33
条数据向上取整 100 8.09 80.89 77.7 88.17
条数据向下取整 100 7.74 77.42 74.0 85.27
通过迭代器映射结构体 100 2.33 23.29 20.5 31.42
通过本地方法映射结构体:总计 100 1.19 11.88 11.1 14.07

特性/待办事项列表

特性 支持 类别 编译器选项 Rust 版本
时间过滤器 核心 >=1.48
位置索引 核心 >=1.48
键索引 核心 >=1.48
移位 核心 >=1.48
内连接(合并 & 哈希连接) 核心 >=1.48
左连接(合并 & 哈希连接) 核心 >=1.48
“至”连接(合并) 核心 >=1.48
多个内连接 核心 >=1.48
连接/交织 核心 >=1.48
时间聚合 核心 >=1.48
具有chrono索引的时间聚合辅助程序 特殊化 >=1.48
具有int索引的时间聚合辅助程序 特殊化 >=1.48
闭包应用(用户定义函数) 核心 >=1.48
SIMD 支持 核心 >=1.48
本地空值填充/插值 核心 >=1.48
基于缓冲区的移动窗口操作 核心 >=1.48
基于更新的移动窗口操作 核心 >=1.48
“跳过”操作(即差异等) 核心 >=1.48
Rust 迭代器 核心 >=1.48
有序 Rust 迭代器 核心 >=1.48
流式迭代器 核心 >=1.48
CSV IO* IO >=1.48
JSON IO* IO "json" >=1.48
Parquet IO* IO "parq" Nightly
Avro IO IO >=1.48
Flatbuffer IO IO >=1.48
Apache Kafka IO IO >=1.48
Protocol buffer IO IO >=1.48
原始值类型的直观API 特殊化 >=1.48
本地多线程 核心 >=1.48
全面的文档 元信息 >=1.48
测试覆盖率 元信息 >=1.48
更多示例 元信息 >=1.48

标记为“*”的功能需要额外的性能调整,可能还需要将其重构为更通用的框架。请注意,尽管兼容性只列出了 Rust >=1.48,但 TSXLIB-RS 也可能与较低的 Rust 版本一起工作,只是尚未经过测试。

许可证

根据以下任一许可证授权

贡献

除非您明确说明,否则根据 Apache-2.0 许可证定义的任何有意提交以包含在您的工作中的贡献,都应如上所述双重许可,没有任何附加条款或条件。

依赖项

~3–12MB
~143K SLoC