116 个版本
0.12.3 | 2024年6月6日 |
---|---|
0.12.0 | 2024年3月1日 |
0.11.23 | 2024年5月19日 |
0.11.0 | 2023年12月28日 |
0.3.12 | 2022年11月29日 |
#1683 in 数据库接口
每月下载量 193
在 2 个 crate 中使用 (通过 taos)
370KB
8K SLoC
TDengine 的官方 Rust 连接器
Docs.rs | Crates.io 版本 | Crates.io 下载量 | CodeCov |
---|---|---|---|
这是 TDengine 的官方 Rust 连接器。
依赖
- Rust 当然。
如果你使用默认功能,则将依赖于
- TDengine 客户端库和头文件。
用法
默认情况下,启用原生和 WebSocket 客户端
[dependencies]
taos = "*"
仅 WebSocket 客户端
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
仅原生
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
查询
use chrono::{DateTime, Local};
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://127.0.0.1:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "query";
// prepare database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
let inserted = taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(16))",
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("select * from `meters`").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
// Query option 1, use rows stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("got value of {}: {}", name, value);
}
}
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
groupid: i32,
// binary/varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("select * from `meters`")
.await?
.deserialize()
.try_collect()
.await?;
dbg!(records);
Ok(())
}
订阅
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
// Query options 2, use deserialization with serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// float to f32
current: Option<f32>,
// int to i32
voltage: Option<i32>,
phase: Option<f32>,
}
async fn prepare(taos: Taos) -> anyhow::Result<()> {
let inserted = taos.exec_many([
// create child table
"CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
// insert into child table
"INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
// insert with NULL values
"INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
// insert and automatically create table with tags if not exists
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
// insert many records in a single sql
"INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// std::env::set_var("RUST_LOG", "debug");
pretty_env_logger::init();
let dsn = "taos://127.0.0.1:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
let db = "tmq";
// prepare database
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT)\
TAGS (`groupid` INT, `location` BINARY(16))"),
// create topic for subscription
format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
let task = tokio::spawn(prepare(taos));
tokio::time::sleep(Duration::from_secs(1)).await;
// subscribe
let tmq = TmqBuilder::from_dsn("taos://127.0.0.1:6030/?group.id=test")?;
let mut consumer = tmq.build()?;
consumer.subscribe(["tmq_meters"]).await?;
{
let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? {
// get information from offset
// the topic
let topic = offset.topic();
// the vgroup id, like partition id in kafka.
let vgroup_id = offset.vgroup_id();
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
// one block for one table, get table name if needed
let name = block.table_name();
let records: Vec<Record> = block.deserialize().try_collect()?;
println!(
"** table: {}, got {} records: {:#?}\n",
name.unwrap(),
records.len(),
records
);
}
}
consumer.commit(offset).await?;
}
}
consumer.unsubscribe().await;
task.await??;
Ok(())
}
贡献
欢迎所有贡献。
许可证
与 TDengine 保持一致。
依赖
~30–46MB
~865K SLoC