google-cloud-bigquery

Google Cloud Platform BigQuery client library

13 versions (breaking changes)

0.12.0 Jul 12, 2024
0.9.0 May 7, 2024
0.7.0 Feb 27, 2024
0.6.0 Dec 28, 2023
0.3.0 Jul 21, 2023

Used in google-cloud-default

MIT license

Installation

[dependencies]
google-cloud-bigquery = "<version>"

Quickstart

CreateClient

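ClientConfig::new_with_auth() tries to read credentials from the environment variables GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_APPLICATION_CREDENTIALS_JSON, or from the metadata server.

This is also described in google-cloud-auth.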

use google_cloud_bigquery::client::{ClientConfig, Client};

async fn run() {
    let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();
}
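
The lookup described above can be pictured with a small standard-library sketch. It is not the code used by google-cloud-auth: the CredentialSource type and both function names are hypothetical, and the order in which the two variables are checked is an assumption taken from the order in which the text lists them.

```rust
/// Where credentials would come from. Hypothetical type, for illustration only.
#[derive(Debug, PartialEq)]
enum CredentialSource {
    /// Path taken from GOOGLE_APPLICATION_CREDENTIALS.
    File(String),
    /// Inline value taken from GOOGLE_APPLICATION_CREDENTIALS_JSON.
    InlineJson(String),
    /// Neither variable is set: fall back to the metadata server.
    MetadataServer,
}

/// Picks a source using any variable lookup function.
/// Assumption: the variables are checked in the order the text lists them.
fn resolve_source<F>(lookup: F) -> CredentialSource
where
    F: Fn(&str) -> Option<String>,
{
    // Empty values are treated the same as unset variables.
    if let Some(path) = lookup("GOOGLE_APPLICATION_CREDENTIALS").filter(|v| !v.is_empty()) {
        return CredentialSource::File(path);
    }
    if let Some(json) = lookup("GOOGLE_APPLICATION_CREDENTIALS_JSON").filter(|v| !v.is_empty()) {
        return CredentialSource::InlineJson(json);
    }
    CredentialSource::MetadataServer
}

/// Convenience wrapper that reads the real process environment.
fn resolve_from_env() -> CredentialSource {
    resolve_source(|name| std::env::var(name).ok())
}
```

Passing the lookup as a closure keeps the decision logic testable without touching the process environment; resolve_from_env() is the variant you would call in an application.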

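When you can't use gcloud authentication but obtain credentials some other way (for example, from a different environment variable), you can parse your own version of the 'credentials-file' and use it like this.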

use google_cloud_auth::credentials::CredentialsFile;
// or google_cloud_bigquery::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_bigquery::client::{ClientConfig, Client};

async fn run(cred: CredentialsFile) {
    let (config, project_id) = ClientConfig::new_with_credentials(cred).await.unwrap();
    let client = Client::new(config).await.unwrap();
}

Read Data

Query

use google_cloud_bigquery::http::job::query::QueryRequest;
use google_cloud_bigquery::query::row::Row;
use google_cloud_bigquery::client::Client;

async fn run(client: &Client, project_id: &str) {
    let request = QueryRequest {
        query: "SELECT * FROM dataset.table".to_string(),
        ..Default::default()
    };
    let mut iter = client.query::<Row>(project_id, request).await.unwrap();
    while let Some(row) = iter.next().await.unwrap() {
        let col1 = row.column::<String>(0);
        let col2 = row.column::<Option<String>>(1);
    }
}

Read Table

use google_cloud_bigquery::storage::row::Row;
use google_cloud_bigquery::client::Client;
use google_cloud_bigquery::http::table::TableReference;

async fn run(client: &Client, project_id: &str) {
    let table = TableReference {
        project_id: project_id.to_string(),
        dataset_id: "dataset".to_string(),
        table_id: "table".to_string(),
    };
    let mut iter = client.read_table::<Row>(&table, None).await.unwrap();
    while let Some(row) = iter.next().await.unwrap() {
        let col1 = row.column::<String>(0);
        let col2 = row.column::<Option<String>>(1);
    }
}

The types that row.column::<T>() can decode by default are:

  • String (for STRING)
  • bool (for BOOL)
  • i64 (for INT64)
  • f64 (for FLOAT)
  • bigdecimal::BigDecimal (for NUMERIC, BIGNUMERIC)
  • Vec<u8> (for BINARY)
  • time::OffsetDateTime (for TIMESTAMP)
  • time::Date (for DATE)
  • time::Time (for TIME)
  • T: StructDecodable (for STRUCT)
  • Option<T> (for all NULLABLE)
  • Vec<T> (for ARRAY)
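
To show why nullable columns are read as Option<T>, here is a toy model of type-directed decoding using only the standard library. Cell, DecodeError, FromCell and this column function are hypothetical stand-ins, not the crate's types, and the assumption that decoding a NULL into a non-Option type is an error is ours rather than something the list above states.

```rust
/// Hypothetical stand-in for one raw cell of a row; not the library's type.
#[derive(Debug, Clone, PartialEq)]
enum Cell {
    Null,
    Text(String),
    Int(i64),
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedNull,
    WrongType,
    OutOfRange,
}

/// Types that can be built from a single cell.
trait FromCell: Sized {
    fn from_cell(cell: &Cell) -> Result<Self, DecodeError>;
}

impl FromCell for String {
    fn from_cell(cell: &Cell) -> Result<Self, DecodeError> {
        match cell {
            Cell::Text(s) => Ok(s.clone()),
            Cell::Null => Err(DecodeError::UnexpectedNull),
            _ => Err(DecodeError::WrongType),
        }
    }
}

impl FromCell for i64 {
    fn from_cell(cell: &Cell) -> Result<Self, DecodeError> {
        match cell {
            Cell::Int(n) => Ok(*n),
            Cell::Null => Err(DecodeError::UnexpectedNull),
            _ => Err(DecodeError::WrongType),
        }
    }
}

/// Option<T> accepts NULL and otherwise defers to T.
impl<T: FromCell> FromCell for Option<T> {
    fn from_cell(cell: &Cell) -> Result<Self, DecodeError> {
        match cell {
            Cell::Null => Ok(None),
            other => T::from_cell(other).map(Some),
        }
    }
}

/// Decodes the cell at `index` into the requested type.
fn column<T: FromCell>(row: &[Cell], index: usize) -> Result<T, DecodeError> {
    let cell = row.get(index).ok_or(DecodeError::OutOfRange)?;
    T::from_cell(cell)
}
```

In this model, column::<Option<String>>(&row, 1) on a NULL cell yields Ok(None), while column::<String>(&row, 1) reports an error, which is the reason to pick Option<T> for any column that may hold NULL.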

Insert Data

Table data API

use google_cloud_bigquery::http::tabledata::insert_all::{InsertAllRequest, Row};
use google_cloud_bigquery::client::Client;

#[derive(serde::Serialize)]
pub struct TestData {
    pub col1: String,
    #[serde(with = "time::serde::rfc3339::option")]
    pub col_timestamp: Option<time::OffsetDateTime>,
    // Must serialize as base64 string to insert binary data
    // #[serde(default, with = "Base64Standard")]
    pub col_binary: Vec<u8>
}

async fn run(client: &Client, project_id: &str, data: TestData) {
    let data1 = Row {
        insert_id: None,
        json: data,
    };
    let request = InsertAllRequest {
        rows: vec![data1],
        ..Default::default()
    };
    let result = client.tabledata().insert(project_id, "dataset", "table", &request).await.unwrap();
    let error = result.insert_errors;
}
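
The comment on col_binary says binary data must be sent as a base64 string. As a rough illustration of what that string looks like, here is a standard-library-only encoder for padded base64 (RFC 4648). The base64_encode name is our own; in a real project you would more likely rely on a maintained crate together with a serde helper such as the one hinted at in the commented-out attribute.

```rust
/// The 64 characters of the standard base64 alphabet.
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes bytes as standard, padded base64 (RFC 4648).
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the low 24 bits of a u32.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        // The first two output characters always exist.
        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);

        // Missing input bytes become '=' padding.
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 63) as usize] as char);
        } else {
            out.push('=');
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 63) as usize] as char);
        } else {
            out.push('=');
        }
    }
    out
}
```

With a helper like this, a Vec<u8> field could be converted to a String before it is placed in the row's JSON payload.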

Run loading job

Example: loading CSV data from GCS

use google_cloud_bigquery::client::Client;
use google_cloud_bigquery::http::job::{Job, JobConfiguration, JobConfigurationLoad, JobReference, JobState, JobType, WriteDisposition};
use google_cloud_bigquery::http::table::{SourceFormat, TableReference};

async fn run(client: &Client, project_id: &str, data_path: &str) {
    let job = Job {
        job_reference: JobReference {
            project_id: project_id.to_string(),
            job_id: "job_id".to_string(),
            location: Some("asia-northeast1".to_string())
        },
        // CSV configuration
        configuration: JobConfiguration {
            job: JobType::Load(JobConfigurationLoad {
                source_uris: vec![format!("gs://{}.csv",data_path)],
                source_format: Some(SourceFormat::Csv),
                field_delimiter: Some("|".to_string()),
                encoding: Some("UTF-8".to_string()),
                skip_leading_rows: Some(0),
                autodetect: Some(true),
                write_disposition: Some(WriteDisposition::WriteTruncate),
                destination_table: TableReference {
                    project_id: project_id.to_string(),
                    dataset_id: "dataset".to_string(),
                    table_id: "table".to_string(),
                },
                ..Default::default()
            }),
            ..Default::default()
        },
        ..Default::default()
    };

    // Run job
    let created = client.job().create(&job).await.unwrap();

    // Check status
    assert!(created.status.errors.is_none());
    assert!(created.status.error_result.is_none());
    assert!(created.status.state == JobState::Running || created.status.state == JobState::Done);
}
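
The status check above looks at the job once, right after creation. A load job usually needs some time to finish, so a caller may want to poll until the state becomes Done. The sketch below shows only the polling logic, using the standard library and blocking sleeps. The State enum is a hypothetical mirror of the three BigQuery job states, and fetch_state stands for whatever call you use to refresh the job; neither is part of this crate.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical mirror of the BigQuery job states; not the library's enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Pending,
    Running,
    Done,
}

/// Calls `fetch_state` until it reports `Done` or `max_attempts` is used up.
/// Returns the number of calls made on success, or None if the job never finished.
fn wait_until_done<F>(mut fetch_state: F, max_attempts: u32, delay: Duration) -> Option<u32>
where
    F: FnMut() -> State,
{
    for attempt in 1..=max_attempts {
        if fetch_state() == State::Done {
            return Some(attempt);
        }
        // Do not sleep after the final attempt.
        if attempt < max_attempts {
            sleep(delay);
        }
    }
    None
}
```

Done only means the job has stopped; as in the assertions above, the error fields of the status still have to be inspected to know whether the load succeeded. In async code the blocking sleep would be replaced by the runtime's own timer.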

Features

HTTP API

Streaming
