#clickhouse #database-client #tokio #async-io

clickhouse-rs

异步Yandex ClickHouse客户端库

2个版本

0.2.0-alpha.72021年5月1日
0.2.0-alpha.62021年1月15日

#2013数据库接口


3 个Crates中使用 (通过 v-common-search)

MIT 许可证

410KB
12K SLoC

异步ClickHouse客户端

Build Status Crate info Documentation dependency status Coverage Status

为Rust编程语言提供的异步Yandex ClickHouse客户端库。

安装

库托管在 crates.io

[dependencies]
clickhouse-rs = "*"

支持的数据类型

  • 日期
  • 日期时间
  • Decimal(P, S)
  • Float32, Float64
  • 字符串, 固定字符串(N)
  • UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
  • Nullable(T)
  • Array(UInt/Int/Float/String/Date/DateTime)
  • IPv4/IPv6
  • UUID

DNS

schema://user:password@host[:port]/database?param1=value1&...&paramN=valueN

参数

  • compression - 是否使用压缩(默认为 none)。可能的选项

    • none
    • lz4
  • readonly - 限制读取数据、写入数据和更改设置查询的权限。 (默认为 none)。可能的选项

    • 0 - 允许所有查询。
    • 1 - 只允许读取数据查询。
    • 2 - 允许读取数据和更改设置查询。
  • connection_timeout - 连接超时(默认为 500 ms

  • query_timeout - 查询超时(默认为 180 sec)。

  • insert_timeout - 插入超时(默认为 180 sec)。

  • execute_timeout - 执行超时(默认为 180 sec)。

  • keepalive - 毫秒级的TCP keep alive超时。

  • nodelay - 是否启用 TCP_NODELAY (默认为 true)。

  • pool_min - Pool 打开连接的下限(默认为 10)。

  • pool_max - Pool 打开的连接的上限(默认为 20)。

  • ping_before_query - 在执行任何查询之前每次都ping服务器。 (默认为 true)。

  • send_retries - 向服务器发送请求的重试次数。 (默认为 3)。

  • retry_timeout - 下次重试前等待的时间。 (默认为 5)。

  • ping_timeout - ping 的超时时间(默认为 500 毫秒)。

  • alt_hosts - 以逗号分隔的单一地址主机列表,用于负载均衡。

示例

tcp://user:password@host:9000/clicks?compression=lz4&ping_timeout=42ms

可选功能

clickhouse-rs 将一些功能放在可选功能后面,以优化最常见用例的编译时间。以下功能可用。

  • tokio_io (默认启用) — 基于 Tokio 的 I/O。
  • async_std — 基于 async-std 的 I/O(不能与 tokio_io 同时使用)。
  • tls — TLS 支持(仅与 tokio_io 一起允许)。

示例

use clickhouse_rs::{Block, Pool};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let ddl = r"
        CREATE TABLE IF NOT EXISTS payment (
            customer_id  UInt32,
            amount       UInt32,
            account_name Nullable(FixedString(3))
        ) Engine=Memory";

    let block = Block::new()
        .column("customer_id",  vec![1_u32,  3,  5,  7,  9])
        .column("amount",       vec![2_u32,  4,  6,  8, 10])
        .column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);

    let pool = Pool::new(database_url);

    let mut client = pool.get_handle().await?;
    client.execute(ddl).await?;
    client.insert("payment", block).await?;
    let block = client.query("SELECT * FROM payment").fetch_all().await?;

    for row in block.rows() {
        let id: u32             = row.get("customer_id")?;
        let amount: u32         = row.get("amount")?;
        let name: Option<&str>  = row.get("account_name")?;
        println!("Found payment {}: {} {:?}", id, amount, name);
    }

    Ok(())
}

依赖项

~7–20MB
~290K SLoC