10 个版本
版本 | 发布日期 |
---|---|
0.3.12 | 2023年12月19日 |
0.3.11 | 2022年9月7日 |
0.3.10 | 2022年6月15日 |
0.3.9 | 2022年2月10日 |
0.1.0 | |
在“数据库接口”类别中排名第 331
每月下载量:55
585KB
14K SLoC
Apache IoTDB
Apache IoTDB(物联网数据库)是一个高性能的物联网原生数据库,适用于数据管理和分析,可在边缘和云端部署。由于其轻量级架构、高性能以及与Apache Hadoop、Spark和Flink的深度集成,Apache IoTDB能够满足物联网领域大规模数据存储、高速数据摄入和复杂数据分析的需求。
Apache IoTDB Rust 客户端
概述
这是Apache IoTDB的Rust客户端。
Apache IoTDB 网站: https://iotdb.apache.org
Apache IoTDB Github: https://github.com/apache/iotdb
先决条件
apache-iotdb 0.12.0 及以上版本。
Rust 1.56.0 及以上版本。
如何使用客户端(快速入门)
使用方法
将以下内容放入您的 Cargo.toml
[dependencies]
iotdb-client-rs="^0.3.12"
示例
将以下内容放入您的示例的 Cargo.toml
[dependencies]
iotdb-client-rs="^0.3.12"
chrono="0.4.19"
prettytable-rs="0.8.0"
structopt = "0.3.25"
use std::{collections::BTreeMap, vec};
use chrono::Local;
use iotdb::client::remote::{Config, RpcSession};
use iotdb::client::{MeasurementSchema, Result, RowRecord, Session, Tablet, Value};
use iotdb::protocal::{TSCompressionType, TSDataType, TSEncoding};
use prettytable::{cell, Row, Table};
use structopt::StructOpt;
/// Program entry point: executes the session example and panics with a fixed
/// message (plus the error's debug output) if it returns an error.
fn main() {
    let outcome = run();
    outcome.expect("failed to run session_example.");
}
/// Runs the whole session example against an IoTDB server.
///
/// Parses the command-line options, opens an `RpcSession`, then exercises the
/// client calls in a fixed order: time zone, storage groups, timeseries creation,
/// the various insert calls, tablets, data deletion, the query/statement calls,
/// and finally `close()`.
///
/// # Errors
///
/// Every client call is followed by `?`, so the first failing call ends the
/// function with that error. In that case the `session.close()` call at the end
/// is not reached.
fn run() -> Result<()> {
// Command-line options of the example. Plain `//` comments are used inside this
// struct on purpose: `///` doc comments here would be picked up by the derive.
// NOTE(review): `-h` is claimed by `host`; confirm how that interacts with the
// automatically generated help flag.
#[derive(StructOpt)]
#[structopt(name = "session_example")]
struct Opt {
// Server host, defaults to 127.0.0.1.
#[structopt(short = "h", long, default_value = "127.0.0.1")]
host: String,
// Server port, defaults to 6667.
#[structopt(short = "P", long, default_value = "6667")]
port: i32,
// User name, defaults to "root".
#[structopt(short = "u", long, default_value = "root")]
user: String,
// Password, defaults to "root".
#[structopt(short = "p", long, default_value = "root")]
password: String,
// When set, storage group `root.sg_rs` is deleted before the examples run.
#[structopt(short = "c", long)]
clean: bool,
}
let opt = Opt::from_args();
// Alternative way to create the config, using a struct literal and defaults:
// let config = Config {
// host: opt.host,
// port: opt.port,
// username: opt.user,
// password: opt.password,
// ..Default::default()
// };
// Create the config object with the builder.
let config = Config::builder()
.host(opt.host)
.port(opt.port)
.username(opt.user)
.password(opt.password)
.build();
let mut session = RpcSession::new(config)?;
session.open()?;
// time_zone: read the session time zone and switch it to "Asia/Shanghai" if it differs.
let tz = session.get_time_zone()?;
if tz != "Asia/Shanghai" {
session.set_time_zone("Asia/Shanghai")?;
}
// set_storage_group: create a storage group and delete it again right away.
session.set_storage_group("root.ln1")?;
session.delete_storage_group("root.ln1")?;
// delete_storage_groups: create two storage groups and delete both in one call.
session.set_storage_group("root.ln1")?;
session.set_storage_group("root.ln2")?;
session.delete_storage_groups(vec!["root.ln1", "root.ln2"])?;
// If the clean flag was given, try to remove storage group 'root.sg_rs'.
// The result goes through `unwrap_or_default()` instead of `?`, so a failure
// here does not abort the run.
if opt.clean {
session
.delete_storage_group("root.sg_rs")
.unwrap_or_default();
}
// create_timeseries: create one timeseries with three key/value maps and an
// optional trailing string, then delete it.
// NOTE(review): the maps look like props, attributes and tags (judging by their
// keys) and "stats" like a measurement alias — confirm against the client API.
{
session.create_timeseries(
"root.sg_rs.dev2.status",
TSDataType::Float,
TSEncoding::Plain,
TSCompressionType::SNAPPY,
Some(BTreeMap::from([
("prop1".to_owned(), "1".to_owned()),
("prop2".to_owned(), "2".to_owned()),
])),
Some(BTreeMap::from([
("attr1".to_owned(), "1".to_owned()),
("attr2".to_owned(), "2".to_owned()),
])),
Some(BTreeMap::from([
("tag1".to_owned(), "1".to_owned()),
("tag2".to_owned(), "2".to_owned()),
])),
Some("stats".to_string()),
)?;
session.delete_timeseries(vec!["root.sg_rs.dev2.status"])?;
}
// create_multi_timeseries: create two timeseries in one call (all optional
// arguments left as None), then delete them.
{
session.create_multi_timeseries(
vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"],
vec![TSDataType::Float, TSDataType::Text],
vec![TSEncoding::Plain, TSEncoding::Plain],
vec![TSCompressionType::SNAPPY, TSCompressionType::SNAPPY],
None,
None,
None,
None,
)?;
session.delete_timeseries(vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"])?;
}
// insert_record: insert one record with typed values at the current time,
// then delete the two timeseries it touched.
{
session.insert_record(
"root.sg_rs.dev5",
vec!["online", "desc"],
vec![Value::Bool(false), Value::Text("F4145".to_string())],
Local::now().timestamp_millis(),
false,
)?;
session.delete_timeseries(vec!["root.sg_rs.dev5.online", "root.sg_rs.dev5.desc"])?;
}
// insert_string_record: same as above, but the values are passed as strings.
{
session.insert_string_record(
"root.sg_rs.wf02.wt02",
vec!["id", "location"],
vec!["SN:001", "BeiJing"],
Local::now().timestamp_millis(),
false,
)?;
session.delete_timeseries(vec![
"root.sg_rs.wf02.wt02.id",
"root.sg_rs.wf02.wt02.location",
])?;
}
// insert_records: insert a batch of records (here a single record for one
// device covering six value types), then delete the timeseries it touched.
{
session.insert_records(
vec!["root.sg_rs.dev1"],
vec![vec![
"restart_count",
"tick_count",
"price",
"temperature",
"description",
"status",
]],
vec![vec![
Value::Int32(i32::MAX),
Value::Int64(1639704010752),
Value::Double(1988.1),
Value::Float(36.8),
Value::Text("Test Device 1".to_string()),
Value::Bool(false),
]],
vec![Local::now().timestamp_millis()],
)?;
session.delete_timeseries(vec![
"root.sg_rs.dev1.restart_count",
"root.sg_rs.dev1.tick_count",
"root.sg_rs.dev1.price",
"root.sg_rs.dev1.temperature",
"root.sg_rs.dev1.description",
"root.sg_rs.dev1.status",
])?;
}
// insert_records_of_one_device: two records for the same device with different
// measurement sets; the second timestamp is one millisecond before "now".
// These timeseries are not deleted here; the execute_update_statement section
// at the end removes `root.sg_rs.dev0.*`.
{
session.insert_records_of_one_device(
"root.sg_rs.dev0",
vec![
Local::now().timestamp_millis(),
Local::now().timestamp_millis() - 1,
],
vec![
vec!["restart_count", "tick_count", "price"],
vec!["temperature", "description", "status"],
],
vec![
vec![Value::Int32(1), Value::Int64(2018), Value::Double(1988.1)],
vec![
Value::Float(36.8),
Value::Text("thermograph".to_string()),
Value::Bool(false),
],
],
false,
)?;
}
// tablet: build three tablets of 5, 10 and 2 rows; the start timestamp is
// advanced by each tablet's row count before building the next one.
let mut ts = Local::now().timestamp_millis();
let mut tablet1 = create_tablet(5, ts);
ts += 5;
let mut tablet2 = create_tablet(10, ts);
ts += 10;
let mut tablet3 = create_tablet(2, ts);
// insert_tablet: sort the tablet, then insert it on its own.
tablet1.sort();
session.insert_tablet(&tablet1)?;
// insert_tablets: sort the remaining two tablets and insert them in one call.
{
tablet2.sort();
tablet3.sort();
session.insert_tablets(vec![&tablet2, &tablet3])?;
}
// delete_data
// NOTE(review): 1 and 16 are presumably the start and end of the time range to
// delete — confirm against the client API. The `root.sg_rs.dev1.*` timeseries
// were already deleted in the insert_records section above; confirm this path
// is the intended target.
session.delete_data(vec!["root.sg_rs.dev1.status"], 1, 16)?;
// execute_query_statement: query the device and print the result as a
// fixed-width text table.
{
let dataset = session.execute_query_statement("select * from root.sg_rs.device2", None)?;
// Get columns, column types and values from the dataset
// For example:
let width = 18;
let column_count = dataset.get_column_names().len();
// Separator line made of '=' characters, sized as one leading '|' plus
// (width + 1) characters per column.
let print_line_sep =
|| println!("{:=<width$}", '=', width = (width + 1) * column_count + 1);
print_line_sep();
// Header row: only the last dot-separated segment of each column name.
dataset
.get_column_names()
.iter()
.for_each(|c| print!("|{:>width$}", c.split('.').last().unwrap(), width = width));
println!("|");
print_line_sep();
// Second row: the debug name of each column's data type.
dataset.get_data_types().iter().for_each(|t| {
let type_name = format!("{:?}", t);
print!("|{:>width$}", type_name, width = width)
});
println!("|");
print_line_sep();
// One output row per record; every value is right-aligned in its cell and
// `Value::Null` is rendered as the text "null".
dataset.for_each(|r| {
r.values.iter().for_each(|v| match v {
Value::Bool(v) => print!("|{:>width$}", v, width = width),
Value::Int32(v) => print!("|{:>width$}", v, width = width),
Value::Int64(v) => print!("|{:>width$}", v, width = width),
Value::Float(v) => print!("|{:>width$}", v, width = width),
Value::Double(v) => print!("|{:>width$}", v, width = width),
Value::Text(v) => print!("|{:>width$}", v, width = width),
Value::Null => print!("|{:>width$}", "null", width = width),
});
println!("|");
});
print_line_sep();
}
// execute_statement: run "show timeseries" and print the result with prettytable
// (column names as titles, one table row per record).
{
let dataset = session.execute_statement("show timeseries", None)?;
let mut table = Table::new();
table.set_titles(Row::new(
dataset
.get_column_names()
.iter()
.map(|c| cell!(c))
.collect(),
));
dataset.for_each(|r: RowRecord| {
table.add_row(Row::new(
r.values.iter().map(|v: &Value| cell!(v)).collect(),
));
});
table.printstd();
}
// execute_batch_statement: send three insert statements in one call, then
// delete the timeseries they wrote to.
{
session.execute_batch_statement(vec![
"insert into root.sg_rs.dev6(time,s5) values(1,true)",
"insert into root.sg_rs.dev6(time,s5) values(2,true)",
"insert into root.sg_rs.dev6(time,s5) values(3,true)",
])?;
session.delete_timeseries(vec!["root.sg_rs.dev6.s5"])?;
}
// execute_raw_data_query: query three timeseries of device2 and print the
// result with prettytable.
// NOTE(review): 0 and i64::MAX are presumably the start and end timestamps of
// the queried range — confirm against the client API.
{
let dataset = session.execute_raw_data_query(
vec![
"root.sg_rs.device2.restart_count",
"root.sg_rs.device2.tick_count",
"root.sg_rs.device2.description",
],
0,
i64::MAX,
)?;
let mut table = Table::new();
table.set_titles(Row::new(
dataset
.get_column_names()
.iter()
.map(|c| cell!(c))
.collect(),
));
dataset.for_each(|r: RowRecord| {
table.add_row(Row::new(
r.values.iter().map(|v: &Value| cell!(v)).collect(),
));
});
table.printstd();
}
// execute_update_statement: delete all timeseries under root.sg_rs.dev0; if the
// call hands back a dataset, print each of its rows.
{
if let Some(dataset) =
session.execute_update_statement("delete timeseries root.sg_rs.dev0.*")?
{
dataset.for_each(|r| println!("timestamp: {} {:?}", r.timestamp, r.values));
}
}
session.close()?;
Ok(())
}
fn create_tablet(row_count: i32, start_timestamp: i64) -> Tablet {
let mut tablet = Tablet::new(
"root.sg_rs.device2",
vec![
MeasurementSchema::new(
String::from("status"),
TSDataType::Boolean,
TSEncoding::Plain,
TSCompressionType::SNAPPY,
None,
),
MeasurementSchema::new(
String::from("restart_count"),
TSDataType::Int32,
TSEncoding::RLE,
TSCompressionType::SNAPPY,
None,
),
MeasurementSchema::new(
String::from("tick_count"),
TSDataType::Int64,
TSEncoding::RLE,
TSCompressionType::SNAPPY,
None,
),
MeasurementSchema::new(
String::from("temperature"),
TSDataType::Float,
TSEncoding::Plain,
TSCompressionType::SNAPPY,
None,
),
MeasurementSchema::new(
String::from("price"),
TSDataType::Double,
TSEncoding::Gorilla,
TSCompressionType::SNAPPY,
None,
),
MeasurementSchema::new(
String::from("description"),
TSDataType::Text,
TSEncoding::Plain,
TSCompressionType::SNAPPY,
None,
),
],
);
(0..row_count).for_each(|row| {
let ts = start_timestamp + row as i64;
tablet
.add_row(
vec![
Value::Bool(ts % 2 == 0),
Value::Int32(row),
Value::Int64(row as i64),
Value::Float(row as f32 + 0.1),
Value::Double(row as f64 + 0.2),
Value::Text(format!("ts: {}", ts)),
],
ts,
)
.unwrap_or_else(|err| eprintln!("Add row failed, reason '{}'", err));
});
tablet
}
依赖关系
~1–1.6MB
~32K SLoC