Skip to content

Commit

Permalink
Update ClickHouseWriter.java
Browse files Browse the repository at this point in the history
  • Loading branch information
Paultagoras committed Nov 14, 2024
1 parent 0abbcde commit dca4ec7
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
break;
}
long beforeSerialize = System.currentTimeMillis();
String gsonString = gson.toJson(data, gsonType);
String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
LOGGER.trace("topic {} partition {} offset {} payload {}",
record.getTopic(),
Expand Down Expand Up @@ -914,7 +914,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
break;
}
long beforeSerialize = System.currentTimeMillis();
String gsonString = gson.toJson(data, gsonType);
String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
LOGGER.trace("topic {} partition {} offset {} payload {}",
record.getTopic(),
Expand All @@ -935,6 +935,17 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
s3 = System.currentTimeMillis();
LOGGER.info("batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
}

protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
Map<String, Object> cleaned = new HashMap<>();
for (Column c : t.getRootColumnsList()) {
if (m.containsKey(c.getName())) {
cleaned.put(c.getName(), m.get(c.getName()));
}
}
return cleaned;
}

protected void doInsertString(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
if(chc.isUseClientV2()) {
doInsertStringV2(records, table, queryId);
Expand Down

0 comments on commit dca4ec7

Please sign in to comment.