1 个不稳定版本
0.1.0 | 2020年8月7日 |
---|
#433 在 压缩
22 每月下载量
在 clickhouse-driver 中使用
305KB
5.5K SLoC
ClickHouse Rust 客户端
基于 tokio 的异步纯 Rust ClickHouse 客户端库
开发状态: alpha
在 Linux x86-64 (ubuntu 20.04 LTS),Windows 10 上测试过。
原因
- 为了创建一个用于 ClickHouse 的轻量级且健壮的驱动器,ClickHouse 是一个快速的开源列式数据库
- 为了学习 Rust 并发和零成本抽象
支持的功能
- 基于 tokio 的异步引擎
- 原生 ClickHouse 协议
- LZ4 压缩
- 持久连接池
- 简单的行到对象映射器
- 日期 | DateTime | DateTime64- 读写
- (U)Int(8|16|32|64) - 读写
- Float32 | Float64 - 读写
- UUID - 读写
- String | FixedString- 读写
- Ipv4 | Ipv6 - 读写
- Nullable(*) - 读写
- Decimal - 读写
- Enum8, Enum16 - 读写
用例
- 使用 ClickHouse 服务器支持的 SQL 语法执行查询
- 执行任意的 DDL 命令
- 查询服务器状态
- 将大量数据流插入 ClickHouse 服务器
- 使用轮询法进行负载均衡
快速入门
需要 rust 1.42。
该包尚未在 crates.io 上发布。从 主页 git 下载源代码
git module update --init --recursive
构建需要 rust 1.41 稳定版或夜间版,tokio-0.2.x。
- 将以下行添加到
dependencies
部分的Cargo.toml
clickhouse-driver = { version="0.1.0-alpha.3", path="../path_to_package/clickhouse-driver"}
clickhouse-driver-lz4 = { version="0.1.0", path="../path_to_package/lz4a"}
clickhouse-driver-cthrs = { version="0.1.0", path="../path_to_package/cityhash-rs"}
- 在 main.rs 中添加使用说明
extern crate clickhouse_driver;
use clickhouse_driver::prelude::*;
要连接到服务器,提供连接 URL
tcp://username:password@localhost/database?paramname=paramvalue&...
例如
tcp://user:default@localhost/log?ping_timout=200ms&execute_timeout=5s&query_timeout=20s&pool_max=4&compression=lz4
支持的 URL 参数
-
compression
- 接受 'lz4' 或 'none'。lz4 - 快速且高效的压缩方法。如果用于大数据块,可以显著减少传输数据的大小和时间。对于小数据,最好选择无压缩; -
connection_timeout
- 建立连接的超时时间。默认为 500ms; -
execute_timeout
- 等待 execute 方法调用结果的超时时间。如果执行用于更改大表,可能需要很长时间才能完成。在这种情况下,设置适当的参数。在其他情况下,保留默认值(180 秒); -
query_timout
- 在查询调用中等待从服务器接收下一块数据的超时时间。注意。大量数据查询可能需要很长时间。此超时要求在整个超时期间只能接收到一个数据块。默认值为180秒; -
insert_timeout
- 等待insert
调用结果超时。如果服务器在insert_timeout结束前未收到消息,则insert方法调用返回错误。由于插入数据处理是异步的,它不包括服务器块处理时间。默认值为180秒; -
ping_timout
- 在收到ping响应之前等待的时间。如果服务器在ping_timeout结束前未返回pong响应,则认为该主机不可用;
does not return pong response until the end of ping_timeout; -
retry_timeout
- 在发送下一个ping之前等待的秒数,如果服务器未返回; -
ping_before_query
- 1(默认)或0。如果设置此选项,则需要驱动程序在从连接池返回连接后检查Clickhouse服务器的责任; -
pool_min
- 最小连接池大小。池中可以保留的空闲连接数;默认值为2; -
pool_max
- 池可以创建的最大连接数。如果在池达到最大值且没有空闲连接时需要新的连接,则此任务将放入等待队列。默认值为10; -
readonly
- 0(默认)|1|2。0 - 允许所有命令。2 - select查询和更改设置,1 - 只允许select查询; -
keepalive
- keepalive TCP选项; -
host
- 替代主机(s);
所有超时参数接受整数 - 秒数。要指定以毫秒为单位的超时,请在末尾添加ms
。示例
200ms
(200毫秒)20
(20秒)10s
(10秒)
示例
struct Blob {
id: u64,
url: String,
date: ServerDate,
client: Uuid,
ip: Ipv4Addr,
value: Decimal32,
}
impl Deserialize for Blob {
fn deserialize(row: Row) -> errors::Result<Self> {
let err = || errors::ConversionError::UnsupportedConversion;
let id: u64 = row.value(0)?.ok_or_else(err)?;
let url: &str = row.value(1)?.ok_or_else(err)?;
let date: ServerDate = row.value(2)?.ok_or_else(err)?;
let client: Uuid = row.value(3)?.ok_or_else(err)?;
let ip = row.value(4)?.ok_or_else(err)?;
let value: Decimal32 = row.value(5)?.ok_or_else(err)?;
Ok(Blob {
id,
date,
client,
value,
url: url.to_string(),
ip,
})
}
}
#[tokio::main]
async fn main() -> Result<(), io::Error> {
// CREATE TABLE IF NOT EXISTS blob (
// id UInt64,
// url String,
// date DateTime,
// client UUID,
// ip IPv4
// ) ENGINE=MergeTree PARTITION BY id ORDER BY date
let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
let pool = Pool::create(database_url.as_str())?;
{
let mut conn = pool.connection().await?;
conn.ping().await?;
let mut result = conn
.query("SELECT id, url, date, client, ip FROM blob WHERE id=150 ORDER BY date LIMIT 30000")
.await?;
while let Some(block) = result.next().await? {
for blob in block.iter::<Blob>() {
...
}
}
}
Ok(())
}
- 有关更多示例,请参阅clickhouse-driver/examples/目录
已知问题和限制
- 不支持多维数组;
- 数组数据类型只读;
- LowCardinality - 只读,基于String类型;
- Insert方法只支持有限的数据类型
insert
要求插入的数据与表列类型完全匹配- Int8(16|32|64) - i8(16|32|64);
- UInt8(16|32|64) - u8(16|32|64);
- Float32 - f32;
- Float64 - f64;
- Date - chrono::Date;
- DateTime - chrono::DateTime;
- UUID - Uuid;
- IPv4 - AddrIpv4;
- IPv6 - AddrIpv6;
- String - &str,String,或 &[u8];
- Enum8|16 - &str或String。此外,枚举索引的i16值也可以检索。
路线图
Array
列数据类型 - 读写;Tuple
- 没有计划支持;AggregateFunction
- 没有计划支持;LowCardinality
- 添加写支持,扩展到Date
,DateTime
类型;Serde
- 除了临时接口外,还提供行序列化/反序列化接口;TLS;
- C-API ?;
async_std
运行时;
依赖关系
~225KB;