1 个不稳定版本

0.1.0 2020年8月7日

#2441数据库接口

Download history 140/week @ 2024-03-11 106/week @ 2024-03-18 198/week @ 2024-03-25 131/week @ 2024-04-01 77/week @ 2024-04-08 111/week @ 2024-04-15 156/week @ 2024-04-22 81/week @ 2024-04-29 40/week @ 2024-05-06 123/week @ 2024-05-13 23/week @ 2024-05-20 36/week @ 2024-05-27 17/week @ 2024-06-03 111/week @ 2024-06-10 119/week @ 2024-06-17 62/week @ 2024-06-24

309 每月下载量
2 crate 中使用

MIT 许可证

23KB
490

ClickHouse Rust 客户端

Build Status Rust

基于 tokio 的异步纯 Rust ClickHouse 客户端库

开发状态: alpha

在 Linux x86-64 (ubuntu 20.04 LTS)、Windows 10 上测试

为什么

  • 为了创建一个小的、健壮的 ClickHouse 驱动程序,ClickHouse 是一个快速的、开源的列式数据库
  • 为了学习 Rust 并发和零成本抽象

支持的功能

  • 基于 tokio 的异步引擎
  • 本地 ClickHouse 协议
  • LZ4 压缩
  • 持久连接池
  • 简单的行到对象映射器
  • 日期 | DateTime | DateTime64- 读写
  • (U)Int(8|16|32|64) - 读写
  • Float32 | Float64 - 读写
  • UUID - 读写
  • String | FixedString- 读写
  • Ipv4 | Ipv6 - 读写
  • Nullable(*) - 读写
  • Decimal - 读写
  • Enum8, Enum16 - 读写

用例

  • 使用 ClickHouse 服务器支持的 SQL 语法执行查询
  • 执行任意的 DDL 命令
  • 查询服务器状态
  • 向 ClickHouse 服务器插入大(可能持续)数据流
  • 使用轮询方法进行负载均衡

快速入门

需要 rust 1.42。

该包尚未在 crates.io 上发布。从 主页 Git 下载源代码

git module update --init --recursive

构建需要 rust 1.41 稳定版或 nightly,tokio-0.2.x。

  • 将以下行添加到 dependencies 部分的 Cargo.toml
 clickhouse-driver = { version="0.1.0-alpha.3", path="../path_to_package/clickhouse-driver"}
 clickhouse-driver-lz4 = { version="0.1.0", path="../path_to_package/lz4a"}
 clickhouse-driver-cthrs = { version="0.1.0", path="../path_to_package/cityhash-rs"}

  • 在 main.rs 中添加用法
  extern crate clickhouse_driver;   
  use clickhouse_driver::prelude::*;

要连接到服务器,请提供连接 URL

tcp://username:password@localhost/database?paramname=paramvalue&...

例如

tcp://user:default@localhost/log?ping_timout=200ms&execute_timeout=5s&query_timeout=20s&pool_max=4&compression=lz4

支持的 URL 参数

  • compression - 接受 'lz4' 或 'none'。lz4 - 快速且高效的压缩方法。如果用于大数据块,可以显着减少传输数据的大小和时间。对于小数据,最好选择不压缩;

  • connection_timeout - 建立连接的超时时间。默认为 500ms;

  • execute_timeout - 等待 execute 方法调用结果的超时时间。如果执行用于更改大型表,则可能需要很长时间才能完成。在这种情况下,设置适当的参数。在其他情况下,请保留默认值(180 秒);

  • query_timout - 在查询调用中等待服务器返回下一块数据的超时时间。注意:大数据查询可能需要较长时间。此超时要求在超时结束前只接收一个数据块。默认值为180秒;

  • insert_timeout - 等待 insert 调用结果超时。如果服务器在 insert_timeout 结束前未收到消息,则插入方法调用返回错误。由于插入数据处理是异步的,它不包括服务器块处理时间。默认值为180秒;

  • ping_timout - 在发送 ping 响应之前等待的时间。如果服务器在 ping_timeout 结束前未返回 pong 响应,则认为该主机不可用;
    在 ping_timeout 结束时;

  • retry_timeout - 如果服务器未返回,发送下一个 ping 之前等待的秒数;

  • ping_before_query - 1(默认)或0。如果设置此选项,则要求驱动程序在返回连接到池后检查 Clickhouse 服务器的可用性;

  • pool_min - 连接池的最小大小。池中可以保持的空闲连接数;默认值为2;

  • pool_max - 池可以建立的连接的最大数量。如果在池达到最大值且没有空闲连接时需要新的连接,则此任务将被放入等待队列。默认值为10;

  • readonly - 0(默认)|1|2。0 - 允许所有命令。2 - select 查询和更改设置,1 - 只允许 select 查询;

  • keepalive - keepalive TCP 选项;

  • host - 替代主机;

所有超时参数都接受整数 - 秒数。要指定毫秒超时,请将 ms 添加到末尾。示例

  • 200ms(200 毫秒)
  • 20(20 秒)
  • 10s(10 秒)

示例


struct Blob {
    id: u64,
    url: String,
    date: ServerDate,
    client: Uuid,
    ip: Ipv4Addr,
    value: Decimal32,
}

impl Deserialize for Blob {
    fn deserialize(row: Row) -> errors::Result<Self> {
        let err = || errors::ConversionError::UnsupportedConversion;

        let id: u64 = row.value(0)?.ok_or_else(err)?;
        let url: &str = row.value(1)?.ok_or_else(err)?;
        let date: ServerDate = row.value(2)?.ok_or_else(err)?;
        let client: Uuid = row.value(3)?.ok_or_else(err)?;
        let ip = row.value(4)?.ok_or_else(err)?;
        let value: Decimal32 = row.value(5)?.ok_or_else(err)?;

        Ok(Blob {
            id,
            date,
            client,
            value,
            url: url.to_string(),
            ip,
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), io::Error> {

    //    CREATE TABLE IF NOT EXISTS blob (
    //        id          UInt64,
    //        url         String,
    //        date        DateTime,
    //        client      UUID,
    //        ip          IPv4
    //    ) ENGINE=MergeTree PARTITION BY id ORDER BY date

    let database_url =
        env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://127.0.0.1:9000?compression=lz4".into());

    let pool = Pool::create(database_url.as_str())?;
    {
        let mut conn = pool.connection().await?;
        conn.ping().await?;

        let mut result = conn
            .query("SELECT id, url, date, client, ip FROM blob WHERE id=150  ORDER BY date LIMIT 30000")
            .await?;

        while let Some(block) = result.next().await? {
            for blob in block.iter::<Blob>() {
                ...
            }
        }
    }

    Ok(())
}
  • 更多示例请参阅 clickhouse-driver/examples/ 目录*

已知问题和限制

  • 不支持多维数组;
  • 数组数据类型只读;
  • LowCardinality - 只读且仅基于 String 类型;
  • 插入方法仅支持有限的数据类型 insert 要求插入的数据与表列类型完全匹配
    • Int8(16|32|64) - i8(16|32|64)
    • UInt8(16|32|64) - u8(16|32|64)
    • Float32 - f32
    • Float64 - f64
    • Date - chrono::Date
    • DateTime - chrono::DateTime
    • UUID - Uuid
    • IPv4 - AddrIpv4
    • IPv6 - AddrIpv6
    • String - &str,String, 或 &[u8]
    • Enum8|16 - &str 或 String。此外,枚举索引的 i16 值也可以检索。

路线图

  • Array 列数据类型 - 可读/写
  • Tuple - 没有计划支持
  • AggregateFunction - 没有计划支持
  • LowCardinality - 添加写支持,扩展到 DateDateTime 类型
  • Serde - 除了临时的接口外,还提供行序列化/反序列化接口
  • TLS
  • C-API ?
  • async_std 运行时

依赖关系

~225KB