10 个版本

0.3.12 2023年12月19日
0.3.11 2022年9月7日
0.3.10 2022年6月15日
0.3.9 2022年2月10日
0.1.0 2021年12月6日

#331数据库接口

Download history 3/week @ 2024-03-10 82/week @ 2024-03-31

每月下载量:55

Apache-2.0

585KB
14K SLoC

Apache IoTDB

Main Mac and Linux Main Win coveralls GitHub release License IoTDB Website

Apache IoTDB(物联网数据库)是一个高性能的物联网原生数据库,适用于数据管理和分析,可在边缘和云端部署。由于其轻量级架构、高性能以及与Apache Hadoop、Spark和Flink的深度集成,Apache IoTDB能够满足物联网领域大规模数据存储、高速数据摄入和复杂数据分析的需求。

Apache IoTDB Rust 客户端

概述

这是Apache IoTDB的Rust客户端。

Apache IoTDB 网站: https://iotdb.apache.org Apache IoTDB Github: https://github.com/apache/iotdb

先决条件

apache-iotdb 0.12.0 及以上。
rust 1.56.0 及以上。

如何使用客户端(快速入门)

使用方法

将以下内容放入您的 Cargo.toml

[dependencies]
iotdb-client-rs="^0.3.12"

示例

将以下内容放入您的示例的 Cargo.toml

[dependencies]
iotdb-client-rs="^0.3.12"
chrono="0.4.19"
prettytable-rs="0.8.0"
structopt = "0.3.25"
use std::{collections::BTreeMap, vec};

use chrono::Local;
use iotdb::client::remote::{Config, RpcSession};
use iotdb::client::{MeasurementSchema, Result, RowRecord, Session, Tablet, Value};
use iotdb::protocal::{TSCompressionType, TSDataType, TSEncoding};
use prettytable::{cell, Row, Table};
use structopt::StructOpt;

fn main() {
    run().expect("failed to run session_example.");
}

fn run() -> Result<()> {
    #[derive(StructOpt)]
    #[structopt(name = "session_example")]
    struct Opt {
        #[structopt(short = "h", long, default_value = "127.0.0.1")]
        host: String,

        #[structopt(short = "P", long, default_value = "6667")]
        port: i32,

        #[structopt(short = "u", long, default_value = "root")]
        user: String,

        #[structopt(short = "p", long, default_value = "root")]
        password: String,

        #[structopt(short = "c", long)]
        clean: bool,
    }

    let opt = Opt::from_args();
    // let config = Config {
    //     host: opt.host,
    //     port: opt.port,
    //     username: opt.user,
    //     password: opt.password,
    //     ..Default::default()
    // };

    // Create config object with builder
    let config = Config::builder()
        .host(opt.host)
        .port(opt.port)
        .username(opt.user)
        .password(opt.password)
        .build();

    let mut session = RpcSession::new(config)?;
    session.open()?;

    //time_zone
    let tz = session.get_time_zone()?;
    if tz != "Asia/Shanghai" {
        session.set_time_zone("Asia/Shanghai")?;
    }

    //set_storage_group
    session.set_storage_group("root.ln1")?;
    session.delete_storage_group("root.ln1")?;

    //delete_storage_groups
    session.set_storage_group("root.ln1")?;
    session.set_storage_group("root.ln2")?;
    session.delete_storage_groups(vec!["root.ln1", "root.ln2"])?;

    //if storage group 'root.sg_rs' exist, remove it.
    if opt.clean {
        session
            .delete_storage_group("root.sg_rs")
            .unwrap_or_default();
    }

    //create_timeseries
    {
        session.create_timeseries(
            "root.sg_rs.dev2.status",
            TSDataType::Float,
            TSEncoding::Plain,
            TSCompressionType::SNAPPY,
            Some(BTreeMap::from([
                ("prop1".to_owned(), "1".to_owned()),
                ("prop2".to_owned(), "2".to_owned()),
            ])),
            Some(BTreeMap::from([
                ("attr1".to_owned(), "1".to_owned()),
                ("attr2".to_owned(), "2".to_owned()),
            ])),
            Some(BTreeMap::from([
                ("tag1".to_owned(), "1".to_owned()),
                ("tag2".to_owned(), "2".to_owned()),
            ])),
            Some("stats".to_string()),
        )?;
        session.delete_timeseries(vec!["root.sg_rs.dev2.status"])?;
    }

    //create_multi_timeseries
    {
        session.create_multi_timeseries(
            vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"],
            vec![TSDataType::Float, TSDataType::Text],
            vec![TSEncoding::Plain, TSEncoding::Plain],
            vec![TSCompressionType::SNAPPY, TSCompressionType::SNAPPY],
            None,
            None,
            None,
            None,
        )?;
        session.delete_timeseries(vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"])?;
    }

    //insert_record
    {
        session.insert_record(
            "root.sg_rs.dev5",
            vec!["online", "desc"],
            vec![Value::Bool(false), Value::Text("F4145".to_string())],
            Local::now().timestamp_millis(),
            false,
        )?;
        session.delete_timeseries(vec!["root.sg_rs.dev5.online", "root.sg_rs.dev5.desc"])?;
    }

    //insert_string_record
    {
        session.insert_string_record(
            "root.sg_rs.wf02.wt02",
            vec!["id", "location"],
            vec!["SN:001", "BeiJing"],
            Local::now().timestamp_millis(),
            false,
        )?;
        session.delete_timeseries(vec![
            "root.sg_rs.wf02.wt02.id",
            "root.sg_rs.wf02.wt02.location",
        ])?;
    }

    //insert_records
    {
        session.insert_records(
            vec!["root.sg_rs.dev1"],
            vec![vec![
                "restart_count",
                "tick_count",
                "price",
                "temperature",
                "description",
                "status",
            ]],
            vec![vec![
                Value::Int32(i32::MAX),
                Value::Int64(1639704010752),
                Value::Double(1988.1),
                Value::Float(36.8),
                Value::Text("Test Device 1".to_string()),
                Value::Bool(false),
            ]],
            vec![Local::now().timestamp_millis()],
        )?;
        session.delete_timeseries(vec![
            "root.sg_rs.dev1.restart_count",
            "root.sg_rs.dev1.tick_count",
            "root.sg_rs.dev1.price",
            "root.sg_rs.dev1.temperature",
            "root.sg_rs.dev1.description",
            "root.sg_rs.dev1.status",
        ])?;
    }

    //insert_records_of_one_device
    {
        session.insert_records_of_one_device(
            "root.sg_rs.dev0",
            vec![
                Local::now().timestamp_millis(),
                Local::now().timestamp_millis() - 1,
            ],
            vec![
                vec!["restart_count", "tick_count", "price"],
                vec!["temperature", "description", "status"],
            ],
            vec![
                vec![Value::Int32(1), Value::Int64(2018), Value::Double(1988.1)],
                vec![
                    Value::Float(36.8),
                    Value::Text("thermograph".to_string()),
                    Value::Bool(false),
                ],
            ],
            false,
        )?;
    }

    //tablet
    let mut ts = Local::now().timestamp_millis();
    let mut tablet1 = create_tablet(5, ts);

    ts += 5;
    let mut tablet2 = create_tablet(10, ts);

    ts += 10;
    let mut tablet3 = create_tablet(2, ts);

    //insert_tablet
    tablet1.sort();
    session.insert_tablet(&tablet1)?;

    //insert_tablets
    {
        tablet2.sort();
        tablet3.sort();
        session.insert_tablets(vec![&tablet2, &tablet3])?;
    }

    //delete_data
    session.delete_data(vec!["root.sg_rs.dev1.status"], 1, 16)?;

    //execute_query_statement
    {
        let dataset = session.execute_query_statement("select * from root.sg_rs.device2", None)?;
        // Get columns, column types and values from the dataset
        // For example:
        let width = 18;
        let column_count = dataset.get_column_names().len();
        let print_line_sep =
            || println!("{:=<width$}", '=', width = (width + 1) * column_count + 1);

        print_line_sep();
        dataset
            .get_column_names()
            .iter()
            .for_each(|c| print!("|{:>width$}", c.split('.').last().unwrap(), width = width));
        println!("|");
        print_line_sep();
        dataset.get_data_types().iter().for_each(|t| {
            let type_name = format!("{:?}", t);
            print!("|{:>width$}", type_name, width = width)
        });
        println!("|");
        print_line_sep();
        dataset.for_each(|r| {
            r.values.iter().for_each(|v| match v {
                Value::Bool(v) => print!("|{:>width$}", v, width = width),
                Value::Int32(v) => print!("|{:>width$}", v, width = width),
                Value::Int64(v) => print!("|{:>width$}", v, width = width),
                Value::Float(v) => print!("|{:>width$}", v, width = width),
                Value::Double(v) => print!("|{:>width$}", v, width = width),
                Value::Text(v) => print!("|{:>width$}", v, width = width),
                Value::Null => print!("|{:>width$}", "null", width = width),
            });
            println!("|");
        });
        print_line_sep();
    }

    //execute_statement
    {
        let dataset = session.execute_statement("show timeseries", None)?;
        let mut table = Table::new();
        table.set_titles(Row::new(
            dataset
                .get_column_names()
                .iter()
                .map(|c| cell!(c))
                .collect(),
        ));
        dataset.for_each(|r: RowRecord| {
            table.add_row(Row::new(
                r.values.iter().map(|v: &Value| cell!(v)).collect(),
            ));
        });
        table.printstd();
    }

    //execute_batch_statement
    {
        session.execute_batch_statement(vec![
            "insert into root.sg_rs.dev6(time,s5) values(1,true)",
            "insert into root.sg_rs.dev6(time,s5) values(2,true)",
            "insert into root.sg_rs.dev6(time,s5) values(3,true)",
        ])?;
        session.delete_timeseries(vec!["root.sg_rs.dev6.s5"])?;
    }
    //execute_raw_data_query
    {
        let dataset = session.execute_raw_data_query(
            vec![
                "root.sg_rs.device2.restart_count",
                "root.sg_rs.device2.tick_count",
                "root.sg_rs.device2.description",
            ],
            0,
            i64::MAX,
        )?;
        let mut table = Table::new();
        table.set_titles(Row::new(
            dataset
                .get_column_names()
                .iter()
                .map(|c| cell!(c))
                .collect(),
        ));
        dataset.for_each(|r: RowRecord| {
            table.add_row(Row::new(
                r.values.iter().map(|v: &Value| cell!(v)).collect(),
            ));
        });
        table.printstd();
    }

    //execute_update_statement
    {
        if let Some(dataset) =
            session.execute_update_statement("delete timeseries root.sg_rs.dev0.*")?
        {
            dataset.for_each(|r| println!("timestamp: {} {:?}", r.timestamp, r.values));
        }
    }

    session.close()?;
    Ok(())
}

fn create_tablet(row_count: i32, start_timestamp: i64) -> Tablet {
    let mut tablet = Tablet::new(
        "root.sg_rs.device2",
        vec![
            MeasurementSchema::new(
                String::from("status"),
                TSDataType::Boolean,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("restart_count"),
                TSDataType::Int32,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("tick_count"),
                TSDataType::Int64,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("temperature"),
                TSDataType::Float,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("price"),
                TSDataType::Double,
                TSEncoding::Gorilla,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("description"),
                TSDataType::Text,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
        ],
    );
    (0..row_count).for_each(|row| {
        let ts = start_timestamp + row as i64;
        tablet
            .add_row(
                vec![
                    Value::Bool(ts % 2 == 0),
                    Value::Int32(row),
                    Value::Int64(row as i64),
                    Value::Float(row as f32 + 0.1),
                    Value::Double(row as f64 + 0.2),
                    Value::Text(format!("ts: {}", ts)),
                ],
                ts,
            )
            .unwrap_or_else(|err| eprintln!("Add row failed, reason '{}'", err));
    });
    tablet
}

依赖关系

~1–1.6MB
~32K SLoC