#influx-db #时间序列 #reqwest-client

rinfluxdb-flux

一个用于查询和向 InfluxDB 发布数据的库

1 个不稳定版本

0.2.0 2021年11月27日

#2605数据库接口

34 每月下载量
用于 rinfluxdb

MIT/Apache

31KB
618

Rust InfluxDB 库

一个用于查询和向 InfluxDB 发送数据的库。

https://gitlab.com/claudiomattera/rinfluxdb

功能

  • 将数据序列化到 InfluxDB 行协议;
  • 在 Rust 中构建 InfluxQL 查询;
  • 在 Rust 中构建 FLUX 查询;
  • 解析来自 InfluxDB 的响应;
  • (可选) 基于 Reqwest 的客户端以执行常见查询。
  • 解析 InfluxQL 查询返回的 InfluxDB JSON 中的数据框;
  • 解析 FLUX 查询返回的 InfluxDB 注释 CSV 中的数据框;
  • (可选) Reqwest 对象包装器以构建请求和解析响应;

将数据序列化为 InfluxDB 行协议

行协议用于将数据发送到 InfluxDB。

use rinfluxdb::line_protocol::LineBuilder;
use chrono::{TimeZone, Utc};

let line = LineBuilder::new("location")
    .insert_field("latitude", 55.383333)
    .insert_field("longitude", 10.383333)
    .insert_tag("city", "Odense")
    .set_timestamp(Utc.ymd(2014, 7, 8).and_hms(9, 10, 11))
    .build();

assert_eq!(line.measurement(), &"location".into());
assert_eq!(line.field("latitude"), Some(&55.383333.into()));
assert_eq!(line.field("longitude"), Some(&10.383333.into()));
assert_eq!(line.tag("city"), Some(&"Odense".into()));
assert_eq!(line.timestamp(), Some(&Utc.ymd(2014, 7, 8).and_hms(9, 10, 11)));

assert_eq!(
    line.to_string(), 
    "location,city=Odense latitude=55.383333,longitude=10.383333 1404810611000000000"
);

在 Rust 中构建 InfluxQL 查询

可以使用 influxql::QueryBuilder 构建 InfluxQL 查询。

use rinfluxdb::influxql::QueryBuilder;
use chrono::{TimeZone, Utc};

let query = QueryBuilder::from("indoor_environment")
    .field("temperature")
    .field("humidity")
    .start(Utc.ymd(2021, 3, 7).and_hms(21, 0, 0))
    .build();

assert_eq!(
    query.as_ref(),
    "SELECT temperature, humidity \
    FROM indoor_environment \
    WHERE time > '2021-03-07T21:00:00Z'",
);

在 Rust 中构建 FLUX 查询

可以使用 flux::QueryBuilder 构建 FLUX 查询。

use rinfluxdb::types::Duration;
use rinfluxdb::flux::QueryBuilder;

let query = QueryBuilder::from("telegraf/autogen")
    .range_start(Duration::Minutes(-15))
    .filter(
        r#"r._measurement == "cpu" and
        r._field == "usage_system" and
        r.cpu == "cpu-total""#
    )
    .build();

assert_eq!(
    query.as_ref(),
    r#"from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r._field == "usage_system" and
    r.cpu == "cpu-total"
  )
  |> yield()"#,
);

解析 InfluxDB 的响应

向 InfluxDB 发送查询时,它将回复包含数据框列表的 JSON 或注释 CSV 内容。该库允许将此类回复解析为用户定义的数据框类型。

数据框必须可以从其名称(一个字符串)、其索引(一个时间点的向量)和其列(列名称到值向量的映射)构建。

数据框实现只需要实现此特性即可与该包一起使用。例如,只要为给定的类型 DF (和类型 E 实现 Into<ParseError>)实现了 TryFrom<(String, Vec<DateTime<Utc>>, HashMap<String, Vec<Value>>), Error = E>),解析器可以使用它来构建最终对象。

一个数据框的示例实现为 dataframe::DataFrame,但该特性可以用于许多其他现有库。

解析 InfluxQL 查询返回的 JSON

InfluxQL 查询的 JSON 响应可以解析为数据框。

use rinfluxdb::influxql::{ResponseError, StatementResult, from_str};
use rinfluxdb::dataframe::DataFrame;

let input: String = todo!();

let response: Result<Vec<StatementResult<DataFrame>>, ResponseError> =
    from_str(&input);

解析 FLUX 查询返回的注释 CSV

可以解析FLUX查询的注释CSV响应。

use rinfluxdb::flux::{ResponseError, from_str};
use rinfluxdb::dataframe::DataFrame;

let input: String = todo!();

let response: Result<DataFrame, ResponseError> = from_str(&input);

(可选)使用基于Reqwest的客户端执行常见查询

上述功能可用于将查询和数据序列化为原始文本,并将它们集成到现有应用程序中。作为替代,此库还实现了一个基于Reqwest的可选客户端API,以直接与InfluxDB实例交互。提供了阻塞和非阻塞客户端,并支持常见查询。

使用client Cargo功能启用客户端。

使用InfluxQL查询InfluxDB

# use std::collections::HashMap;
# use url::Url;
#
use rinfluxdb::influxql::QueryBuilder;
use rinfluxdb::influxql::blocking::Client;
use rinfluxdb::dataframe::DataFrame;

let client = Client::new(
    Url::parse("https://example.com/")?,
    Some(("username", "password")),
)?;

let query = QueryBuilder::from("indoor_environment")
    .database("house")
    .field("temperature")
    .field("humidity")
    .build();
let dataframe: DataFrame = client.fetch_dataframe(query)?;
println!("{}", dataframe);

let query = QueryBuilder::from("indoor_environment")
    .database("house")
    .field("temperature")
    .field("humidity")
    .group_by("room")
    .build();
let tagged_dataframes: HashMap<String, DataFrame> = 
    client.fetch_dataframes_by_tag(query, "room")?;
for (tag, dataframe) in tagged_dataframes {
    println!("{}: {}", tag, dataframe);
}

# Ok::<(), rinfluxdb::influxql::ClientError>(())

使用FLUX查询InfluxDB

unimplemented!()

使用行协议将数据发送到InfluxDB

# use url::Url;
#
use rinfluxdb::line_protocol::LineBuilder;
use rinfluxdb::line_protocol::blocking::Client;

let client = Client::new(
    Url::parse("https://example.com/")?,
    Some(("username", "password")),
)?;

let lines = vec![
    LineBuilder::new("measurement")
        .insert_field("field", 42.0)
        .build(),
    LineBuilder::new("measurement")
        .insert_field("field", 43.0)
        .insert_tag("tag", "value")
        .build(),
];

client.send("database", &lines)?;

# Ok::<(), rinfluxdb::line_protocol::ClientError>(())

(可选)Reqwest对象的包装器,用于构造请求和解析响应

此crate通过HTTP(s)与InfluxDB实例通信。HTTP请求和响应的内容必须遵循InfluxDB规范和协议,但可以进行其他自定义,例如添加基本认证。

为了确保最大程度的自由,构建了一个围绕Reqwest的Client的小型包装器,以便它可以创建一个已准备与InfluxDB通信的请求构建器。然后将该构建器转换为常规请求构建器并执行。

# use url::Url;
#
use rinfluxdb::influxql::Query;

// Bring into scope the trait implementation
use rinfluxdb::influxql::blocking::InfluxqlClientWrapper;

// Create Reqwest client
let client = reqwest::blocking::Client::new();

// Create InfluxQL request
let base_url = Url::parse("https://example.com")?;
let mut builder = client
    // (this is a function added by the trait above)
    .influxql(&base_url)?
    // (this functions are defined on influxql::RequestBuilder)
    .database("house")
    .query(Query::new("SELECT temperature FROM indoor_temperature"))
    // (this function returns a regular Reqwest builder)
    .into_reqwest_builder();

// Now this is a regular Reqwest builder, and can be customized as usual
if let Some((username, password)) = Some(("username", "password")) {
    builder = builder.basic_auth(username, Some(password));
}

// Create a request from the builder
let request = builder.build()?;

// Execute the request through Reqwest and obtain a response
let response = client.execute(request)?;

# Ok::<(), rinfluxdb::influxql::ClientError>(())

同样,构建了一个围绕Reqwest的Response的小型包装器,以便向其中添加一个解析数据帧的新功能。

# use std::collections::HashMap;
#
# use url::Url;
#
use rinfluxdb::influxql::Query;

use rinfluxdb::influxql::StatementResult;
use rinfluxdb::influxql::blocking::InfluxqlClientWrapper;
use rinfluxdb::dataframe::DataFrame;

// Bring into scope the trait implementation
use rinfluxdb::influxql::blocking::InfluxqlResponseWrapper;

// Create Reqwest client
let client = reqwest::blocking::Client::new();

// Create InfluxQL request
let base_url = Url::parse("https://example.com")?;
let mut request = client
    .influxql(&base_url)?
    .database("house")
    .query(Query::new("SELECT temperature FROM indoor_temperature"))
    .into_reqwest_builder()
    .build()?;

// Execute the request through Reqwest and obtain a response
let response = client.execute(request)?;

// Return an error if response status is not 200
// (this is a function from Reqwest's response)
let response = response.error_for_status()?;

// Parse the response from JSON to a list of dataframes
// (this is a function added by the trait above)
let results: Vec<StatementResult<DataFrame>> = response.dataframes()?;

# Ok::<(), rinfluxdb::influxql::ClientError>(())

为Reqwest的阻塞API(influxql::blocking::InfluxqlClientWrapperinfluxql::blocking::InfluxqlResponseWrapper)和异步API(influxql::r#async::InfluxqlClientWrapperinfluxql::r#async::InfluxqlResponseWrapper)定义了包装器,并使用client Cargo功能启用。

用法

此crate是较小的crate的简单聚合器,每个crate都通过Cargo功能启用,并实现InfluxDB支持的具体部分。

rinfluxdb
├── rinfluxdb-types
├── rinfluxdb-lineprotocol
├── rinfluxdb-influxql
├── rinfluxdb-flux
└── rinfluxdb-dataframe

客户端可以依赖于启用所需功能的rinfluxdb,或者可以显式依赖于rinfluxdb-* crate。

[dependencies.rinfluxdb]
version = "0.2.0"
features = ["lineprotocol", "influxql", "client"]

# Or

[dependencies]
rinfluxdb-lineprotocol = { version = "0.2.0", features = ["client"] }
rinfluxdb-influxql = { version = "0.2.0", features = ["client"] }

Cargo功能

此crate支持以下Cargo功能。

  • lineprotocol:重新导出rinfluxdb-lineprotocol crate;
  • influxql:重新导出rinfluxdb-influxql crate;
  • flux:重新导出rinfluxdb-flux crate;
  • dataframe:重新导出rinfluxdb-dataframe crate;
  • client:在所有rinfluxdb-* crate中启用功能client

当功能client启用时,这些crate定义了行协议、InfluxQL和FLUX的客户端。客户端使用Reqwest实现,并提供了阻塞和异步模式。

许可证

版权所有Claudio Mattera 2021

您可以在归功于原作者的情况下自由复制、修改和分发此应用程序,具体条款如下:

由您选择。

本项目是完全原创的作品,与InfluxData无任何关联,也未获得其任何形式的认可或支持。

依赖项

~3–15MB
~191K SLoC