#druid #tokio #async-client

druid-io

异步Apache Druid客户端库

1个不稳定版本

0.1.0 2020年9月7日

在“数据库接口”分类中排名第 1881

MIT 许可证

61KB
1.5K SLoC

Apache Druid的异步Rust客户端

完全异步、基于 Future 的 Rust 语言 Apache Druid 客户端库。

该库为 Druid 原生查询(native queries)提供静态类型的 API。

安装

该库发布在 crates.io 上,在 Cargo.toml 中添加:

[dependencies]
# A bare `*` requirement is rejected by crates.io for published crates and would
# also accept future breaking releases; track the 0.1 series instead.
druid-io = "0.1"

支持的原生查询

  • 时间序列(Timeseries)
  • TopN
  • 分组(GroupBy)
  • 扫描(Scan)
  • 搜索(Search)
  • 时间边界(TimeBoundary)
  • 段元数据(SegmentMetadata)
  • 数据源元数据(DataSourceMetadata)

用法

客户端

通过静态配置的 Broker 节点地址列表连接到 Druid 集群:


// Client backed by a static list of broker addresses (here just one, on localhost).
let druid_client = DruidClient::new(vec![String::from("localhost:8082")]);

通过 ZooKeeper 连接到 Druid 集群(支持自动发现新的 Broker 节点并进行负载均衡):


TODO: 该功能尚未实现。

查询

时间序列

查看Apache Druid Timeseries查询文档

/// Shape of one timeseries result row produced by the query below.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimeAggr {
    /// Output of the `count` aggregation.
    count: usize,
    /// Output of the `count_fraction` arithmetic post-aggregation.
    count_fraction: f32,
    /// Output of the `user` (`StringFirst`) aggregation.
    user: String,
}

// Timeseries query over the `wikipedia` datasource, restricted to a single user.
let timeseries = Timeseries {
    data_source: DataSource::table("wikipedia"),
    limit: Some(10),
    descending: false,
    granularity: Granularity::All,
    filter: Some(Filter::selector("user", "Taffe316")),
    aggregations: vec![
        Aggregation::count("count"),
        Aggregation::StringFirst {
            name: "user".into(),
            field_name: "user".into(),
            max_string_bytes: 1024,
        },
    ],
    // count_fraction = count / 100
    post_aggregations: vec![PostAggregation::Arithmetic {
        name: "count_fraction".into(),
        function: "/".into(),
        fields: vec![
            PostAggregator::field_access("count_percent", "count"),
            PostAggregator::constant("hundred", 100.into()),
        ],
        ordering: None,
    }],
    // NOTE(review): these bounds look like Druid's "eternity" interval (all time) — confirm.
    intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
    // No query-context overrides. No `context` variable exists in this example,
    // so use the default (empty) context, as the other examples do.
    context: Default::default(),
};
let result = tokio_test::block_on(druid_client.timeseries::<TimeAggr>(&timeseries));

TopN

查看Apache Druid TopN查询文档

/// Shape of one result row for the TopN and GroupBy examples.
#[derive(Debug, Serialize, Deserialize)]
struct WikiPage {
    /// Value of the `page` dimension.
    page: String,
    /// Output of the `user` (`StringFirst`) aggregation; may be missing.
    user: Option<String>,
    /// Output of the `count` aggregation.
    count: usize,
}

// This example builds its own client so it can be read on its own.
let druid_client = DruidClient::new(vec![String::from("localhost:8082")]);

// TopN over the `page` dimension: keep `threshold` (10) pages, ranked by the `count` metric.
let top_n = TopN {
    data_source: DataSource::table("wikipedia"),
    intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
    granularity: Granularity::All,
    dimension: Dimension::default("page"),
    metric: "count".into(),
    threshold: 10,
    aggregations: vec![
        Aggregation::count("count"),
        Aggregation::StringFirst {
            name: "user".into(),
            field_name: "user".into(),
            max_string_bytes: 1024,
        },
    ],
    context: Default::default(),
};
let result = tokio_test::block_on(druid_client.top_n::<WikiPage>(&top_n));

分组

查看Apache Druid GroupBy查询文档


// GroupBy on `page` for a single user. The having clause keeps rows whose
// count_fraction is greater than 0.01; at most 10 rows are returned, ordered
// by `page` descending (alphanumeric).
let group_by = GroupBy {
    data_source: DataSource::table("wikipedia"),
    intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
    granularity: Granularity::All,
    filter: Some(Filter::selector("user", "Taffe316")),
    dimensions: vec![Dimension::Default {
        dimension: "page".into(),
        output_name: "page".into(),
        output_type: OutputType::STRING,
    }],
    aggregations: vec![
        Aggregation::count("count"),
        Aggregation::StringFirst {
            name: "user".into(),
            field_name: "user".into(),
            max_string_bytes: 1024,
        },
    ],
    // count_fraction = count / 100
    post_aggregations: vec![PostAggregation::Arithmetic {
        name: "count_fraction".into(),
        function: "/".into(),
        fields: vec![
            PostAggregator::field_access("count_percent", "count"),
            PostAggregator::constant("hundred", 100.into()),
        ],
        ordering: None,
    }],
    having: Some(HavingSpec::greater_than("count_fraction", 0.01.into())),
    limit_spec: Some(LimitSpec {
        limit: 10,
        columns: vec![OrderByColumnSpec::new(
            "page",
            Ordering::Descending,
            SortingOrder::Alphanumeric,
        )],
    }),
    subtotal_spec: Default::default(),
    context: Default::default(),
};
let result = tokio_test::block_on(druid_client.group_by::<WikiPage>(&group_by));

扫描(带内连接)

查看Apache Druid Scan查询文档

下面来看一个更复杂的例子:内连接(inner join)。

/// One row of the joined scan result (`wikipedia` joined with `countries`).
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ScanEvent {
    /// Druid's `__time` column, a 64-bit long of milliseconds since the epoch.
    /// `i64` rather than `usize`: current timestamps overflow a 32-bit `usize`,
    /// and pre-1970 timestamps are negative.
    #[serde(rename(deserialize = "__time"))]
    time: i64,
    city_name: Option<String>,
    comment: Option<String>,
    namespace: Option<String>,
    page: Option<String>,
    region_iso_code: Option<String>,
    user: String,
    /// `languages` column from the right-hand side of the join, whose columns
    /// carry the `c.` prefix.
    #[serde(rename(deserialize = "c.languages"))]
    languages: Option<String>,
}
// Right-hand side of the join: a sub-query over `countries` that keeps only the
// columns the join condition and the result need.
let countries_query = Scan {
    data_source: DataSource::table("countries"),
    intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
    columns: vec!["Name".into(), "languages".into()],
    batch_size: 10,
    result_format: ResultFormat::List,
    limit: None,
    filter: None,
    ordering: Some(Ordering::None),
    context: Default::default(),
};

// Inner join of `wikipedia` with the sub-query. The right side's columns are
// prefixed with `c.`, which is why the condition refers to `c.Name`.
let scan = Scan {
    data_source: DataSource::join(JoinType::Inner)
        .left(DataSource::table("wikipedia"))
        .right(DataSource::query(countries_query.into()), "c.")
        .condition("countryName == \"c.Name\"")
        .build()
        .unwrap(),
    intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
    columns: vec![],
    batch_size: 10,
    result_format: ResultFormat::List,
    limit: Some(10),
    filter: None,
    ordering: Some(Ordering::None),
    context: Default::default(),
};

let result = tokio_test::block_on(druid_client.scan::<ScanEvent>(&scan));

依赖关系

~6–10MB
~211K SLoC