1个不稳定版本
| 0.1.0 | 2020年9月7日 |
|---|---|
#1881 在 数据库接口
61KB
1.5K SLoC
Apache Druid的异步Rust客户端
完全异步、支持 Future 的 Apache Druid 客户端库,面向 Rust 编程语言。
该库提供了静态类型API,用于原生查询
安装
该库托管在crates.io。
[dependencies]
druid-io = "*"
支持的原生查询
- 时间序列
- TopN
- 分组
- 扫描
- 搜索
- 时间边界
- 段元数据
- 数据源元数据
用法
客户端
通过静态提供的代理(Broker)节点列表连接到 Druid 集群
// Build the client from a fixed list containing one "host:port" address.
let druid_client = DruidClient::new(vec![String::from("localhost:8082")]);
通过Zookeeper连接到Druid集群 - 支持自动发现新代理和负载均衡
TODO:
查询
时间序列
查看时间序列查询文档
/// Row type passed as the type parameter of `druid_client.timeseries::<TimeAggr>(..)`
/// in the timeseries example that follows.
///
/// The field names match the output names used in that query: `count` and
/// `user` are aggregation names, `count_fraction` is the post-aggregation name.
#[derive(Serialize, Deserialize, Debug)]
pub struct TimeAggr {
/// Presumably filled from the `Aggregation::count("count")` output.
count: usize,
/// Presumably filled from the `count_fraction` arithmetic post-aggregation.
count_fraction: f32,
/// Presumably filled from the `StringFirst` aggregation named `user`.
user: String,
}
let timeseries = Timeseries {
data_source: DataSource::table("wikipedia"),
limit: Some(10),
descending: false,
granularity: Granularity::All,
filter: Some(Filter::selector("user", "Taffe316")),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
post_aggregations: vec![PostAggregation::Arithmetic {
name: "count_fraction".into(),
function: "/".into(),
fields: vec![
PostAggregator::field_access("count_percent", "count"),
PostAggregator::constant("hundred", 100.into()),
],
ordering: None,
}],
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
context: context,
};
let result = tokio_test::block_on(druid_client.timeseries::<TimeAggr>(×eries));
TopN
/// Row type passed as the type parameter of both `druid_client.top_n::<WikiPage>(..)`
/// and `druid_client.group_by::<WikiPage>(..)` in the examples on this page.
#[derive(Serialize, Deserialize, Debug)]
struct WikiPage {
/// Matches the `page` dimension requested by the queries.
page: String,
/// Matches the `StringFirst` aggregation named `user`; optional in the row.
user: Option<String>,
/// Matches the `Aggregation::count("count")` output name.
count: usize,
}
// TopN query over the "wikipedia" table, using the `page` dimension and the
// `count` metric, with a threshold of 10.
let top_n = TopN {
data_source: DataSource::table("wikipedia"),
dimension: Dimension::default("page"),
threshold: 10,
// Refers to the output name of the count aggregation declared below.
metric: "count".into(),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
// A single "start/end" interval string with extreme bounds on both sides.
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
granularity: Granularity::All,
context: Default::default(),
};
// Client built from a one-element list of "host:port" addresses.
let druid_client = DruidClient::new(vec!["localhost:8082".to_string()]);
// Run the query to completion and deserialize the rows into `WikiPage`.
let result = tokio_test::block_on(druid_client.top_n::<WikiPage>(&top_n));
分组
// GroupBy query over the "wikipedia" table, grouped on the `page` dimension and
// restricted to rows whose `user` column equals "Taffe316".
let group_by = GroupBy {
data_source: DataSource::table("wikipedia"),
dimensions: vec![Dimension::Default {
dimension: "page".into(),
output_name: "page".into(),
output_type: OutputType::STRING,
}],
// Limit of 10, ordered on `page` (descending, alphanumeric).
limit_spec: Some(LimitSpec {
limit: 10,
columns: vec![OrderByColumnSpec::new(
"page",
Ordering::Descending,
SortingOrder::Alphanumeric,
)],
}),
granularity: Granularity::All,
filter: Some(Filter::selector("user", "Taffe316")),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
// Arithmetic post-aggregation named `count_fraction`: applies "/" to the
// `count` aggregation and the constant 100.
post_aggregations: vec![PostAggregation::Arithmetic {
name: "count_fraction".into(),
function: "/".into(),
fields: vec![
PostAggregator::field_access("count_percent", "count"),
PostAggregator::constant("hundred", 100.into()),
],
ordering: None,
}],
// Having clause on the post-aggregated `count_fraction` value (> 0.01).
having: Some(HavingSpec::greater_than("count_fraction", 0.01.into())),
// A single "start/end" interval string with extreme bounds on both sides.
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
subtotal_spec: Default::default(),
context: Default::default(),
};
// Run the query to completion and deserialize the rows into `WikiPage`.
let result = tokio_test::block_on(druid_client.group_by::<WikiPage>(&group_by));
扫描(带内连接)
查看Apache Druid Scan查询文档
下面来看一个更复杂的例子:内连接(inner join)
/// Row type passed as the type parameter of `druid_client.scan::<ScanEvent>(..)`
/// in the join example that follows.
///
/// Field names are mapped to camelCase keys by `rename_all`, except where a
/// field carries its own `rename` attribute.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ScanEvent {
/// Read from the `__time` key when deserializing.
#[serde(rename(deserialize = "__time"))]
time: usize,
city_name: Option<String>,
comment: Option<String>,
namespace: Option<String>,
page: Option<String>,
region_iso_code: Option<String>,
user: String,
/// Read from the `c.languages` key when deserializing; `c.` is the prefix
/// given to the right-hand side of the join in the query.
#[serde(rename(deserialize = "c.languages"))]
languages: Option<String>,
}
// Scan query whose data source is an inner join: the "wikipedia" table on the
// left, and a nested Scan query over the "countries" table on the right.
let scan = Scan {
data_source: DataSource::join(JoinType::Inner)
.left(DataSource::table("wikipedia"))
.right(
DataSource::query(
// Nested query: scans the `Name` and `languages` columns of "countries".
Scan {
data_source: DataSource::table("countries"),
batch_size: 10,
intervals: vec![
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
.into(),
],
result_format: ResultFormat::List,
columns: vec!["Name".into(), "languages".into()],
limit: None,
filter: None,
ordering: Some(Ordering::None),
context: std::collections::HashMap::new(),
}
.into(),
),
// Prefix for the right-hand side; the condition and `ScanEvent` refer to
// right-hand columns as `c.<column>`.
"c.",
)
.condition("countryName == \"c.Name\"")
.build()
// Panics if `build()` returns an error / `None`.
.unwrap(),
batch_size: 10,
// A single "start/end" interval string with extreme bounds on both sides.
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
result_format: ResultFormat::List,
// Empty column list — presumably means "all columns"; confirm against the Druid docs.
columns: vec![],
limit: Some(10),
filter: None,
ordering: Some(Ordering::None),
context: Default::default(),
};
// Run the query to completion and deserialize the rows into `ScanEvent`.
let result = tokio_test::block_on(druid_client.scan::<ScanEvent>(&scan));
依赖关系
~6–10MB
~211K SLoC