4 个版本
0.1.3 | 2020年12月31日 |
---|---|
0.1.2 | 2020年12月30日 |
0.1.1 | 2020年12月30日 |
0.1.0 | 2020年12月30日 |
#197 在 日期和时间
160KB
2K SLoC
TSXLIB-RS:Rust通用时间序列容器
项目概述
TSXLIB-RS/TSXLIB是一个处于beta阶段的开源项目。
我们仍在迭代和改进这个crate。由于容器和方法现在相对通用,我们将尝试确保在版本之间演变时保持向后兼容性。然而,就像任何beta阶段的项目一样,仍然可能发生破坏性变更。
目标和非目标
本项目目标是提供一个具有强大编译时可见性的通用容器,您可以使用它来1.)收集时间序列数据,2.)对数据进行高效映射操作,目前这可能会以查找性能为代价。
我们故意对您将放入容器中的数据类型做了(非常少的)假设。也就是说,它在数据和键方面都是通用的。这是为了让您能够放入任何自定义时间结构以及您想要的数据。
在待办事项列表中添加一些基本原语特殊化,例如,有一个diff()方法,而不是在f64的跳转操作符上每次都放入UDF来实现相同的功能。
相反,这并不是一个通用的类似dataframe的库。
快速入门
要么从这个repo分叉,要么将以下内容添加到您的项目中的Cargo.toml
[dependencies]
tsxlib = { version = "^0.1.0", features = ["parq","json"] }
兼容性说明
如果您启用了parquet IO功能,即使用"parq"特性,则需要使用nightly Rust。
所有其他功能在稳定版Rust上也能工作。
CI在稳定版(带有json功能)、beta版(带有json功能)和nightly版(带有json和parquet功能)上运行。
测试在Rust >=1.48上
一旦项目稳定,将努力保持与先前rust编译器版本的兼容性。
示例
使用此库,您可以
从时间序列中提取点
use tsxlib::timeseries::TimeSeries;
use chrono::{NaiveDateTime};
let index = vec![NaiveDateTime::from_timestamp(1,0), NaiveDateTime::from_timestamp(5,0), NaiveDateTime::from_timestamp(10,0)];
let data = vec![1.0, 2.0, 3.0];
let ts = TimeSeries::from_vecs(index, data).unwrap();
assert_eq!(ts.at(NaiveDateTime::from_timestamp(0,0)), None);
assert_eq!(ts.at(NaiveDateTime::from_timestamp(1,0)), Some(1.0));
也可以索引到第一个点或指定位置
assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(0,0)), None);
assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(1,0)), Some(1.0));
assert_eq!(ts.at_or_first_prior(NaiveDateTime::from_timestamp(4,0)), Some(1.0));
还可以在指定位置操作
assert_eq!(ts.at_idx_of(1), Some(TimeSeriesDataPoint::new(NaiveDateTime::from_timestamp(5,0), 2.0)));
该库还可以高效地对时间序列应用函数
let result = ts.map(|x| x * 2.0);
但是,您也可以将其用作迭代器,注意:collect将检查顺序并在需要时重新排序,但名为"unchecked"的方法则不会。
let result: TimeSeries<NaiveDateTime,f64> = ts.into_iter().map(|x| TimeSeriesDataPoint::new(x.timestamp,x.value * 2.0)).collect_from_unchecked_iter();
这意味着您可以使用它与其他作为迭代器扩展的crate一起使用,例如rayon
,在需要多线程处理负载时非常有用。
let result: TimeSeries<NaiveDateTime,f64> = TimeSeries::from_tsdatapoints(ts.into_iter().par_bridge().map(|x| TimeSeriesDataPoint::new(x.timestamp,x.value * 2.0)).collect::<Vec<TimeSeriesDataPoint<NaiveDateTime, f64>>>()).unwrap();
这还意味着您可以使用本地迭代器方法来计算累积总和等操作
//as a total
let total = ts.into_iter().fold(0.0,|acc,x| acc + x.value);
// as a timeseries
let mut acc = 0.0;
let result: TimeSeries<NaiveDateTime, f64> = ts.into_iter().map(|x| {acc = acc + x.value; TimeSeriesDataPoint::new(x.timestamp,acc) }).collect();
实现了连接/交叉应用操作,我们有了交叉应用内部
use tsxlib::timeseries::TimeSeries;
use tsxlib::data_elements::TimeSeriesDataPoint;
let values : Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let values2 : Vec<f64> = vec![1.0, 2.0, 4.0];
let index: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let index2: Vec<i32> = (0..values2.len()).map(|i| i as i32).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();
let ts1 = TimeSeries::from_vecs(index2, values2).unwrap();
let tsres = ts.cross_apply_inner(&ts1,|a,b| (*a,*b));
let expected = vec![
TimeSeriesDataPoint { timestamp: 0, value: (1.00, 1.00) },
TimeSeriesDataPoint { timestamp: 1, value: (2.00, 2.00) },
TimeSeriesDataPoint { timestamp: 2, value: (3.00, 4.00) },
];
let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();
assert_eq!(ts_expected, tsres)
我们还有交叉应用左
use tsxlib::timeseries::TimeSeries;
use tsxlib::data_elements::TimeSeriesDataPoint;
let values : Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let values2 : Vec<f64> = vec![1.0, 2.0, 4.0];
let index: Vec<i32> = (0..values.len()).map(|i| i as i32).collect();
let index2: Vec<i32> = (0..values2.len()).map(|i| i as i32).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();
let ts1 = TimeSeries::from_vecs(index2, values2).unwrap();
let tsres = ts.cross_apply_left(&ts1,|a,b| (*a, match b { Some(v) => Some(*v), _ => None }));
let expected = vec![
TimeSeriesDataPoint { timestamp: 0, value: (1.00, Some(1.00)) },
TimeSeriesDataPoint { timestamp: 1, value: (2.00, Some(2.00)) },
TimeSeriesDataPoint { timestamp: 2, value: (3.00, Some(4.0)) },
TimeSeriesDataPoint { timestamp: 3, value: (4.00, None) },
TimeSeriesDataPoint { timestamp: 4, value: (5.00, None) },
];
let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();
assert_eq!(ts_expected, tsres)
鉴于这是一个以时间序列为重点的库,我们还有“至”应用
let values = vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];
let index = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let ts = TimeSeries::from_vecs(index.iter().map(|x| NaiveDateTime::from_timestamp(*x,0)).collect(), values).unwrap();
let values2 = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0];
let index2 = vec![2, 4, 5, 7, 8, 10];
let ts_join = TimeSeries::from_vecs(index2.iter().map(|x| NaiveDateTime::from_timestamp(*x,0)).collect(), values2).unwrap();
let result = ts.merge_apply_asof(&ts_join,Some(chrono_utils::merge_asof_prior(Duration::seconds(1))),|a,b| (*a, match b {
Some(x) => Some(*x),
None => None
}), MergeAsofMode::RollPrior);
let expected = vec![
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(1,0), value: (1.00, None) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(2,0), value: (1.00, Some(1.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(3,0), value: (1.00, Some(1.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(4,0), value: (1.00, Some(2.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(5,0), value: (1.00, Some(3.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(6,0), value: (1.00, Some(3.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(7,0), value: (1.00, Some(4.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(8,0), value: (1.00, Some(5.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(9,0), value: (1.00, Some(5.00)) },
TimeSeriesDataPoint { timestamp: NaiveDateTime::from_timestamp(10,0), value: (1.00, Some(6.00)) },
];
let ts_expected = TimeSeries::from_tsdatapoints(expected).unwrap();
assert_eq!(result, ts_expected);
最后,您也可以轻松地将多个序列连接在一起
let ts = TimeSeries::from_vecs(index.clone(), values).unwrap();
let ts1 = TimeSeries::from_vecs(index.clone(), values2).unwrap();
let ts2 = TimeSeries::from_vecs(index.clone(), values3).unwrap();
let ts3 = TimeSeries::from_vecs(index, values4).unwrap();
let tsres = n_inner_join!(ts,&ts1,&ts2,&ts3);
各种时间序列功能通常实现为迭代器。例如,移位...
let tslag: TimeSeries<NaiveDateTime,f64> = ts.shift(-1).collect();
API的语言设计旨在通用,所以上面的意思是“滞后”,并且
let tsfwd: TimeSeries<NaiveDateTime,f64> = ts.shift(1).collect();
表示“向前滚动”
TSXLIB-RS还有一个“跳过”运算符。即如果您想实现差异,可以编写
fn change_func(prior: &f64, curr: &f64) -> f64{
curr - prior
};
let ts_diff: TimeSeries<NaiveDateTime,f64> = ts.skip_apply(1, change_func).collect();
相反,如果您想实现百分比变化,可以编写
fn change_func(prior: &f64, curr: &f64) -> f64{
(curr - prior)/prior
};
let ts_perc_ch: TimeSeries<NaiveDateTime,f64> = ts.skip_apply(1, change_func).collect();
TSXLIB-RS还具有滚动窗口操作。它们可以使用缓冲区实现
let values = vec![1.0, 1.0, 1.0, 1.0, 1.0];
let index = (0..values.len()).map(|i| NaiveDateTime::from_timestamp(60 * i as i64,0)).collect();
let ts = TimeSeries::from_vecs(index, values).unwrap();
fn roll_func(buffer: &Vec<f64>) -> f64{
buffer.iter().sum()
};
let tsrolled: TimeSeries<NaiveDateTime,f64> = ts.apply_rolling(2, roll_func).collect();
或通过更新函数实现。请注意,这将更有效率,因为您不需要在内存中保留缓冲区
fn update(prior: Option<f64>, next: &f64) -> Option<f64>{
let v = match prior.is_some(){
true => prior.unwrap(),
false => 0.0
};
Some(v + next)
};
fn decrement(next: Option<f64>, prior: &f64) -> Option<f64>{
let v = match next.is_some(){
true => next.unwrap(),
false => 0.0
};
Some(v - prior)
};
let tsrolled: TimeSeries<NaiveDateTime,f64> = ts.apply_updating_rolling(2, update, decrement).collect();
TSXLIB-RS还支持在索引上的聚合
let result = ts.resample_and_agg(Duration::minutes(15), |dt,dur| timeutils::round_up_to_nearest_duration(dt, dur), |x| *x.last().unwrap().value);
有关更全面/可运行的示例,请查看测试和示例!
基准性能
我们在这里运行的基准包括生成一系列999,997个双精度浮点数,使用chrono
NaiveDateTime作为毫秒级精度的键。然后对这些数据进行滞后处理和连接。然后将其四舍五入到条。在两个基准测试中,都取了最后一个值(但您可以使用任何UDF来生成此聚合)。最后,我们将数据转换为一个简单的结构体,其中包含以下字段
#[derive(Clone,Copy,Serialize,Default)]
struct SimpleStruct{
pub timestamp: i64,
pub floatvalueother: f64,
pub floatvalue: f64
};
在最后两个基准测试中,我们将上述结构体转换为
#[derive(Clone,Copy,Serialize,Default)]
struct OtherStruct{
pub timestamp: i64,
pub ratio: f64
};
使用以下UDF
fn complicated(x: &SimpleStruct) -> OtherStruct{
if x.timestamp & 1 == 0 {
OtherStruct {timestamp:x.timestamp,ratio:x.floatvalue}
}
else{
OtherStruct {timestamp:x.timestamp,ratio:x.floatvalue/x.floatvalueother}
}
}
这是为了模拟该库的相对现实的应用场景。这绝对不是它所能完成的事情的渐近极限。
系统规格
处理器 | Intel Core i7-9750H @ 2.60GHz |
RAM | 16 gb DDR4 |
结果
根据Cargo.toml中的发布设置进行编译。
要在您的机器上运行,请编译
cargobuild --examples --release --features "parq"
然后运行 benchmark.exe
基准 | # 次数 | 总时间 (s) | 平均值 (ms) | 最小值 (ms) | 最大值 (ms) |
---|---|---|---|---|---|
通过值迭代器映射浮点数 | 100 | 1.72 | 17.20 | 15.88 | 26.41 |
通过引用迭代器映射浮点数 | 100 | 1.20 | 11.96 | 11.21 | 14.54 |
通过本地方法映射浮点数 | 100 | 0.53 | 5.30 | 4.91 | 6.60 |
滞后1 | 100 | 1.88 | 18.78 | 17.4 | 26.16 |
交叉应用(内连接) | 100 | 7.88 | 78.78 | 75.9 | 94.27 |
交叉应用(内连接)不同长度 | 100 | 8.02 | 80.24 | 75.7 | 91.19 |
交叉应用(左连接)不同长度 | 100 | 8.32 | 83.18 | 78.7 | 99.33 |
条数据向上取整 | 100 | 8.09 | 80.89 | 77.7 | 88.17 |
条数据向下取整 | 100 | 7.74 | 77.42 | 74.0 | 85.27 |
通过迭代器映射结构体 | 100 | 2.33 | 23.29 | 20.5 | 31.42 |
通过本地方法映射结构体:总计 | 100 | 1.19 | 11.88 | 11.1 | 14.07 |
特性/待办事项列表
特性 | 支持 | 类别 | 编译器选项 | Rust 版本 |
---|---|---|---|---|
时间过滤器 | ✔ | 核心 | >=1.48 | |
位置索引 | ✔ | 核心 | >=1.48 | |
键索引 | ✔ | 核心 | >=1.48 | |
移位 | ✔ | 核心 | >=1.48 | |
内连接(合并 & 哈希连接) | ✔ | 核心 | >=1.48 | |
左连接(合并 & 哈希连接) | ✔ | 核心 | >=1.48 | |
“至”连接(合并) | ✔ | 核心 | >=1.48 | |
多个内连接 | ✔ | 核心 | >=1.48 | |
连接/交织 | ✔ | 核心 | >=1.48 | |
时间聚合 | ✔ | 核心 | >=1.48 | |
具有chrono索引的时间聚合辅助程序 | ✔ | 特殊化 | >=1.48 | |
具有int索引的时间聚合辅助程序 | ✔ | 特殊化 | >=1.48 | |
闭包应用(用户定义函数) | ✔ | 核心 | >=1.48 | |
SIMD 支持 | 核心 | >=1.48 | ||
本地空值填充/插值 | 核心 | >=1.48 | ||
基于缓冲区的移动窗口操作 | ✔ | 核心 | >=1.48 | |
基于更新的移动窗口操作 | ✔ | 核心 | >=1.48 | |
“跳过”操作(即差异等) | ✔ | 核心 | >=1.48 | |
Rust 迭代器 | ✔ | 核心 | >=1.48 | |
有序 Rust 迭代器 | ✔ | 核心 | >=1.48 | |
流式迭代器 | ✔ | 核心 | >=1.48 | |
CSV IO* | ✔ | IO | >=1.48 | |
JSON IO* | ✔ | IO | "json" | >=1.48 |
Parquet IO* | ✔ | IO | "parq" | Nightly |
Avro IO | IO | >=1.48 | ||
Flatbuffer IO | IO | >=1.48 | ||
Apache Kafka IO | IO | >=1.48 | ||
Protocol buffer IO | IO | >=1.48 | ||
原始值类型的直观API | 特殊化 | >=1.48 | ||
本地多线程 | 核心 | >=1.48 | ||
全面的文档 | 元信息 | >=1.48 | ||
测试覆盖率 | 元信息 | >=1.48 | ||
更多示例 | 元信息 | >=1.48 |
标记为“*”的功能需要额外的性能调整,可能还需要将其重构为更通用的框架。请注意,尽管兼容性只列出了 Rust >=1.48,但 TSXLIB-RS 也可能与较低的 Rust 版本一起工作,只是尚未经过测试。
许可证
根据以下任一许可证授权
贡献
除非您明确说明,否则根据 Apache-2.0 许可证定义的任何有意提交以包含在您的工作中的贡献,都应如上所述双重许可,没有任何附加条款或条件。
依赖项
~3–12MB
~143K SLoC