1 个不稳定版本
| 0.1.0 | 2024年7月11日 |
|---|
#626 在 解析器实现
155KB
3.5K SLoC
clickhouse.rs
ClickHouse 的类型客户端。
- 使用
serde对行进行编码/解码。 - 支持
serde属性:skip_serializing、skip_deserializing、rename。 - 使用
RowBinary编码通过 HTTP 传输。- 计划切换到
Native通过 TCP。
- 计划切换到
- 支持 TLS。
- 支持压缩和解压缩(LZ4 和 LZ4HC)。
- 提供选择 API。
- 提供插入 API。
- 提供无限事务性(见下文)插入 API。
- 提供实时视图监视 API。
- 提供单元测试的模拟。
注意: ch2rs 可以用于从 ClickHouse 生成行类型。
用法
要使用此包,将以下内容添加到您的 Cargo.toml
[dependencies]
clickhouse = "0.11.6"
[dev-dependencies]
clickhouse = { version = "0.11.6", features = ["test-util"] }
关于 ClickHouse v22.6 之前版本的说明
CH 服务器版本低于 v22.6(2022-06-16)在某些罕见情况下处理 RowBinary 不正确。使用 0.11 并启用 wa-37420 功能来解决此问题。不要用于较新版本。
创建客户端
use clickhouse::Client;
let client = Client::default()
.with_url("https://:8123")
.with_user("name")
.with_password("123")
.with_database("test");
- 重复使用创建的客户端或克隆它们以重复使用连接池。
选择行
use serde::Deserialize;
use clickhouse::Row;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let mut cursor = client
.query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- 占位符
?fields被替换为no, name(Row的字段)。 - 占位符
?被替换为后续bind()调用的值。 - 可以使用方便的
fetch_one::<Row>()和fetch_all::<Row>()来获取第一行或所有行。 - 可以使用
sql::Identifier来绑定表名。
请注意,游标即使在返回一些行之后也可能返回错误。为了避免这种情况,请使用 client.with_option("wait_end_of_query", "1") 来在服务器端启用缓冲。有关更多详细信息,请参阅 这里。buffer_size 选项也可能很有用。
插入一批数据
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- 如果没有调用
end(),则INSERT操作将被中止。 - 行正在逐渐发送以分散网络负载。
- 如果所有行都适合同一分区且其数量小于
max_insert_block_size,则 ClickHouse 仅以原子方式插入批次。
无限插入
需要 inserter 功能。
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
Inserter在达到阈值(max_bytes,max_rows,period)时结束活动的插入。- 可以使用
with_period_bias来偏移结束活动INSERT之间的间隔,以避免并行插入器引起的负载峰值。 Inserter::time_left()可以用来检测当前周期何时结束。如果您的流很少发出项目,请再次调用Inserter::commit()以检查限制。- 使用 quanta crate 实现时间阈值以提高插入器的速度。如果启用
test-util,则不会使用(因此,在自定义测试中可以通过tokio::time::advance()来管理时间)。 - 在
commit()调用之间的所有行都将在同一INSERT语句中插入。 - 如果您想终止插入,请不要忘记刷新。
inserter.end().await?;
执行 DDL
client.query("DROP TABLE IF EXISTS some").execute().await?;
实时视图
需要 watch 功能。
let mut cursor = client
.watch("SELECT max(no), argMax(name, no) FROM some")
.fetch::<Row<'_>>()?;
let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);
// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
请参阅 示例。
功能标志
lz4(默认启用)— 启用Compression::Lz4和Compression::Lz4Hc(_)变体。如果启用,默认情况下使用Compression::Lz4对所有查询进行处理,除了WATCH。tls(默认启用)— 支持以HTTPS为主干的 URL。inserter— 启用client.inserter()。test-util— 添加模拟。见 示例。仅在使用dev-dependencies时使用。watch— 启用client.watch功能。详见相应部分。uuid— 为与 uuid 包协同工作,添加serde::uuid。time— 为与 time 包协同工作,添加serde::time。
数据类型
-
(U)Int(8|16|32|64|128)映射到/从对应的(u|i)(8|16|32|64|128)类型或其周围的新类型。 -
(U)Int256不直接支持,但有 一个解决方案。 -
Float(32|64)映射到/从对应的f(32|64)或其周围的新类型。 -
Decimal(32|64|128)映射到/从对应的i(32|64|128)或其周围的新类型。使用 fixnum 或其他有符号定点数的实现会更方便。 -
Boolean映射到/从bool或其周围的新类型。 -
String可以映射到任何字符串或字节类型,例如&str,&[u8],String,Vec<u8>或SmartString。也支持新类型。为了存储字节,可以考虑使用 serde_bytes,因为它更高效。示例
#[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, #[serde(with = "serde_bytes")] bytes: Vec<u8>, #[serde(with = "serde_bytes")] byte_slice: &'a [u8], } -
FixedString(_)目前还不支持。 -
Enum(8|16)通过使用 serde_repr 支持。示例
use serde_repr::{Deserialize_repr, Serialize_repr}; #[derive(Row, Serialize, Deserialize)] struct MyRow { level: Level, } #[derive(Debug, Serialize_repr, Deserialize_repr)] #[repr(u8)] enum Level { Debug = 1, Info = 2, Warn = 3, Error = 4, } -
UUID通过使用serde::uuid映射到/从uuid::Uuid。需要uuid功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::uuid")] uuid: uuid::Uuid, } -
IPv6映射到/从std::net::Ipv6Addr。 -
IPv4通过使用serde::ipv4映射到/从std::net::Ipv4Addr。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4")] ipv4: std::net::Ipv4Addr, } -
Date映射到/从u16或围绕它的新类型,并代表自1970-01-01以来经过的天数。另外,通过使用serde::time::date支持time::Date,该功能需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: u16, #[serde(with = "clickhouse::serde::time::date")] date: Date, } -
Date32映射到/从i32或围绕它的新类型,并代表自1970-01-01以来经过的天数。另外,通过使用serde::time::date32支持time::Date,该功能需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: i32, #[serde(with = "clickhouse::serde::time::date32")] date: Date, } -
DateTime映射到/从u32或围绕它的新类型,并代表自 UNIX 纪元以来经过的秒数。另外,通过使用serde::time::datetime支持time::OffsetDateTime,该功能需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: u32, #[serde(with = "clickhouse::serde::time::datetime")] dt: OffsetDateTime, } -
DateTime64(_)映射到/从i32或其新类型,并代表自 UNIX 纪元以来的时间流逝。此外,通过使用serde::time::datetime64::*支持time::OffsetDateTime,这需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)` #[serde(with = "clickhouse::serde::time::datetime64::secs")] dt64s: OffsetDateTime, // `DateTime64(0)` #[serde(with = "clickhouse::serde::time::datetime64::millis")] dt64ms: OffsetDateTime, // `DateTime64(3)` #[serde(with = "clickhouse::serde::time::datetime64::micros")] dt64us: OffsetDateTime, // `DateTime64(6)` #[serde(with = "clickhouse::serde::time::datetime64::nanos")] dt64ns: OffsetDateTime, // `DateTime64(9)` } -
Typle(A, B, ...)映射到/从(A, B, ...)或其新类型。 -
Array(_)映射到/从任何切片,例如Vec<_>,&[_]。也支持新类型。 -
Map(K, V)的行为类似于Array((K, V))。 -
LowCardinality(_)支持无缝。 -
Nullable(_)映射到/从Option<_>。对于clickhouse::serde::*辅助程序添加::option。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4::option")] ipv4_opt: Option<Ipv4Addr>, } -
Nested通过提供多个带重命名的数组来支持。示例
// CREATE TABLE test(items Nested(name String, count UInt32)) #[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(rename = "items.name")] items_name: Vec<String>, #[serde(rename = "items.count")] items_count: Vec<u32>, } -
JSON和Geo目前不支持。
Mocking
该软件包提供了用于模拟 CH 服务器和测试 DDL、SELECT、INSERT 和 WATCH 查询的实用工具。
可以通过 test-util 功能启用此功能。**仅**在 dev-dependencies 中使用。
请参阅示例。
依赖项
~8–21MB
~329K SLoC