35 个版本
新版本 0.12.2 | 2024年8月20日 |
---|---|
0.12.0 | 2024年7月16日 |
0.11.6 | 2023年9月27日 |
0.11.5 | 2023年6月12日 |
0.1.0 | 2017年5月17日 |
#26 在 数据库接口
26,355 每月下载量
用于 12 个 Crates (9 个直接使用)
170KB
4K SLoC
clickhouse-rs
ClickHouse 数据库官方纯 Rust 类型客户端。
- 使用
serde
进行行编码/解码。 - 支持
serde
属性:skip_serializing
,skip_deserializing
,rename
。 - 通过 HTTP 传输使用
RowBinary
编码。- 计划切换到 TCP 上的
Native
。
- 计划切换到 TCP 上的
- 支持 TLS(见下面的
native-tls
和rustls-tls
功能)。 - 支持压缩和解压缩(LZ4 和 LZ4HC)。
- 提供选择 API。
- 提供插入 API。
- 提供无限事务性插入 API(见下文)。
- 提供实时视图监控 API。
- 提供单元测试的模拟。
注意: ch2rs 可以用来从 ClickHouse 生成行类型。
用法
要使用该 crate,请将以下内容添加到您的 Cargo.toml
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
关于 ClickHouse 低于 v22.6 的说明
低于 v22.6(2022-06-16)的 CH 服务器在某些罕见情况下处理 RowBinary
出错。使用 0.11 并启用 wa-37420
功能以解决这个问题。不要用于新版本。
创建客户端
use clickhouse::Client;
let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_user("name")
.with_password("123")
.with_database("test");
- 重用创建的客户端或克隆它们以重用连接池。
选择行
use serde::Deserialize;
use clickhouse::Row;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let mut cursor = client
.query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- 占位符
?字段
被替换为no, name
(Row
的字段)。 - 占位符
?
被替换为以下bind()
调用的值。 - 可以使用
fetch_one::
和fetch_all::
获取第一行或所有行。 sql::Identifier
可以用来绑定表名。
注意,即使游标返回了一些行,也可能返回错误。为了避免这种情况,请在服务器端启用缓冲区,使用client.with_option("wait_end_of_query", "1")
。更多详情请见这里。buffer_size
选项也可能很有用。
插入批量数据
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- 如果没有调用
end()
,则INSERT
将被终止。 - 行是逐步发送的,以分散网络负载。
- 只有当所有行都位于同一分区且其数量小于
max_insert_block_size
时,ClickHouse才会原子性地插入批次。
无限插入
需要inserter
功能。
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
Inserter
在达到阈值(max_bytes
、max_rows
、period
)时结束活动插入。- 可以使用
with_period_bias
来偏置结束活动INSERT
之间的间隔,以避免并行插入器引起的负载峰值。 Inserter::time_left()
可以用来检测当前周期何时结束。如果您的流很少发出项目,请再次调用Inserter::commit()
以检查限制。- 使用quanta crate实现的时间阈值可以提高插入器的速度。如果启用
test-util
,则不使用(因此,自定义测试中可以由tokio::time::advance()
管理时间)。 - 在
commit()
调用之间的所有行都将插入到同一个INSERT
语句中。 - 如果您想终止插入,请不要忘记刷新。
inserter.end().await?;
执行DDL
client.query("DROP TABLE IF EXISTS some").execute().await?;
实时视图
需要watch
功能。
let mut cursor = client
.watch("SELECT max(no), argMax(name, no) FROM some")
.fetch::<Row<'_>>()?;
let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);
// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
请参阅 示例。
功能标志
lz4
(默认启用)——启用Compression::Lz4
和Compression::Lz4Hc(_)
变体。如果启用,除了WATCH
之外的所有查询都将默认使用Compression::Lz4
。native-tls
——通过hyper-tls
支持使用HTTPS
方案的 URL,该方案链接到 OpenSSL。rustls-tls
——通过hyper-rustls
支持使用HTTPS
方案的 URL,该方案不链接到 OpenSSL。inserter
——启用client.inserter()
。test-util
——添加模拟。请参阅 示例。仅在dev-dependencies
中使用。watch
——启用client.watch
功能。请参阅相应的部分以获取详细信息。uuid
——将serde::uuid
添加到与 uuid 包一起使用。time
——将serde::time
添加到与 time 包一起使用。
注意:通过
HTTPS
URL 连接到 ClickHouse 时,必须启用native-tls
或rustls-tls
功能之一。如果两者都启用,则rustls-tls
功能将具有优先级。
数据类型
-
(U)Int(8|16|32|64|128)
映射到/从对应的(u|i)(8|16|32|64|128)
类型或围绕它们的新类型。 -
(U)Int256
不直接支持,但有 一个解决方案。 -
Float(32|64)
映射到/从对应的f(32|64)
或围绕它们的新类型。 -
Decimal(32|64|128)
与相应的i(32|64|128)
或其周围的 newtypes 进行映射。使用 fixnum 或其他有符号定点数的实现更为方便。 -
Boolean
与bool
或其周围的 newtypes 进行映射。 -
String
与任何字符串或字节类型进行映射,例如&str
、&[u8]
、String
、Vec<u8>
或SmartString
。也支持 newtypes。为了存储字节,考虑使用 serde_bytes,因为它更高效。示例
#[derive(Row, Debug, Serialize, Deserialize)] struct MyRow<'a> { str: &'a str, string: String, #[serde(with = "serde_bytes")] bytes: Vec<u8>, #[serde(with = "serde_bytes")] byte_slice: &'a [u8], }
-
FixedString(_)
目前还不支持。 -
Enum(8|16)
使用 serde_repr 支持映射。示例
use serde_repr::{Deserialize_repr, Serialize_repr}; #[derive(Row, Serialize, Deserialize)] struct MyRow { level: Level, } #[derive(Debug, Serialize_repr, Deserialize_repr)] #[repr(u8)] enum Level { Debug = 1, Info = 2, Warn = 3, Error = 4, }
-
UUID
通过使用serde::uuid
映射到/从uuid::Uuid
。需要uuid
功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::uuid")] uuid: uuid::Uuid, }
-
IPv6
映射到/从std::net::Ipv6Addr
。 -
IPv4
通过使用serde::ipv4
映射到/从std::net::Ipv4Addr
。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4")] ipv4: std::net::Ipv4Addr, }
-
Date
映射到/从u16
或其周围的 newtypes,并代表自1970-01-01
以来经过的天数。此外,通过使用serde::time::date
支持使用time::Date
,这需要time
功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: u16, #[serde(with = "clickhouse::serde::time::date")] date: Date, }
-
Date32
与i32
或其新类型映射,表示自1970-01-01
以来经过的天数。此外,使用serde::time::date32
支持time::Date
,需要time
功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { days: i32, #[serde(with = "clickhouse::serde::time::date32")] date: Date, }
-
DateTime
与u32
或其新类型映射,表示自 UNIX 纪元以来经过的秒数。此外,使用serde::time::datetime
支持time::OffsetDateTime
,需要time
功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: u32, #[serde(with = "clickhouse::serde::time::datetime")] dt: OffsetDateTime, }
-
DateTime64(_)
与i32
或其新类型映射,表示自 UNIX 纪元以来经过的时间。此外,使用serde::time::datetime64::*
支持time::OffsetDateTime
,需要time
功能。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)` #[serde(with = "clickhouse::serde::time::datetime64::secs")] dt64s: OffsetDateTime, // `DateTime64(0)` #[serde(with = "clickhouse::serde::time::datetime64::millis")] dt64ms: OffsetDateTime, // `DateTime64(3)` #[serde(with = "clickhouse::serde::time::datetime64::micros")] dt64us: OffsetDateTime, // `DateTime64(6)` #[serde(with = "clickhouse::serde::time::datetime64::nanos")] dt64ns: OffsetDateTime, // `DateTime64(9)` }
-
Typle(A, B, ...)
与(A, B, ...)
或其新类型映射。 -
Array(_)
与任何切片映射,例如Vec<_>
,&[_]
。也支持新类型。 -
Map(K, V)
的行为类似于Array((K, V))
。 -
LowCardinality(_)
支持无缝映射。 -
Nullable(_)
与Option<_>
映射。对于clickhouse::serde::*
辅助器添加::option
。示例
#[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(with = "clickhouse::serde::ipv4::option")] ipv4_opt: Option<Ipv4Addr>, }
-
Nested
通过提供具有重命名的多个数组来支持。示例
// CREATE TABLE test(items Nested(name String, count UInt32)) #[derive(Row, Serialize, Deserialize)] struct MyRow { #[serde(rename = "items.name")] items_name: Vec<String>, #[serde(rename = "items.count")] items_count: Vec<u32>, }
-
目前不支持
JSON
和Geo
。
Mocking
该软件包提供模拟 CH 服务器和测试 DDL、SELECT
、INSERT
和 WATCH
查询的实用工具。
可以使用 test-util
功能启用此功能。仅在实际依赖项中使用。
请参阅示例。
依赖项
~7–22MB
~349K SLoC