diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
index 40deff79..c1d2c2a2 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -841,7 +841,7 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
                         break;
                 }
                 long beforeSerialize = System.currentTimeMillis();
-                String gsonString = gson.toJson(data, gsonType);
+                String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
                 dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
                 LOGGER.trace("topic {} partition {} offset {} payload {}",
                         record.getTopic(),
@@ -914,7 +914,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
                         break;
                 }
                 long beforeSerialize = System.currentTimeMillis();
-                String gsonString = gson.toJson(data, gsonType);
+                String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
                 dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
                 LOGGER.trace("topic {} partition {} offset {} payload {}",
                         record.getTopic(),
@@ -935,6 +935,17 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
         s3 = System.currentTimeMillis();
         LOGGER.info("batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
     }
+
+    protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
+        Map<String, Object> cleaned = new HashMap<>();
+        for (Column c : t.getRootColumnsList()) {
+            if (m.containsKey(c.getName())) {
+                cleaned.put(c.getName(), m.get(c.getName()));
+            }
+        }
+        return cleaned;
+    }
+
     protected void doInsertString(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
         if(chc.isUseClientV2()) {
             doInsertStringV2(records, table, queryId);
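
For context, the new cleanupExtraFields helper keeps only the keys of the record map that match a root column of the destination table, so extra fields in incoming records are dropped before the JSON payload is serialized with gson. Below is a minimal, self-contained sketch of that same filtering idea; it uses plain collections and a hypothetical columnNames list in place of the connector's Table/Column types, so it is illustrative only, not the connector's API.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CleanupExtraFieldsSketch {

    // Keep only the keys that match a known column name (hypothetical stand-in
    // for Table.getRootColumnsList() / Column.getName() used in ClickHouseWriter).
    static Map<String, Object> cleanupExtraFields(Map<String, Object> data, List<String> columnNames) {
        Map<String, Object> cleaned = new HashMap<>();
        for (String name : columnNames) {
            if (data.containsKey(name)) {
                cleaned.put(name, data.get(name));
            }
        }
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("id", 1);
        record.put("name", "widget");
        record.put("_extra_metadata", "dropped"); // not a table column

        // Only "id" and "name" survive; "_extra_metadata" is removed before serialization.
        System.out.println(cleanupExtraFields(record, List.of("id", "name")));
    }
}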