3 个不稳定版本

0.2.0 2024年7月16日
0.1.1 2021年7月7日
0.1.0 2021年5月31日

#403HTTP客户端

Download history 6466/week @ 2024-05-02 6319/week @ 2024-05-09 6625/week @ 2024-05-16 5318/week @ 2024-05-23 5704/week @ 2024-05-30 5086/week @ 2024-06-06 6577/week @ 2024-06-13 6609/week @ 2024-06-20 6561/week @ 2024-06-27 5850/week @ 2024-07-04 6807/week @ 2024-07-11 6349/week @ 2024-07-18 5824/week @ 2024-07-25 6111/week @ 2024-08-01 6755/week @ 2024-08-08 6445/week @ 2024-08-15

26,303 每月下载量
用于 14 个crate (3直接)

MIT/Apache

4KB

clickhouse-rs

ClickHouse的强类型客户端。

Crates.io Documentation License Build Status

  • 使用serde进行行的编码/解码。
  • 支持serde属性:skip_serializingskip_deserializingrename
  • 通过HTTP传输使用RowBinary编码。
    • 计划切换到TCP上的Native
  • 支持TLS(见下面的native-tlsrustls-tls功能)。
  • 支持压缩和解压缩(LZ4和LZ4HC)。
  • 提供选择API。
  • 提供插入API。
  • 提供无限事务性插入API(见下文)。
  • 提供监控实时视图的API。
  • 提供单元测试的mocks。

注意:ch2rs用于从ClickHouse生成行类型很有用。

用法

要使用crate,将以下内容添加到您的Cargo.toml

[dependencies]
clickhouse = "0.12.1"

[dev-dependencies]
clickhouse = { version = "0.12.1", features = ["test-util"] }

关于v22.6之前的ClickHouse的说明

CH服务器版本低于v22.6(2022-06-16)在某些罕见情况下处理RowBinary 不正确。使用0.11并启用wa-37420功能来解决问题。不要用于新版本。

创建客户端

use clickhouse::Client;

let client = Client::default()
    .with_url("https://127.0.0.1:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");
  • 重复使用创建的客户端或克隆它们以重复使用连接池。

选择行

use serde::Deserialize;
use clickhouse::Row;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let mut cursor = client
    .query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • 占位符 ?字段 被替换为 no, nameRow的字段)。
  • 占位符 ? 被替换为以下 bind() 调用的值。
  • 可以使用 fetch_one::<Row>()fetch_all::<Row>() 来分别获取第一行或所有行。
  • 可以使用 sql::Identifier 来绑定表名。

注意,游标即使在返回了一些行之后也可能返回错误。为了避免这种情况,请使用 client.with_option("wait_end_of_query", "1") 来在服务器端启用缓冲。更多详情请参见 这里。也可以使用 buffer_size 选项。

插入一批数据

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • 如果没有调用 end(),则 INSERT 将被终止。
  • 行正在逐渐发送,以分散网络负载。
  • 只有当所有行都位于同一个分区并且行数小于 max_insert_block_size 时,ClickHouse 才会原子性地插入批次。

无限插入

需要 inserter 功能。

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}
  • Inserter 在达到阈值(max_bytesmax_rowsperiod)时结束活动插入 commit()
  • 通过使用 with_period_bias 来调整结束活动 INSERT 的间隔,可以避免并行插入器造成的负载峰值。
  • Inserter::time_left() 可以用来检测当前周期何时结束。如果您的流很少发出项目,请再次调用 Inserter::commit() 以检查限制。
  • 使用 quanta crate 实现的时间阈值可以加速 inserter。如果启用了 test-util,则不会使用(因此,自定义测试中可以使用 tokio::time::advance() 来管理时间)。
  • 在同一个 INSERT 语句中插入的所有行都位于 commit() 调用之间。
  • 如果您想终止插入,别忘了刷新。
inserter.end().await?;

执行 DDL 操作

client.query("DROP TABLE IF EXISTS some").execute().await?;

实时视图

需要 watch 功能。

let mut cursor = client
    .watch("SELECT max(no), argMax(name, no) FROM some")
    .fetch::<Row<'_>>()?;

let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);

// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
  • 请谨慎使用 这里
  • 此代码使用或创建一个名为 lv_{sha1(query)} 的临时实时视图,以便并行监视器可以重用相同的实时视图。
  • 您可以使用名称而不是查询来指定。
  • 由于 该问题,此 API 在底层使用 JSONEachRowWithProgress
  • 只能使用结构体行。避免使用未指定名称的 fetch::<u64>() 等表达式。

请参阅示例

功能标志

  • lz4(默认启用)— 启用 Compression::Lz4Compression::Lz4Hc(_) 变体。如果启用,默认情况下,除了 WATCH 外的所有查询都使用 Compression::Lz4
  • native-tls — 通过 hyper-tls 支持带有 HTTPS 架构的 URL,该架构链接到 OpenSSL。
  • rustls-tls — 通过 hyper-rustls 支持带有 HTTPS 架构的 URL,该架构不链接到 OpenSSL。
  • inserter — 启用 client.inserter()
  • test-util — 添加模拟。请参阅示例。仅在 dev-dependencies 中使用。
  • watch — 启用 client.watch 功能。请参阅相应的部分以获取详细信息。
  • uuid — 将 serde::uuid 添加到与 uuid 包一起使用。
  • time — 将 serde::time 添加到与 time 包一起使用。

注意:当通过 HTTPS URL 连接到 ClickHouse 时,您必须启用 native-tlsrustls-tls 功能。如果两者都启用,则 rustls-tls 功能将优先。

数据类型

  • (U)Int(8|16|32|64|128) 映射到/从相应的 (u|i)(8|16|32|64|128) 类型或它们周围的新类型。

  • (U)Int256 不直接支持,但有一个解决方案

  • Float(32|64) 映射到/从相应的 f(32|64) 或它们周围的新类型。

  • Decimal(32|64|128) 可以映射到/从相应的 i(32|64|128) 或其周围的 newtypes。使用 fixnum 或其他有符号定点数的实现更为方便。

  • Boolean 可以映射到/从 bool 或其周围的 newtypes。

  • String 可以映射到/从任何字符串或字节类型,例如 &str&[u8]StringVec<u8>SmartString。也支持 newtypes。为了存储字节,请考虑使用 serde_bytes,因为它更高效。

    示例
    #[derive(Row, Debug, Serialize, Deserialize)]
    struct MyRow<'a> {
        str: &'a str,
        string: String,
        #[serde(with = "serde_bytes")]
        bytes: Vec<u8>,
        #[serde(with = "serde_bytes")]
        byte_slice: &'a [u8],
    }
    
  • FixedString(_) 目前还不支持。

  • Enum(8|16) 使用 serde_repr 支持。

    示例
    use serde_repr::{Deserialize_repr, Serialize_repr};
    
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        level: Level,
    }
    
    #[derive(Debug, Serialize_repr, Deserialize_repr)]
    #[repr(u8)]
    enum Level {
        Debug = 1,
        Info = 2,
        Warn = 3,
        Error = 4,
    }
    
  • UUID 通过使用 serde::uuid 映射到/从 uuid::Uuid。需要 uuid 功能。

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::uuid")]
        uuid: uuid::Uuid,
    }
    
  • IPv6 通过使用 serde::ipv4 映射到/从 std::net::Ipv6Addr

  • IPv4 通过使用 serde::ipv4 映射到/从 std::net::Ipv4Addr

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4")]
        ipv4: std::net::Ipv4Addr,
    }
    
  • Date 可以映射到/从 u16 或其周围的 newtypes,表示自 1970-01-01 以来经过的天数。此外,通过使用 serde::time::date 支持使用 time::Date,这需要 time 功能。

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: u16,
        #[serde(with = "clickhouse::serde::time::date")]
        date: Date,
    }
    
  • Date32 映射到/从 i32 或其新的类型,代表自 1970-01-01 以来经过的天数。此外,使用 serde::time::date32 支持使用 time::Date,需要 time 功能。

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: i32,
        #[serde(with = "clickhouse::serde::time::date32")]
        date: Date,
    }
    
  • DateTime 映射到/从 u32 或其新的类型,代表自 UNIX 纪元以来经过的秒数。此外,使用 serde::time::datetime 支持使用 time::OffsetDateTime,需要 time 功能。

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: u32,
        #[serde(with = "clickhouse::serde::time::datetime")]
        dt: OffsetDateTime,
    }
    
  • DateTime64(_) 映射到/从 i32 或其新的类型,代表自 UNIX 纪元以来经过的时间。此外,使用 serde::time::datetime64::* 支持使用 time::OffsetDateTime,需要 time 功能。

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
        #[serde(with = "clickhouse::serde::time::datetime64::secs")]
        dt64s: OffsetDateTime,  // `DateTime64(0)`
        #[serde(with = "clickhouse::serde::time::datetime64::millis")]
        dt64ms: OffsetDateTime, // `DateTime64(3)`
        #[serde(with = "clickhouse::serde::time::datetime64::micros")]
        dt64us: OffsetDateTime, // `DateTime64(6)`
        #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
        dt64ns: OffsetDateTime, // `DateTime64(9)`
    }
    
  • Typle(A, B, ...) 映射到/从 (A, B, ...) 或其新的类型。

  • Array(_) 映射到/从任何切片,例如 Vec<_>&[_]。也支持新的类型。

  • Map(K, V) 的行为类似于 Array((K, V))

  • LowCardinality(_) 可以无缝支持。

  • Nullable(_) 映射到/从 Option<_>。对于 clickhouse::serde::* 辅助函数添加 ::option

    示例
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4::option")]
        ipv4_opt: Option<Ipv4Addr>,
    }
    
  • Nested 通过提供多个具有重命名的数组来支持。

    示例
    // CREATE TABLE test(items Nested(name String, count UInt32))
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(rename = "items.name")]
        items_name: Vec<String>,
        #[serde(rename = "items.count")]
        items_count: Vec<u32>,
    }
    
  • 目前不支持 JSONGeo

Mocking

该软件包提供用于模拟 CH 服务器和测试 DDL、SELECTINSERTWATCH 查询的实用工具。

可以通过 test-util 功能启用此功能。仅在使用开发依赖项时使用它。

请参阅示例

依赖关系

~0.4–0.9MB
~21K SLoC