Skip to content

Commit

Permalink
chore: add query_id
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 committed Mar 14, 2024
1 parent e7975f6 commit 77202e8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
21 changes: 14 additions & 7 deletions dozer-sink-clickhouse/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ impl ClickhouseClient {
datasource_name: &str,
fields: &[FieldDefinition],
table_options: Option<ClickhouseTableOptions>,
query_id: Option<String>,
) -> Result<(), QueryError> {
let mut client = self.pool.get_handle().await?;
let ddl = get_create_table_query(datasource_name, fields, table_options);
info!("Creating Clickhouse Table");
info!("{ddl}");
let query_id = query_id.unwrap_or("".to_string());
let ddl = Query::new(ddl).id(query_id);
client.execute(ddl).await?;
Ok(())
}
Expand All @@ -72,15 +75,19 @@ impl ClickhouseClient {
query_id: Option<String>,
) -> Result<SqlResult, QueryError> {
let mut client = self.pool.get_handle().await?;
// TODO: Include query_id in RowBinary protocol.
// https://github.com/suharev7/clickhouse-rs/issues/176
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
// https://www.propeldata.com/blog/how-to-check-your-clickhouse-version
/*
TODO: Include query_id in RowBinary protocol.
https://github.com/suharev7/clickhouse-rs/issues/176
https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
https://www.propeldata.com/blog/how-to-check-your-clickhouse-version
*/

let query = Query::new(&query).id(query_id.map_or("".to_string(), |q| q.to_string()));
// let query = query_id.map_or(query.to_string(), |id| {
// format!("{0} settings log_comment = '{1}'", query, id)
// });
/*
let query = query_id.map_or(query.to_string(), |id| {
format!("{0} settings log_comment = '{1}'", query, id)
});
*/

let block = client.query(query).fetch_all().await?;

Expand Down
2 changes: 2 additions & 0 deletions dozer-sink-clickhouse/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl ClickhouseSinkFactory {
&repl_metadata.table_name,
&repl_metadata.schema.fields,
Some(create_table_options),
None,
)
.await?;

Expand Down Expand Up @@ -105,6 +106,7 @@ impl SinkFactory for ClickhouseSinkFactory {
&config.sink_table_name,
&schema.fields,
self.config.create_table_options.clone(),
None,
)
.await?;
}
Expand Down
10 changes: 1 addition & 9 deletions dozer-sink-clickhouse/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use clickhouse_rs::{Block, Pool};
use std::error::Error;

#[tokio::test]
#[ignore]
async fn clickhouse_test() -> Result<(), Box<dyn Error>> {
let uuid = "248c40d9-d1eb-47c4-8801-943dbab34df9";
let database_url = "tcp://default@localhost:9000/query_test";
Expand All @@ -97,14 +98,5 @@ async fn clickhouse_test() -> Result<(), Box<dyn Error>> {

let table = Query::new("payment").id(uuid);
client.insert(table, block).await?;

// let block = client.query("SELECT * FROM payment").fetch_all().await?;

// for row in block.rows() {
// let id: u32 = row.get("customer_id")?;
// let amount: u32 = row.get("amount")?;
// let name: Option<&str> = row.get("account_name")?;
// println!("Found payment {}: {} {:?}", id, amount, name);
// }
Ok(())
}

0 comments on commit 77202e8

Please sign in to comment.