8个版本 (破坏性)
版本 | 发布日期 |
---|---|
0.7.0 | 2024年2月21日 |
0.6.0 | 2023年12月12日 |
0.5.0 | 2023年11月15日 |
0.4.1 | 2023年9月9日 |
0.1.0 | 2022年6月14日 |
#289 in 数据库实现
360KB
9K SLoC
OpenSrv - ClickHouse
模拟ClickHouse服务器的绑定。
用法
完整示例请见此处。
/// Example session state used by the `ClickHouseSession` implementation below.
struct Session {
/// Reference instant that `execute_query` compares against to decide
/// whether a progress packet should be written after a block.
last_progress_send: Instant,
/// Server metadata returned by reference from `metadata()`.
metadata: ClickHouseMetadata,
}
#[async_trait::async_trait]
impl opensrv_clickhouse::ClickHouseSession for Session {
    /// Handles a single query received from the client.
    ///
    /// * For an `INSERT` query (matched case-insensitively, ignoring leading
    ///   whitespace) a sample block with an empty `u32` column named `abc` is
    ///   written to the connection, a channel sender is stored in
    ///   `ctx.state.out`, and a background task is spawned that counts the rows
    ///   of every block received on that channel. When the channel yields no
    ///   more blocks the task calls `notify_one` on `ctx.state.sent_all_data`.
    /// * For any other query the blocks produced by a `SimpleBlockStream` are
    ///   written to the connection, interleaved with progress packets.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the block stream or by writing blocks
    /// or progress packets to `connection`.
    async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()> {
        let query = ctx.state.query.clone();
        tracing::debug!("Receive query {}", query);
        let start = Instant::now();

        // simple logic for insert: look only at the leading keyword, but accept
        // any letter case ("Insert", "INSERT", ...) and leading whitespace so
        // such statements are not mistaken for a read query.
        let is_insert = query
            .trim_start()
            .get(..6)
            .map_or(false, |keyword| keyword.eq_ignore_ascii_case("insert"));
        if is_insert {
            let sample_block = Block::new().column("abc", Vec::<u32>::new());
            let (sender, rec) = mpsc::channel(4);
            ctx.state.out = Some(sender);
            connection.write_block(&sample_block).await?;

            let sent_all_data = ctx.state.sent_all_data.clone();
            tokio::spawn(async move {
                let mut rows = 0;
                let mut stream = ReceiverStream::new(rec);
                while let Some(block) = stream.next().await {
                    rows += block.row_count();
                    println!(
                        "got insert block: {:?}, total_rows: {}",
                        block.row_count(),
                        rows
                    );
                }
                // The stream has ended: no more blocks will arrive.
                sent_all_data.notify_one();
            });
            return Ok(());
        }

        let mut clickhouse_stream = SimpleBlockStream {
            idx: 0,
            start: 10,
            end: 24,
            blocks: 10,
        };

        // The struct field cannot be updated through `&self`, so a local copy
        // tracks when progress was last sent. Without refreshing it, a progress
        // packet would follow every block once 10 ms had elapsed.
        let mut last_progress_send = self.last_progress_send;
        while let Some(block) = clickhouse_stream.next().await {
            let block = block?;
            connection.write_block(&block).await?;

            if last_progress_send.elapsed() >= Duration::from_millis(10) {
                let progress = self.get_progress();
                connection
                    .write_progress(progress, ctx.client_revision)
                    .await?;
                last_progress_send = Instant::now();
            }
        }

        let duration = start.elapsed();
        tracing::debug!(
            "ClickHouseHandler executor cost:{:?}, statistics:{:?}",
            duration,
            "xxx",
        );
        Ok(())
    }

    /// Returns a reference to the metadata stored in this session.
    fn metadata(&self) -> &ClickHouseMetadata {
        &self.metadata
    }

    /// Returns a `Progress` value with fixed placeholder numbers
    /// (100 rows, 1000 bytes, 1000 total rows).
    fn get_progress(&self) -> Progress {
        Progress {
            rows: 100,
            bytes: 1000,
            total_rows: 1000,
        }
    }
}
获取帮助
致谢
此项目的前身是 sundy-li/clickhouse-srv。
许可协议
依赖
~8–20MB
~294K SLoC