1 个不稳定版本
0.2.0 | 2021年11月27日 |
---|
#2106 在 数据库接口
59 每月下载量
在 rinfluxdb 中使用
71KB
1K SLoC
Rust InfluxDB 库
一个用于查询和向InfluxDB发送数据的库。
https://gitlab.com/claudiomattera/rinfluxdb
特性
- 将数据序列化到InfluxDB行协议;
- 在Rust中构建InfluxQL查询;
- 在Rust中构建FLUX查询;
- 解析InfluxDB的响应;
- (可选) 基于Reqwest的客户端来执行常见查询。
- 解析InfluxQL查询返回的InfluxDB JSON数据框;
- 解析FLUX查询返回的InfluxDB注释CSV数据框;
- (可选) Reqwest对象的包装器,用于构造请求和解析响应;
将数据序列化到InfluxDB行协议
使用行协议将数据发送到InfluxDB。
use rinfluxdb::line_protocol::LineBuilder;
use chrono::{TimeZone, Utc};
let line = LineBuilder::new("location")
.insert_field("latitude", 55.383333)
.insert_field("longitude", 10.383333)
.insert_tag("city", "Odense")
.set_timestamp(Utc.ymd(2014, 7, 8).and_hms(9, 10, 11))
.build();
assert_eq!(line.measurement(), &"location".into());
assert_eq!(line.field("latitude"), Some(&55.383333.into()));
assert_eq!(line.field("longitude"), Some(&10.383333.into()));
assert_eq!(line.tag("city"), Some(&"Odense".into()));
assert_eq!(line.timestamp(), Some(&Utc.ymd(2014, 7, 8).and_hms(9, 10, 11)));
assert_eq!(
line.to_string(),
"location,city=Odense latitude=55.383333,longitude=10.383333 1404810611000000000"
);
在Rust中构建InfluxQL查询
可以使用 influxql::QueryBuilder
构建InfluxQL查询。
use rinfluxdb::influxql::QueryBuilder;
use chrono::{TimeZone, Utc};
let query = QueryBuilder::from("indoor_environment")
.field("temperature")
.field("humidity")
.start(Utc.ymd(2021, 3, 7).and_hms(21, 0, 0))
.build();
assert_eq!(
query.as_ref(),
"SELECT temperature, humidity \
FROM indoor_environment \
WHERE time > '2021-03-07T21:00:00Z'",
);
在Rust中构建FLUX查询
可以使用 flux::QueryBuilder
构建FLUX查询。
use rinfluxdb::types::Duration;
use rinfluxdb::flux::QueryBuilder;
let query = QueryBuilder::from("telegraf/autogen")
.range_start(Duration::Minutes(-15))
.filter(
r#"r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total""#
)
.build();
assert_eq!(
query.as_ref(),
r#"from(bucket: "telegraf/autogen")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r._field == "usage_system" and
r.cpu == "cpu-total"
)
|> yield()"#,
);
解析InfluxDB的响应
向InfluxDB发送查询时,它会以JSON或注释CSV内容的形式回复,包含一系列数据框。此库允许将此类回复解析为用户定义的数据框类型。
数据框必须可以从其名称(一个字符串)、其索引(一个瞬间向量)和其列(列名到值向量的映射)构造。
仅需要实现此特性,数据框架实现就可以与该crate一起使用。也就是说,只要为给定的类型DF
(以及类型E
实现了Into<ParseError>
)实现了trait TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>
),解析器就可以使用它来构建最终的对象。
数据框架的一个空实现可以作为dataframe::DataFrame
使用,但可以为许多其他现有库实现此特性。
解析 InfluxQL 查询返回的 JSON
InfluxQL查询的JSON响应可以解析成数据框架。
use rinfluxdb::influxql::{ResponseError, StatementResult, from_str};
use rinfluxdb::dataframe::DataFrame;
let input: String = todo!();
let response: Result<Vec<StatementResult<DataFrame>>, ResponseError> =
from_str(&input);
解析 FLUX 查询返回的注解 CSV
FLUX查询的注解CSV响应可以解析。
use rinfluxdb::flux::{ResponseError, from_str};
use rinfluxdb::dataframe::DataFrame;
let input: String = todo!();
let response: Result<DataFrame, ResponseError> = from_str(&input);
(可选)使用基于 Reqwest 的客户端执行常见查询
上述函数可用于序列化和反序列化查询和数据到原始文本,并且它们可以集成到现有应用程序中。作为替代,此库还实现了基于Reqwest的可选客户端API,以直接与InfluxDB实例交互。提供阻塞和非阻塞客户端,并支持常见查询。
使用client
Cargo功能启用客户端。
使用 InfluxQL 查询 InfluxDB
# use std::collections::HashMap;
# use url::Url;
#
use rinfluxdb::influxql::QueryBuilder;
use rinfluxdb::influxql::blocking::Client;
use rinfluxdb::dataframe::DataFrame;
let client = Client::new(
Url::parse("https://example.com/")?,
Some(("username", "password")),
)?;
let query = QueryBuilder::from("indoor_environment")
.database("house")
.field("temperature")
.field("humidity")
.build();
let dataframe: DataFrame = client.fetch_dataframe(query)?;
println!("{}", dataframe);
let query = QueryBuilder::from("indoor_environment")
.database("house")
.field("temperature")
.field("humidity")
.group_by("room")
.build();
let tagged_dataframes: HashMap<String, DataFrame> =
client.fetch_dataframes_by_tag(query, "room")?;
for (tag, dataframe) in tagged_dataframes {
println!("{}: {}", tag, dataframe);
}
# Ok::<(), rinfluxdb::influxql::ClientError>(())
使用 FLUX 查询 InfluxDB
unimplemented!()
使用行协议将数据发送到 InfluxDB
# use url::Url;
#
use rinfluxdb::line_protocol::LineBuilder;
use rinfluxdb::line_protocol::blocking::Client;
let client = Client::new(
Url::parse("https://example.com/")?,
Some(("username", "password")),
)?;
let lines = vec![
LineBuilder::new("measurement")
.insert_field("field", 42.0)
.build(),
LineBuilder::new("measurement")
.insert_field("field", 43.0)
.insert_tag("tag", "value")
.build(),
];
client.send("database", &lines)?;
# Ok::<(), rinfluxdb::line_protocol::ClientError>(())
(可选)围绕 Reqwest 对象包装以构建请求和解析响应
此crate通过HTTP(s)与InfluxDB实例通信。HTTP请求和响应的内容必须遵循InfluxDB规范和协议,但可以进行其他定制,例如添加基本身份验证。
为了确保最大的自由度,在Reqwest的Client
周围构建了一个小型包装器,以便它可以创建一个已准备好与InfluxDB通信的请求构建器。然后可以将此构建器转换为常规请求构建器并执行。
# use url::Url;
#
use rinfluxdb::influxql::Query;
// Bring into scope the trait implementation
use rinfluxdb::influxql::blocking::InfluxqlClientWrapper;
// Create Reqwest client
let client = reqwest::blocking::Client::new();
// Create InfluxQL request
let base_url = Url::parse("https://example.com")?;
let mut builder = client
// (this is a function added by the trait above)
.influxql(&base_url)?
// (this functions are defined on influxql::RequestBuilder)
.database("house")
.query(Query::new("SELECT temperature FROM indoor_temperature"))
// (this function returns a regular Reqwest builder)
.into_reqwest_builder();
// Now this is a regular Reqwest builder, and can be customized as usual
if let Some((username, password)) = Some(("username", "password")) {
builder = builder.basic_auth(username, Some(password));
}
// Create a request from the builder
let request = builder.build()?;
// Execute the request through Reqwest and obtain a response
let response = client.execute(request)?;
# Ok::<(), rinfluxdb::influxql::ClientError>(())
类似地,在Reqwest的Response
周围构建了一个小型包装器,以便添加一个新功能来从其中解析数据框架。
# use std::collections::HashMap;
#
# use url::Url;
#
use rinfluxdb::influxql::Query;
use rinfluxdb::influxql::StatementResult;
use rinfluxdb::influxql::blocking::InfluxqlClientWrapper;
use rinfluxdb::dataframe::DataFrame;
// Bring into scope the trait implementation
use rinfluxdb::influxql::blocking::InfluxqlResponseWrapper;
// Create Reqwest client
let client = reqwest::blocking::Client::new();
// Create InfluxQL request
let base_url = Url::parse("https://example.com")?;
let mut request = client
.influxql(&base_url)?
.database("house")
.query(Query::new("SELECT temperature FROM indoor_temperature"))
.into_reqwest_builder()
.build()?;
// Execute the request through Reqwest and obtain a response
let response = client.execute(request)?;
// Return an error if response status is not 200
// (this is a function from Reqwest's response)
let response = response.error_for_status()?;
// Parse the response from JSON to a list of dataframes
// (this is a function added by the trait above)
let results: Vec<StatementResult<DataFrame>> = response.dataframes()?;
# Ok::<(), rinfluxdb::influxql::ClientError>(())
为Reqwest的阻塞API(influxql::blocking::InfluxqlClientWrapper
,influxql::blocking::InfluxqlResponseWrapper
)和异步API(influxql::r#async::InfluxqlClientWrapper
,influxql::r#async::InfluxqlResponseWrapper
)定义了包装器,并且使用client
Cargo功能启用。
用法
此crate是较小的crate的简单聚合器,每个crate通过Cargo功能启用,并实现InfluxDB支持的一个特定部分。
rinfluxdb
├── rinfluxdb-types
├── rinfluxdb-lineprotocol
├── rinfluxdb-influxql
├── rinfluxdb-flux
└── rinfluxdb-dataframe
客户端可以依赖启用必要功能的rinfluxdb
,或者它们可以明确地依赖rinfluxdb-*
crates。
[dependencies.rinfluxdb]
version = "0.2.0"
features = ["lineprotocol", "influxql", "client"]
# Or
[dependencies]
rinfluxdb-lineprotocol = { version = "0.2.0", features = ["client"] }
rinfluxdb-influxql = { version = "0.2.0", features = ["client"] }
Cargo功能
此crate支持以下Cargo功能。
lineprotocol
:重新导出rinfluxdb-lineprotocol
crate;influxql
:重新导出rinfluxdb-influxql
crate;flux
:重新导出rinfluxdb-flux
crate;dataframe
:重新导出rinfluxdb-dataframe
crate;client
:在所有rinfluxdb-*
crate中启用client
功能。
当启用client
功能时,这些crate定义了行协议、InfluxQL和Flux的客户端。客户端使用Reqwest实现,提供阻塞和异步模式。
许可证
版权所有 Claudio Mattera 2021
您可以在归因的情况下免费复制、修改和分发此应用程序,具体条款如下:
- Apache License,版本 2.0 (LICENSE-Apache-2.0 或 https://opensource.org/licenses/Apache-2.0)
- MIT许可证 (LICENSE-MIT 或 https://opensource.org/licenses/MIT)
任选其一。
本项目完全是原创作品,与InfluxData无任何关联或支持。
依赖项
~2–18MB
~201K SLoC