#google-cloud #big-query #gcp #async-client

gcp-bigquery-client

适用于GCP BigQuery的舒适异步客户端库

38个版本 (21个破坏性版本)

0.23.0 2024年8月10日
0.22.0 2024年7月8日
0.20.0 2024年4月28日
0.18.1 2024年3月30日
0.9.0 2021年3月28日

#32 in 数据库接口

Download history 15568/week @ 2024-05-03 20728/week @ 2024-05-10 17676/week @ 2024-05-17 14631/week @ 2024-05-24 13917/week @ 2024-05-31 17040/week @ 2024-06-07 18212/week @ 2024-06-14 18932/week @ 2024-06-21 13611/week @ 2024-06-28 14856/week @ 2024-07-05 18078/week @ 2024-07-12 15261/week @ 2024-07-19 16324/week @ 2024-07-26 16535/week @ 2024-08-02 18617/week @ 2024-08-09 19034/week @ 2024-08-16

73,194 每月下载量
用于 18 个Crates (3 个直接)

MIT/Apache

525KB
7.5K SLoC

GCP BigQuery客户端

github crates.io docs.rs

适用于GCP BigQuery的舒适Rust异步客户端库。

  • 支持所有BigQuery API端点(尚未全部通过单元测试覆盖)
  • 支持服务帐户密钥认证、工作负载身份、安装流和其他yup-oauth2机制
  • 通过构建器模式创建表和行
  • 将复杂的Rust结构体持久化到结构化的BigQuery表中
  • 异步API
  • 支持JSON列类型
  • 支持get方法的serde::de::DeserializeOwned
  • 支持BigQuery仿真器
  • 对BigQuery存储写入API的部分支持

特性

  • rust-tls (默认): 基于 RUSTLS
  • native-tls: 基于 OpenSSL

欢迎贡献。
请在此GitHub [讨论区](https://github.com/lquerel/gcp-bigquery-client/discussions) 上发布您的建议和想法。

示例

此示例执行以下操作

  • 加载一组环境变量以设置 $PROJECT_ID$DATASET_ID$TABLE_ID$GOOGLE_APPLICATION_CREDENTIALS
  • 初始化BigQuery客户端
  • 在GCP项目中创建一个数据集 $PROJECT_ID
  • 在之前创建的数据集中创建一个表(表模式)
  • 通过BigQuery流式API在之前创建的表中插入一组行。插入的行基于实现Serialize特质的常规Rust结构体。
  • 在之前创建的表上执行选择查询
  • 删除之前创建的表
  • 删除之前创建的数据集
    // Init BigQuery client
    let client = gcp_bigquery_client::Client::from_service_account_key_file(gcp_sa_key).await?;

    // Delete the dataset if needed
    let result = client.dataset().delete(project_id, dataset_id, true).await;
    if let Ok(_) = result {
        println!("Removed previous dataset '{}'", dataset_id);
    }

    // Create a new dataset
    let dataset = client
        .dataset()
        .create(
            Dataset::new(project_id, dataset_id)
                .location("US")
                .friendly_name("Just a demo dataset")
                .label("owner", "me")
                .label("env", "prod"),
        )
        .await?;

    // Create a new table
    let table = dataset
        .create_table(
            &client,
            Table::from_dataset(
                &dataset,
                table_id,
                TableSchema::new(vec![
                    TableFieldSchema::timestamp("ts"),
                    TableFieldSchema::integer("int_value"),
                    TableFieldSchema::float("float_value"),
                    TableFieldSchema::bool("bool_value"),
                    TableFieldSchema::string("string_value"),
                    TableFieldSchema::record(
                        "record_value",
                        vec![
                            TableFieldSchema::integer("int_value"),
                            TableFieldSchema::string("string_value"),
                            TableFieldSchema::record(
                                "record_value",
                                vec![
                                    TableFieldSchema::integer("int_value"),
                                    TableFieldSchema::string("string_value"),
                                ],
                            ),
                        ],
                    ),
                ]),
            )
            .friendly_name("Demo table")
            .description("A nice description for this table")
            .label("owner", "me")
            .label("env", "prod")
            .expiration_time(SystemTime::now() + Duration::from_secs(3600))
            .time_partitioning(
                TimePartitioning::per_day()
                    .expiration_ms(Duration::from_secs(3600 * 24 * 7))
                    .field("ts"),
            ),
        )
        .await?;
    println!("Table created -> {:?}", table);

    // Insert data via BigQuery Streaming API
    let mut insert_request = TableDataInsertAllRequest::new();
    insert_request.add_row(
        None,
        MyRow {
            ts: OffsetDateTime::now_utc(),
            int_value: 1,
            float_value: 1.0,
            bool_value: false,
            string_value: "first".into(),
            record_value: FirstRecordLevel {
                int_value: 10,
                string_value: "sub_level_1.1".into(),
                record_value: SecondRecordLevel {
                    int_value: 20,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: OffsetDateTime::now_utc(),
            int_value: 2,
            float_value: 2.0,
            bool_value: true,
            string_value: "second".into(),
            record_value: FirstRecordLevel {
                int_value: 11,
                string_value: "sub_level_1.2".into(),
                record_value: SecondRecordLevel {
                    int_value: 21,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: OffsetDateTime::now_utc(),
            int_value: 3,
            float_value: 3.0,
            bool_value: false,
            string_value: "third".into(),
            record_value: FirstRecordLevel {
                int_value: 12,
                string_value: "sub_level_1.3".into(),
                record_value: SecondRecordLevel {
                    int_value: 22,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: OffsetDateTime::now_utc(),
            int_value: 4,
            float_value: 4.0,
            bool_value: true,
            string_value: "fourth".into(),
            record_value: FirstRecordLevel {
                int_value: 13,
                string_value: "sub_level_1.4".into(),
                record_value: SecondRecordLevel {
                    int_value: 23,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;

    client
        .tabledata()
        .insert_all(project_id, dataset_id, table_id, insert_request)
        .await?;

    // Query
    let mut rs = client
        .job()
        .query(
            project_id,
            QueryRequest::new(format!(
                "SELECT COUNT(*) AS c FROM `{}.{}.{}`",
                project_id, dataset_id, table_id
            )),
        )
        .await?;
    while rs.next_row() {
        println!("Number of rows inserted: {}", rs.get_i64_by_name("c")?.unwrap());
    }

    // Delete the table previously created
    client.table().delete(project_id, dataset_id, table_id).await?;

    // Delete the dataset previously created
    client.dataset().delete(project_id, dataset_id, true).await?;

示例BigQuery加载作业可以在示例目录中找到。

状态

本库的API在版本1.0之前可能发生变化。

实现端点列表

  • 数据集 - 所有方法
  • 表 - 所有方法
  • 表数据 - 所有方法
  • 作业 - 所有方法
  • 模型 - 所有方法(未测试)
  • 项目(未测试)
  • 例行程序 - 所有方法(未测试)
  • 存储写入API - 部分支持

许可证

根据您的要求,许可协议可以是Apache许可证,版本2.0MIT许可证
除非您明确声明,否则根据Apache-2.0许可证定义的任何有意提交以包含在此库中的贡献,均应如上所述双重许可,不附加任何额外条款或条件。

依赖项

~21–44MB
~1M SLoC