13 releases (breaking changes)
0.12.0 | Jul 12, 2024 |
---|---|
0.9.0 | May 7, 2024 |
0.7.0 | Feb 27, 2024 |
0.6.0 | Dec 28, 2023 |
0.3.0 | Jul 21, 2023 |
google-cloud-bigquery
Google Cloud Platform BigQuery client library.
Installation
[dependencies]
google-cloud-bigquery = "version"
Quickstart
CreateClient
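`ClientConfig::new_with_auth()` tries to read credentials from the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, from `GOOGLE_APPLICATION_CREDENTIALS_JSON`, or from the metadata server.
This is also described in google-cloud-auth.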
use google_cloud_bigquery::client::{ClientConfig, Client};

async fn run() {
    // Resolve credentials from the environment, then build the client.
    let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();
}
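To make the lookup described above concrete, here is a minimal standard-library sketch of a resolution order. It is not the crate's implementation: `CredentialSource`, `resolve_source`, and `resolve_from_env` are hypothetical names, the precedence simply follows the order in which the text lists the sources, and `GOOGLE_APPLICATION_CREDENTIALS` is assumed to hold a file path while `GOOGLE_APPLICATION_CREDENTIALS_JSON` is assumed to hold the JSON itself.

```rust
use std::path::PathBuf;

/// Where credentials would be looked up (illustrative type, not part of the crate).
#[derive(Debug, PartialEq)]
pub enum CredentialSource {
    /// Path taken from GOOGLE_APPLICATION_CREDENTIALS.
    File(PathBuf),
    /// Inline JSON taken from GOOGLE_APPLICATION_CREDENTIALS_JSON.
    Json(String),
    /// Neither variable is set: fall back to the metadata server.
    MetadataServer,
}

/// Resolve the source through an injectable lookup, so the logic can be
/// exercised without touching the real process environment.
/// ASSUMPTION: the precedence mirrors the order used in the text.
pub fn resolve_source<F>(lookup: F) -> CredentialSource
where
    F: Fn(&str) -> Option<String>,
{
    if let Some(path) = lookup("GOOGLE_APPLICATION_CREDENTIALS") {
        return CredentialSource::File(PathBuf::from(path));
    }
    if let Some(json) = lookup("GOOGLE_APPLICATION_CREDENTIALS_JSON") {
        return CredentialSource::Json(json);
    }
    CredentialSource::MetadataServer
}

/// Convenience wrapper that reads the real process environment.
pub fn resolve_from_env() -> CredentialSource {
    resolve_source(|key| std::env::var(key).ok())
}
```

Calling `resolve_from_env()` before building the client can help when debugging which source is likely to be picked up on a given machine; the real precedence is defined by google-cloud-auth and may differ from this sketch.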
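When you cannot use gcloud authentication but obtain credentials some other way (for example, from a different environment variable), you can parse your own version of the 'credentials-file' and use it like this.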
use google_cloud_auth::credentials::CredentialsFile;
// or google_cloud_bigquery::client::google_cloud_auth::credentials::CredentialsFile
use google_cloud_bigquery::client::{ClientConfig, Client};

async fn run(cred: CredentialsFile) {
    // Build the client from credentials you parsed yourself.
    let (config, project_id) = ClientConfig::new_with_credentials(cred).await.unwrap();
    let client = Client::new(config).await.unwrap();
}
Read Data
Query
use google_cloud_bigquery::http::job::query::QueryRequest;
use google_cloud_bigquery::query::row::Row;
use google_cloud_bigquery::client::Client;

async fn run(client: &Client, project_id: &str) {
    let request = QueryRequest {
        query: "SELECT * FROM dataset.table".to_string(),
        ..Default::default()
    };
    let mut iter = client.query::<Row>(project_id, request).await.unwrap();
    while let Some(row) = iter.next().await.unwrap() {
        // Columns are addressed by zero-based index.
        let col1 = row.column::<String>(0);
        let col2 = row.column::<Option<String>>(1);
    }
}
Read Table
use google_cloud_bigquery::storage::row::Row;
use google_cloud_bigquery::client::Client;
use google_cloud_bigquery::http::table::TableReference;

async fn run(client: &Client, project_id: &str) {
    let table = TableReference {
        project_id: project_id.to_string(),
        dataset_id: "dataset".to_string(),
        table_id: "table".to_string(),
    };
    let mut iter = client.read_table::<Row>(&table, None).await.unwrap();
    while let Some(row) = iter.next().await.unwrap() {
        // Columns are addressed by zero-based index.
        let col1 = row.column::<String>(0);
        let col2 = row.column::<Option<String>>(1);
    }
}
Values
The types that `row.column::<T>()` can decode by default are:
- String (for STRING)
- bool (for BOOL)
- i64 (for INT64)
- f64 (for FLOAT)
- bigdecimal::BigDecimal (for NUMERIC, BIGNUMERIC)
- Vec<u8> (for BINARY)
- time::OffsetDateTime (for TIMESTAMP)
- time::Date (for DATE)
- time::Time (for TIME)
- T: StructDecodable (for STRUCT)
- Option<T> (for all NULLABLE)
- Vec<T> (for ARRAY)
Insert Data
Table data API
use google_cloud_bigquery::http::tabledata::insert_all::{InsertAllRequest, Row};
use google_cloud_bigquery::client::Client;

#[derive(serde::Serialize)]
pub struct TestData {
    pub col1: String,
    #[serde(with = "time::serde::rfc3339::option")]
    pub col_timestamp: Option<time::OffsetDateTime>,
    // Must serialize as base64 string to insert binary data
    // #[serde(default, with = "Base64Standard")]
    pub col_binary: Vec<u8>
}

async fn run(client: &Client, project_id: &str, data: TestData) {
    let data1 = Row {
        insert_id: None,
        json: data,
    };
    let request = InsertAllRequest {
        rows: vec![data1],
        ..Default::default()
    };
    let result = client.tabledata().insert(project_id, "dataset", "table", &request).await.unwrap();
    // Per-row failures are reported here rather than as an Err.
    let error = result.insert_errors;
}
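The comment on `col_binary` says binary data must be serialized as a base64 string. As an illustration of what that string looks like, here is a small standard-library sketch of standard-alphabet, padded base64 encoding. `base64_standard` is a hypothetical helper written for this example; in a real project the serde adapter hinted at in the comment (`Base64Standard`) or an established base64 crate would be the more likely choice.

```rust
/// Standard base64 alphabet (RFC 4648, with '+' and '/').
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encode bytes as padded standard base64 (hypothetical helper for illustration).
pub fn base64_standard(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        // The first two symbols are always present.
        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);

        // The last two symbols become '=' padding when the chunk is short.
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 63) as usize] as char);
        } else {
            out.push('=');
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 63) as usize] as char);
        } else {
            out.push('=');
        }
    }
    out
}
```

For example, `base64_standard(b"Man")` yields `TWFu`, which is the kind of value a binary column would carry in the JSON payload if the field were declared as a `String`.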
Run loading job
Example: loading CSV data from GCS
use google_cloud_bigquery::client::Client;
use google_cloud_bigquery::http::job::{Job, JobConfiguration, JobConfigurationLoad, JobReference, JobState, JobType, WriteDisposition};
use google_cloud_bigquery::http::table::{SourceFormat, TableReference};

async fn run(client: &Client, project_id: &str, data_path: &str) {
    let job = Job {
        job_reference: JobReference {
            project_id: project_id.to_string(),
            job_id: "job_id".to_string(),
            location: Some("asia-northeast1".to_string())
        },
        // CSV configuration
        configuration: JobConfiguration {
            job: JobType::Load(JobConfigurationLoad {
                source_uris: vec![format!("gs://{}.csv", data_path)],
                source_format: Some(SourceFormat::Csv),
                field_delimiter: Some("|".to_string()),
                encoding: Some("UTF-8".to_string()),
                skip_leading_rows: Some(0),
                autodetect: Some(true),
                // Replace any existing table contents.
                write_disposition: Some(WriteDisposition::WriteTruncate),
                destination_table: TableReference {
                    project_id: project_id.to_string(),
                    dataset_id: "dataset".to_string(),
                    table_id: "table".to_string(),
                },
                ..Default::default()
            }),
            ..Default::default()
        },
        ..Default::default()
    };

    // Run job
    let created = client.job().create(&job).await.unwrap();

    // Check status
    assert!(created.status.errors.is_none());
    assert!(created.status.error_result.is_none());
    assert!(created.status.state == JobState::Running || created.status.state == JobState::Done);
}
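The load job above builds its source URI with `format!("gs://{}.csv", data_path)`, which implies that `data_path` is a bucket and object path without the `gs://` scheme and without the `.csv` extension. The sketch below makes that expectation explicit; `gcs_csv_uri` is a hypothetical helper, and the validation rules are assumptions drawn only from that format string.

```rust
/// Build the `gs://` URI used in `source_uris` (hypothetical helper).
/// ASSUMPTION: `data_path` looks like "bucket/path/to/object", with no
/// scheme prefix and no ".csv" suffix, as the format string implies.
pub fn gcs_csv_uri(data_path: &str) -> Result<String, String> {
    if data_path.is_empty() {
        return Err("data_path must not be empty".to_string());
    }
    if data_path.starts_with("gs://") {
        return Err(format!("drop the gs:// prefix from {:?}", data_path));
    }
    if data_path.ends_with(".csv") {
        return Err(format!("drop the .csv suffix from {:?}", data_path));
    }
    Ok(format!("gs://{}.csv", data_path))
}
```

Checking the path before submitting the job can catch a doubled prefix or extension such as `gs://gs://bucket/data.csv.csv` early.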
Features
HTTP API
Streaming