2 个不稳定版本
0.2.0 | 2024年1月31日 |
---|---|
0.1.0 | 2024年1月29日 |
#991 在 数据库接口
23KB
353 行
Redis 作为简单时间序列数据库
这个库允许你使用纯 Redis 安装来存储简单的时间序列数据。这并不是为了取代时间序列数据库;如果你有可用的,请使用它。同样,Redis 有一个或多个插件,允许你更有效地存储时间序列数据。如果可用,使用这些方法存储数据可能更合适。
但是,如果你已经在使用 Redis,并且受到限制,无法添加和维护特殊插件,这个库可以用于缓存和维护时间序列数据。
Redis 主要是一个键/值存储,但提供了许多存储不同数据类型的附加功能。其中之一称为“有序集合”,最初是为了进行网站投票排名而创建的,但可以相反地使用浮点数(time_t)值作为时间戳来跟踪时间序列数据。
在这样做的时候,有序集合的“键”成为被跟踪的数据,“分数”是时间索引。这样做的主要缺点是有序集合的“键”(因此我们的数据)必须是唯一的。为了确保唯一性,可以在发送到 Redis 之前将数据与时间戳组合成一个元组或数组。这为每个点创建了一个冗余的时间戳,但实际上简化了数据检索,因为不需要重新组合索引来获取时间/数据对。从 Redis 检索的值都包含时间/数据信息。
数据库连接
使用这个库,每个数据集都由 Rust 的 TimeSeries<T>
泛型类型表示。这包含一个数据库连接,可以用来添加或查询单个时间序列的数据。
在 Redis 中,该系列被指定为有序的“Z”集合,由命名空间和系列名称标识。命名空间通常是创建数据的应用程序的名称。这样,应用程序就可以使用任何时间序列名称,而不用担心名称冲突。只需要在主机上保持命名空间的唯一性。
它可以在 Rust 中这样使用
use redis-zset-ts::TimeSeries;
let mut series = TimeSeries::<f64>("analog", "chan0").unwrap();
这将创建一个连接到本地主机的 Redis 连接,用于管理命名空间 analog 的时间序列,系列名称为 chan0。每个数据点将是一个单一的 64 位浮点值。
在数据库中,命名空间和系列名称仅通过冒号":"简单连接。因此,在Redis中的该系列名称将是 "analog:chan0"。
通常在边缘设备或物联网网关中,我们会使用同一主机上的本地Redis数据库,但该库也可以用于连接到任意主机,或任何URI。
pub fn with_host(host: &str, namespace: &str, name: &str) -> Result<Self> { ... }
pub fn with_uri(uri: &str, namespace: &str, name: &str) -> Result<Self> { ... }
时间序列
在使用此库的应用程序中,数据点可以用 TimeValue<T>
来表示
/// A timestamped value
pub struct TimeValue<T> {
/// The timestamp
pub timestamp: f64,
/// The value
pub value: T,
}
为了与Redis完全兼容,类型T必须能够使用serde进行序列化和反序列化,因此
T: serde::Serialize + serde::DeserializeOwned
为了方便起见,时间值也可以转换为和从元组形式转换,并进行操作
(f64, T)
例如
let tv = TimeValue::from((0.0, 42));
let tup = tv.into_tuple();
时间戳 是从UNIX纪元(即浮点 time_t)以来的秒数的64位浮点表示,具有至少微秒的分辨率。它与Python的time函数 time.time() 直接兼容。它可以用来获取当前时间或从任何Rust SystemTime
获取
let ts = redis_zset_ts::timestamp(); // The current time
use std::time::{SystemTime, Duration};
let sys_time = SystemTime::now() - Duration::from_secs(10);
let ts = redis_zset_ts::as_timestamp(sys_time); // 10 seconds ago
let ts = redis__zset_ts::timestamp() - 10.0; // also 10 seconds ago
数据通过MsgPack将时间和值组合成元组,然后序列化为字节流,如下所示
rmps::encode::to_vec_named(&(v.timestamp, &v.value))
数据插入
可以使用单个点将值插入Redis数据库
let v: i32 = read_some_value();
series.add(timestamp(), v);
或者只需使用当前时间作为
series.add_now(v);
或者使用一个 TimeValue:
let tv = TimeValue::new(v); // Uses the current time
series.add_value(tv);
可以一次性高效地添加多个值
let vals: Vec<_> = (0..5)
.into_iter()
.map(|_| TimeValue::new(read_some_values())
.collect();
series.add_multiple(&vals);
数据检索
可以使用时间范围来查询数据。范围可以是浮点时间戳或 SystemTime
值。使用时间戳可以方便地进行简单的秒数范围减法
// Retrieve the last minute of data
let now = timestamp();
let vals = series.get_range(now-60.0, now).unwrap();
时间点可以使用Redis中特殊的字符串如 "-inf" 和 "+inf"
// Gets the whole time series (all points)
let vals = series.get_range_any("-inf", "+inf").unwrap();
删除数据
可以通过时间点轻松清除最旧的数据,擦除任何早于它的值
// Erase any data older than 10min
let ts = timestamp();
series.purge(ts - 600.0);
这可以定期进行,或者在添加新点时进行,以保持Redis中的移动窗口数据,如最后一分钟、十分钟、一小时、一天等
let ts = timestamp();
series.add(ts, read_some_value()); // Insert a new value, then
series.purge(ts - 600.0); // erase any data older than 10min
可以通过单个调用从Redis中删除整个系列
series.delete();
语言和系统兼容性
Redis客户端存在于所有主要语言中。此库使用MsgPack将数据序列化到Redis中,MsgPack也在大量语言中有实现。应该很容易将此库创建的数据与其他大多数语言和平台共享。
依赖关系
~3.5–4.5MB
~113K SLoC