3 个不稳定版本
| 0.2.0 | 2024年7月16日 |
|---|---|
| 0.1.1 | 2021年7月7日 |
| 0.1.0 | 2021年5月31日 |
#403 在 HTTP客户端
26,303 每月下载量
用于 14 个crate (3直接)
4KB
clickhouse-rs
ClickHouse的强类型客户端。
- 使用
serde进行行的编码/解码。 - 支持
serde属性:skip_serializing、skip_deserializing、rename。 - 通过HTTP传输使用
RowBinary编码。- 计划切换到TCP上的
Native。
- 计划切换到TCP上的
- 支持TLS(见下面的
native-tls和rustls-tls功能)。 - 支持压缩和解压缩(LZ4和LZ4HC)。
- 提供选择API。
- 提供插入API。
- 提供无限事务性插入API(见下文)。
- 提供监控实时视图的API。
- 提供单元测试的mocks。
注意:ch2rs用于从ClickHouse生成行类型很有用。
用法
要使用crate,将以下内容添加到您的Cargo.toml
[dependencies]
clickhouse = "0.12.1"
[dev-dependencies]
clickhouse = { version = "0.12.1", features = ["test-util"] }
关于v22.6之前的ClickHouse的说明
CH服务器版本低于v22.6(2022-06-16)在某些罕见情况下处理RowBinary 不正确。使用0.11并启用wa-37420功能来解决问题。不要用于新版本。
创建客户端
use clickhouse::Client;
let client = Client::default()
.with_url("https://:8123")
.with_user("name")
.with_password("123")
.with_database("test");
- 重复使用创建的客户端或克隆它们以重复使用连接池。
选择行
use serde::Deserialize;
use clickhouse::Row;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let mut cursor = client
.query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- 占位符
?字段被替换为no, name(Row的字段)。 - 占位符
?被替换为以下bind()调用的值。 - 可以使用
fetch_one::<Row>()和fetch_all::<Row>()来分别获取第一行或所有行。 - 可以使用
sql::Identifier来绑定表名。
注意,游标即使在返回了一些行之后也可能返回错误。为了避免这种情况,请使用 client.with_option("wait_end_of_query", "1") 来在服务器端启用缓冲。更多详情请参见 这里。也可以使用 buffer_size 选项。
插入一批数据
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- 如果没有调用
end(),则INSERT将被终止。 - 行正在逐渐发送,以分散网络负载。
- 只有当所有行都位于同一个分区并且行数小于
max_insert_block_size时,ClickHouse 才会原子性地插入批次。
无限插入
需要 inserter 功能。
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
Inserter在达到阈值(max_bytes,max_rows,period)时结束活动插入commit()。- 通过使用
with_period_bias来调整结束活动INSERT的间隔,可以避免并行插入器造成的负载峰值。 Inserter::time_left()可以用来检测当前周期何时结束。如果您的流很少发出项目,请再次调用Inserter::commit()以检查限制。- 使用 quanta crate 实现的时间阈值可以加速
inserter。如果启用了test-util,则不会使用(因此,自定义测试中可以使用tokio::time::advance()来管理时间)。 - 在同一个
INSERT语句中插入的所有行都位于commit()调用之间。 - 如果您想终止插入,别忘了刷新。
inserter.end().await?;
执行 DDL 操作
client.query("DROP TABLE IF EXISTS some").execute().await?;
实时视图
需要 watch 功能。
let mut cursor = client
.watch("SELECT max(no), argMax(name, no) FROM some")
.fetch::<Row<'_>>()?;
let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);
// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
请参阅示例。
功能标志
lz4(默认启用)— 启用Compression::Lz4和Compression::Lz4Hc(_)变体。如果启用,默认情况下,除了WATCH外的所有查询都使用Compression::Lz4。native-tls— 通过hyper-tls支持带有HTTPS架构的 URL,该架构链接到 OpenSSL。rustls-tls— 通过hyper-rustls支持带有HTTPS架构的 URL,该架构不链接到 OpenSSL。inserter— 启用client.inserter()。test-util— 添加模拟。请参阅示例。仅在dev-dependencies中使用。watch— 启用client.watch功能。请参阅相应的部分以获取详细信息。uuid— 将serde::uuid添加到与 uuid 包一起使用。time— 将serde::time添加到与 time 包一起使用。
注意:当通过
HTTPSURL 连接到 ClickHouse 时,您必须启用native-tls或rustls-tls功能。如果两者都启用,则rustls-tls功能将优先。
数据类型
-
(U)Int(8|16|32|64|128)映射到/从相应的(u|i)(8|16|32|64|128)类型或它们周围的新类型。 -
(U)Int256不直接支持,但有一个解决方案。 -
Float(32|64)映射到/从相应的f(32|64)或它们周围的新类型。 -
Decimal(32|64|128)可以映射到/从相应的i(32|64|128)或其周围的 newtypes。使用 fixnum 或其他有符号定点数的实现更为方便。 -
Boolean可以映射到/从bool或其周围的 newtypes。 -
String可以映射到/从任何字符串或字节类型,例如&str、&[u8]、String、Vec<u8>或SmartString。也支持 newtypes。为了存储字节,请考虑使用 serde_bytes,因为它更高效。示例
#[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, #[serde(with = "serde_bytes")] bytes: Vec<u8>, #[serde(with = "serde_bytes")] byte_slice: &'a [u8], } -
FixedString(_)目前还不支持。 -
Enum(8|16)使用 serde_repr 支持。示例
use serde_repr::{Deserialize_repr, Serialize_repr}; #[derive(Row, Serialize, Deserialize)] struct MyRow { level: Level, } #[derive(Debug, Serialize_repr, Deserialize_repr)] #[repr(u8)] enum Level { Debug = 1, Info = 2, Warn = 3, Error = 4, } -
UUID通过使用serde::uuid映射到/从uuid::Uuid。需要uuid功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::uuid")] uuid: uuid::Uuid, } -
IPv6通过使用serde::ipv4映射到/从std::net::Ipv6Addr。 -
IPv4通过使用serde::ipv4映射到/从std::net::Ipv4Addr。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4")] ipv4: std::net::Ipv4Addr, } -
Date可以映射到/从u16或其周围的 newtypes,表示自1970-01-01以来经过的天数。此外,通过使用serde::time::date支持使用time::Date,这需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: u16, #[serde(with = "clickhouse::serde::time::date")] date: Date, } -
Date32映射到/从i32或其新的类型,代表自1970-01-01以来经过的天数。此外,使用serde::time::date32支持使用time::Date,需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: i32, #[serde(with = "clickhouse::serde::time::date32")] date: Date, } -
DateTime映射到/从u32或其新的类型,代表自 UNIX 纪元以来经过的秒数。此外,使用serde::time::datetime支持使用time::OffsetDateTime,需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: u32, #[serde(with = "clickhouse::serde::time::datetime")] dt: OffsetDateTime, } -
DateTime64(_)映射到/从i32或其新的类型,代表自 UNIX 纪元以来经过的时间。此外,使用serde::time::datetime64::*支持使用time::OffsetDateTime,需要time功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)` #[serde(with = "clickhouse::serde::time::datetime64::secs")] dt64s: OffsetDateTime, // `DateTime64(0)` #[serde(with = "clickhouse::serde::time::datetime64::millis")] dt64ms: OffsetDateTime, // `DateTime64(3)` #[serde(with = "clickhouse::serde::time::datetime64::micros")] dt64us: OffsetDateTime, // `DateTime64(6)` #[serde(with = "clickhouse::serde::time::datetime64::nanos")] dt64ns: OffsetDateTime, // `DateTime64(9)` } -
Typle(A, B, ...)映射到/从(A, B, ...)或其新的类型。 -
Array(_)映射到/从任何切片,例如Vec<_>,&[_]。也支持新的类型。 -
Map(K, V)的行为类似于Array((K, V))。 -
LowCardinality(_)可以无缝支持。 -
Nullable(_)映射到/从Option<_>。对于clickhouse::serde::*辅助函数添加::option。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4::option")] ipv4_opt: Option<Ipv4Addr>, } -
Nested通过提供多个具有重命名的数组来支持。示例
// CREATE TABLE test(items Nested(name String, count UInt32)) #[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(rename = "items.name")] items_name: Vec<String>, #[serde(rename = "items.count")] items_count: Vec<u32>, } -
目前不支持
JSON和Geo。
Mocking
该软件包提供用于模拟 CH 服务器和测试 DDL、SELECT、INSERT 和 WATCH 查询的实用工具。
可以通过 test-util 功能启用此功能。仅在使用开发依赖项时使用它。
请参阅示例。
依赖关系
~0.4–0.9MB
~21K SLoC