9 releases (1 stable)

1.0.0 Aug 18, 2022
0.1.7 Aug 17, 2022
0.1.2 Jul 22, 2022
0.1.0 Jun 14, 2022

#2675 in Database interfaces

MIT license

230KB
5K SLoC

Pinot Client Rust


A Rust library for querying Apache Pinot.

Installing Pinot

To install Pinot locally, follow the Pinot quick start link to install Pinot and launch the local batch quick start:

bin/quick-start-batch.sh

Alternatively, Pinot can be run in Docker containers orchestrated by this repository's docker-compose.yaml file:

make prepare-pinot

Examples

Check out the client library's GitHub repository

git clone git@github.com:yougov/pinot-client-rust.git
cd pinot-client-rust

Start the Docker containers running the Pinot database

make prepare-pinot

Build and run an example application that queries Pinot

cargo run --example pql-query
cargo run --example sql-query-deserialize-to-data-row
cargo run --example sql-query-deserialize-to-struct

Usage

Creating a Pinot connection

A Pinot client can be initialized from:

  1. A Zookeeper path.
let client = pinot_client_rust::connection::client_from_zookeeper(
    &pinot_client_rust::zookeeper::ZookeeperConfig::new(
        vec!["localhost:2181".to_string()],
        "/PinotCluster".to_string(),
    ),
    None
);
  2. A list of broker addresses.
let client = pinot_client_rust::connection::client_from_broker_list(
    vec!["localhost:8099".to_string()], None);

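Asynchronous queries

An asynchronous connection can be established with pinot_client_rust::async_connection::AsyncConnection, which provides methods equivalent to the synchronous instantiation methods described above.

Querying Pinot

Refer to this example.

Code snippet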

fn main() {
    // Connect directly to a single broker.
    let client = pinot_client_rust::connection::client_from_broker_list(
        vec!["localhost:8099".to_string()], None).unwrap();
    // Run an SQL query against the baseballStats table, deserializing each row into a DataRow.
    let broker_response = client.execute_sql::<pinot_client_rust::response::data::DataRow>(
        "baseballStats",
        "select count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10"
    ).unwrap();
    // Stats are optional. Note that log::info! output only appears if a logger (e.g. env_logger) is initialized.
    if let Some(stats) = broker_response.stats {
        log::info!(
            "Query Stats: response time - {} ms, scanned docs - {}, total docs - {}",
            stats.time_used_ms,
            stats.num_docs_scanned,
            stats.total_docs,
        );
    }
}

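Response format

A query response is represented by one of two broker response structures. SQL queries return SqlResponse, whose generic parameter can be any struct that implements the FromRow trait, while PQL queries return PqlResponse. SqlResponse contains a Table, the holder of SQL query data, while PqlResponse contains AggregationResults and SelectionResults, the holders of PQL query data. For a given request, exceptions from either SqlResponse or PqlResponse are stored in the Exception array, and statistics are stored in ResponseStats.

Common

Exception is defined as follows: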

/// Pinot exception.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct PinotException {
    #[serde(rename(deserialize = "errorCode"))]
    pub error_code: i32,
    pub message: String,
}

ResponseStats is defined as follows:

/// ResponseStats carries all stats returned by a query.
#[derive(Clone, Debug, PartialEq)]
pub struct ResponseStats {
    pub trace_info: HashMap<String, String>,
    pub num_servers_queried: i32,
    pub num_servers_responded: i32,
    pub num_segments_queried: i32,
    pub num_segments_processed: i32,
    pub num_segments_matched: i32,
    pub num_consuming_segments_queried: i32,
    pub num_docs_scanned: i64,
    pub num_entries_scanned_in_filter: i64,
    pub num_entries_scanned_post_filter: i64,
    pub num_groups_limit_reached: bool,
    pub total_docs: i64,
    pub time_used_ms: i32,
    pub min_consuming_freshness_time_ms: i64,
}
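The stats fields lend themselves to simple health checks on a response. The sketch below is illustrative only: `StatsView` is a hypothetical struct that copies four of the `ResponseStats` fields above so the example compiles on its own, and `scan_ratio` and `is_partial` are hypothetical helpers, not part of the crate. It assumes that fewer responding servers than queried servers means the result may be incomplete.

```rust
/// Hypothetical subset of `ResponseStats`, copied so the sketch stands alone.
struct StatsView {
    num_servers_queried: i32,
    num_servers_responded: i32,
    num_docs_scanned: i64,
    total_docs: i64,
}

/// Fraction of the table's documents the query scanned (0.0 for an empty table).
fn scan_ratio(stats: &StatsView) -> f64 {
    if stats.total_docs == 0 {
        0.0
    } else {
        stats.num_docs_scanned as f64 / stats.total_docs as f64
    }
}

/// True when at least one queried server did not respond (assumed: results may be partial).
fn is_partial(stats: &StatsView) -> bool {
    stats.num_servers_responded < stats.num_servers_queried
}

fn main() {
    let stats = StatsView {
        num_servers_queried: 2,
        num_servers_responded: 2,
        num_docs_scanned: 25,
        total_docs: 100,
    };
    println!("scan ratio: {:.2}, partial: {}", scan_ratio(&stats), is_partial(&stats));
}
```

A high scan ratio on a filtered query can hint that an index is missing, though that interpretation depends on the table setup.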

PQL

PqlResponse is defined as:

/// PqlResponse is the data structure for broker response to a PQL query.
#[derive(Clone, Debug, PartialEq)]
pub struct PqlResponse {
    pub aggregation_results: Vec<AggregationResult>,
    pub selection_results: Option<SelectionResults>,
    pub stats: Option<ResponseStats>,
}

SQL

SqlResponse is defined as:

/// SqlResponse is the data structure for a broker response to an SQL query.
#[derive(Clone, Debug, PartialEq)]
pub struct SqlResponse<T: FromRow> {
    pub table: Option<Table<T>>,
    pub stats: Option<ResponseStats>,
}

Table is defined as:

/// Table is the holder for SQL queries.
#[derive(Clone, Debug, PartialEq)]
pub struct Table<T: FromRow> {
    schema: Schema,
    rows: Vec<T>,
}

Schema is defined as:

/// Schema is response schema with a bimap to allow easy name <-> index retrieval
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Schema {
    column_data_types: Vec<DataType>,
    column_name_to_index: bimap::BiMap::<String, usize>,
}
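The bimap in `Schema` exists so that a column can be found by name or by index. A minimal sketch of the same idea using only the standard library: the hypothetical `MiniSchema` keeps a `Vec<String>` for index to name and a `HashMap` for name to index in place of `bimap::BiMap`, and returns `Result<_, String>` instead of the crate's own `Result` type.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `Schema`: two std collections replace `bimap::BiMap`.
struct MiniSchema {
    names: Vec<String>,               // index -> name
    index_of: HashMap<String, usize>, // name -> index
}

impl MiniSchema {
    fn new(columns: &[&str]) -> Self {
        let names: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
        let index_of: HashMap<String, usize> = names
            .iter()
            .enumerate()
            .map(|(i, name)| (name.clone(), i))
            .collect();
        MiniSchema { names, index_of }
    }

    fn get_column_count(&self) -> usize {
        self.names.len()
    }

    fn get_column_name(&self, column_index: usize) -> Result<&str, String> {
        self.names
            .get(column_index)
            .map(|s| s.as_str())
            .ok_or_else(|| format!("column index {} out of range", column_index))
    }

    fn get_column_index(&self, column_name: &str) -> Result<usize, String> {
        self.index_of
            .get(column_name)
            .copied()
            .ok_or_else(|| format!("unknown column {}", column_name))
    }
}

fn main() {
    let schema = MiniSchema::new(&["cnt", "sum_homeRuns"]);
    println!("{} columns", schema.get_column_count());
    println!("{:?}", schema.get_column_index("sum_homeRuns"));
    println!("{:?}", schema.get_column_name(0));
}
```

Both lookups are O(1) on average, which is the point of keeping the mapping in both directions rather than searching the name list.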

Schema defines several functions, such as:

fn get_column_count(&self) -> usize;
fn get_column_name(&self, column_index: usize) -> Result<&str>;
fn get_column_index(&self, column_name: &str) -> Result<usize>;
fn get_column_data_type(&self, column_index: usize) -> Result<DataType>;
fn get_column_data_type_by_name(&self, column_name: &str) -> Result<DataType>;

DataType is defined as:

/// Pinot native types
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataType {
    Int,
    Long,
    Float,
    Double,
    Boolean,
    Timestamp,
    String,
    Json,
    Bytes,
    IntArray,
    LongArray,
    FloatArray,
    DoubleArray,
    BooleanArray,
    TimestampArray,
    StringArray,
    BytesArray,
}

FromRow is defined as:

/// FromRow represents any structure which can deserialize
/// the Table.rows json field provided a `Schema`
pub trait FromRow: Sized {
    fn from_row(
        data_schema: &Schema,
        row: Vec<Value>,
    ) -> std::result::Result<Self, serde_json::Error>;
}

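In addition to being implemented by DataRow, FromRow is also implemented for every implementor of serde::de::Deserialize. This works by first deserializing the response to JSON; then, before each row is deserialized into its final form, the row is replaced with a JSON map from column name to value. A number of serde deserializer functions are also provided for complex Pinot types: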
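The row substitution described above can be sketched without serde. This is an illustration under stated assumptions, not the crate's code: `Value` is a hypothetical two-variant stand-in for `serde_json::Value`, `row_to_map` pairs each positional value with its column name, and `team_stats_from_map` hand-writes the field lookup that a `#[derive(Deserialize)]` struct would get for free.

```rust
use std::collections::HashMap;

/// Hypothetical miniature JSON value; the crate itself works with `serde_json::Value`.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Number(i64),
    Text(String),
}

/// Pairs each positional row value with its column name, producing the
/// "column name -> value" map that is deserialized instead of the raw row.
fn row_to_map(column_names: &[&str], row: Vec<Value>) -> Result<HashMap<String, Value>, String> {
    if column_names.len() != row.len() {
        return Err(format!("expected {} values, got {}", column_names.len(), row.len()));
    }
    Ok(column_names.iter().map(|n| n.to_string()).zip(row).collect())
}

/// Hypothetical target struct; with serde it would simply derive `Deserialize`.
#[derive(Debug, PartialEq)]
struct TeamStats {
    team_id: String,
    cnt: i64,
}

/// Hand-written stand-in for the by-name field lookup serde performs on the map.
fn team_stats_from_map(map: &HashMap<String, Value>) -> Result<TeamStats, String> {
    let team_id = match map.get("teamID") {
        Some(Value::Text(s)) => s.clone(),
        other => return Err(format!("bad teamID: {:?}", other)),
    };
    let cnt = match map.get("cnt") {
        Some(Value::Number(n)) => *n,
        other => return Err(format!("bad cnt: {:?}", other)),
    };
    Ok(TeamStats { team_id, cnt })
}

fn main() {
    let columns = ["teamID", "cnt"];
    let row = vec![Value::Text("BOS".to_string()), Value::Number(42)];
    let map = row_to_map(&columns, row).unwrap();
    println!("{:?}", team_stats_from_map(&map));
}
```

Keying by column name means the struct's field order does not have to match the column order in the query.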

/// Converts Pinot timestamps into `Vec<DateTime<Utc>>` using `deserialize_timestamps_from_json()`.
fn deserialize_timestamps<'de, D>(deserializer: D) -> std::result::Result<Vec<DateTime<Utc>>, D::Error>...

/// Converts Pinot timestamps into `DateTime<Utc>` using `deserialize_timestamp_from_json()`.
pub fn deserialize_timestamp<'de, D>(deserializer: D) -> std::result::Result<DateTime<Utc>, D::Error>...

/// Converts Pinot hex strings into `Vec<Vec<u8>>` using `deserialize_bytes_array_from_json()`.
pub fn deserialize_bytes_array<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>...

/// Converts Pinot hex string into `Vec<u8>` using `deserialize_bytes_from_json()`.
pub fn deserialize_bytes<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>...

/// Deserializes json potentially packaged into a string by calling `deserialize_json_from_json()`.
pub fn deserialize_json<'de, D>(deserializer: D) -> std::result::Result<Value, D::Error>
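Per the doc comments above, `deserialize_bytes` and `deserialize_bytes_array` turn Pinot hex strings into bytes. A minimal std-only sketch of that conversion step follows; `decode_hex` and `decode_hex_array` are hypothetical helpers for illustration and do not reproduce the crate's serde plumbing or error types.

```rust
/// Value of a single ASCII hex digit, or None for anything else (including non-ASCII bytes).
fn hex_value(c: u8) -> Option<u8> {
    (c as char).to_digit(16).map(|d| d as u8)
}

/// Hypothetical helper: decodes a hex string such as "0aff" into bytes.
fn decode_hex(s: &str) -> Result<Vec<u8>, String> {
    let bytes = s.as_bytes();
    if bytes.len() % 2 != 0 {
        return Err(format!("odd-length hex string: {:?}", s));
    }
    bytes
        .chunks(2)
        .enumerate()
        .map(|(i, pair)| match (hex_value(pair[0]), hex_value(pair[1])) {
            // High nibble first, then low nibble.
            (Some(hi), Some(lo)) => Ok((hi << 4) | lo),
            _ => Err(format!("invalid hex digit near offset {}", i * 2)),
        })
        .collect()
}

/// Hypothetical helper for BYTES arrays: decodes each hex string independently.
fn decode_hex_array(items: &[&str]) -> Result<Vec<Vec<u8>>, String> {
    items.iter().map(|s| decode_hex(s)).collect()
}

fn main() {
    println!("{:?}", decode_hex("0aff"));
    println!("{:?}", decode_hex_array(&["01", "DEAD"]));
}
```

Collecting into `Result<Vec<_>, _>` stops at the first bad element, so one malformed string fails the whole value rather than producing partially decoded bytes.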

For example usage, see this example.

DataRow is defined as:

/// A row of `Data`
#[derive(Clone, Debug, PartialEq)]
pub struct DataRow {
    row: Vec<Data>,
}

Data is defined as:

/// Typed Pinot data
#[derive(Clone, Debug, PartialEq)]
pub enum Data {
    Int(i32),
    Long(i64),
    Float(f32),
    Double(f64),
    Boolean(bool),
    Timestamp(DateTime<Utc>),
    String(String),
    Json(Value),
    Bytes(Vec<u8>),
    IntArray(Vec<i32>),
    LongArray(Vec<i64>),
    FloatArray(Vec<f32>),
    DoubleArray(Vec<f64>),
    BooleanArray(Vec<bool>),
    TimestampArray(Vec<DateTime<Utc>>),
    StringArray(Vec<String>),
    BytesArray(Vec<Vec<u8>>),
    Null(DataType),
}

Data defines several functions, such as:

fn data_type(&self) -> DataType;
fn get_int(&self) -> Result<i32>;
fn get_long(&self) -> Result<i64>;
fn get_float(&self) -> Result<f32>;
fn get_double(&self) -> Result<f64>;
fn get_boolean(&self) -> Result<bool>;
fn get_timestamp(&self) -> Result<DateTime<Utc>>;
fn get_string(&self) -> Result<&str>;
fn get_json(&self) -> Result<&Value>;
fn get_bytes(&self) -> Result<&Vec<u8>>;
fn is_null(&self) -> bool;
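The typed getters follow a common Rust pattern: match on the variant and return an error when it is not the one requested. The sketch below uses hypothetical, trimmed-down `MiniDataType` and `MiniData` enums (three value variants plus `Null`) and `Result<_, String>` errors. Treating `Null` as a type mismatch in the getters is an assumption of this sketch; the text above does not say how the crate's getters handle nulls.

```rust
/// Hypothetical trimmed copy of `DataType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum MiniDataType {
    Int,
    Long,
    String,
}

/// Hypothetical trimmed copy of `Data`; like the real enum, a null still records its column type.
#[derive(Clone, Debug, PartialEq)]
enum MiniData {
    Int(i32),
    Long(i64),
    String(String),
    Null(MiniDataType),
}

impl MiniData {
    fn data_type(&self) -> MiniDataType {
        match self {
            MiniData::Int(_) => MiniDataType::Int,
            MiniData::Long(_) => MiniDataType::Long,
            MiniData::String(_) => MiniDataType::String,
            MiniData::Null(t) => *t,
        }
    }

    /// Returns the value only when the variant matches; otherwise reports the actual type.
    fn get_int(&self) -> Result<i32, String> {
        match self {
            MiniData::Int(v) => Ok(*v),
            other => Err(format!("expected Int, found {:?}", other.data_type())),
        }
    }

    fn get_string(&self) -> Result<&str, String> {
        match self {
            MiniData::String(s) => Ok(s.as_str()),
            other => Err(format!("expected String, found {:?}", other.data_type())),
        }
    }

    fn is_null(&self) -> bool {
        matches!(self, MiniData::Null(_))
    }
}

fn main() {
    let values = vec![
        MiniData::Int(7),
        MiniData::Long(7),
        MiniData::Null(MiniDataType::String),
    ];
    for v in &values {
        println!("{:?} null={} get_int={:?}", v.data_type(), v.is_null(), v.get_int());
    }
}
```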

In addition to a row count, DataRow provides convenience counterparts to the functions above that take a column index.
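An index-based getter on a row combines two checks: the index must be in range, and the value there must have the requested type. A minimal sketch using hypothetical `MiniData` and `MiniDataRow` types (not the crate's `Data` and `DataRow`), with `Result<_, String>` errors:

```rust
/// Hypothetical minimal `Data` with two variants, enough for the sketch.
#[derive(Clone, Debug, PartialEq)]
enum MiniData {
    Int(i32),
    String(String),
}

/// Hypothetical stand-in for `DataRow`: a positional vector of typed values.
struct MiniDataRow {
    row: Vec<MiniData>,
}

impl MiniDataRow {
    /// Out-of-range indices and type mismatches both become errors.
    fn get_int(&self, column_index: usize) -> Result<i32, String> {
        match self.row.get(column_index) {
            Some(MiniData::Int(v)) => Ok(*v),
            Some(other) => Err(format!("column {} is not an Int: {:?}", column_index, other)),
            None => Err(format!("column index {} out of range", column_index)),
        }
    }

    fn get_string(&self, column_index: usize) -> Result<&str, String> {
        match self.row.get(column_index) {
            Some(MiniData::String(s)) => Ok(s.as_str()),
            Some(other) => Err(format!("column {} is not a String: {:?}", column_index, other)),
            None => Err(format!("column index {} out of range", column_index)),
        }
    }
}

fn main() {
    let row = MiniDataRow {
        row: vec![MiniData::String("BOS".to_string()), MiniData::Int(42)],
    };
    println!("{:?} {:?}", row.get_string(0), row.get_int(1));
    println!("{:?}", row.get_int(0));
}
```

With a schema at hand, a lookup by column name reduces to resolving the index first (as `Schema::get_column_index` does) and then calling one of these index-based getters.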

Dependencies

~6–22MB
~301K SLoC