From 80509bc0133570f8e1ad984f095f4e466832a8fe Mon Sep 17 00:00:00 2001
From: mzitnik <mark@clickhouse.com>
Date: Mon, 25 Nov 2024 18:22:46 +0200
Subject: [PATCH] Reduce memory usage by deferring Struct-to-JSON-map conversion

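Previously, record conversion materialized a Map<String, Data> for every
SinkRecord in the batch up front, so the converted maps for the whole batch
were held in memory at once. This change defers the Struct-to-JSON-map
conversion to write time in ClickHouseWriter: each record is converted just
before it is streamed and its map is released (setJsonMap(null)) immediately
afterwards. A small Memory utility is added to log JVM heap usage at debug
level around the hot path.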
---
 .../kafka/connect/sink/ProxySinkTask.java        |  4 ++++
 .../kafka/connect/sink/data/Record.java          |  5 +++++
 .../sink/data/convert/SchemaRecordConvertor.java |  2 +-
 .../kafka/connect/sink/db/ClickHouseWriter.java  | 15 +++++++++++++++
 .../clickhouse/kafka/connect/util/Memory.java    | 20 ++++++++++++++++++++
 .../sink/ClickHouseSinkTaskWithSchemaTest.java   |  4 +++-
 6 files changed, 48 insertions(+), 2 deletions(-)
 create mode 100644 src/main/java/com/clickhouse/kafka/connect/util/Memory.java

diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
index fb94e764..4f81e440 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
@@ -9,6 +9,7 @@
 import com.clickhouse.kafka.connect.sink.state.StateProvider;
 import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
 import com.clickhouse.kafka.connect.sink.state.provider.KeeperStateProvider;
+import com.clickhouse.kafka.connect.util.Memory;
 import com.clickhouse.kafka.connect.util.jmx.ExecutionTimer;
 import com.clickhouse.kafka.connect.util.jmx.MBeanServerUtils;
 import com.clickhouse.kafka.connect.util.jmx.SinkTaskStatistics;
@@ -85,6 +86,7 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
         LOGGER.trace(String.format("Got %d records from put API.", records.size()));
         ExecutionTimer processingTime = ExecutionTimer.start();
 
+        LOGGER.debug(String.format("Memory: before conversion: %s", Memory.get()));
         Map<String, List<Record>> dataRecords = records.stream()
                 .map(v -> Record.convert(v,
                         clickHouseSinkConfig.isEnableDbTopicSplit(),
@@ -92,12 +94,14 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
                         clickHouseSinkConfig.getDatabase() ))
                 .collect(Collectors.groupingBy(Record::getTopicAndPartition));
         statistics.recordProcessingTime(processingTime);
+        LOGGER.debug(String.format("Memory: before processing: %s", Memory.get()));
         // TODO - Multi process???
         for (String topicAndPartition : dataRecords.keySet()) {
            // Running on each topic & partition
             List<Record> rec = dataRecords.get(topicAndPartition);
             processing.doLogic(rec);
         }
+        LOGGER.debug(String.format("Memory: after processing: %s", Memory.get()));
         statistics.taskProcessingTime(taskTime);
     }
 }
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java
index 231a7652..0a121059 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java
@@ -40,6 +40,11 @@ public Record(SchemaType schemaType, OffsetContainer recordOffsetContainer, List
         this.database = database;
     }
 
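+    /** Lets the writer attach a lazily converted JSON map and release it once the record is written. */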
+    public void setJsonMap(Map<String, Data> jsonMap) {
+        this.jsonMap = jsonMap;
+    }
+
     public String getTopicAndPartition() {
         return recordOffsetContainer.getTopicAndPartitionKey();
     }
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java
index 106d8960..c1680454 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/data/convert/SchemaRecordConvertor.java
@@ -18,7 +18,7 @@ public Record doConvert(SinkRecord sinkRecord, String topic,String configuration
         int partition = sinkRecord.kafkaPartition().intValue();
         long offset = sinkRecord.kafkaOffset();
         Struct struct = (Struct) sinkRecord.value();
-        Map<String, Data> data = StructToJsonMap.toJsonMap((Struct) sinkRecord.value());
+        Map<String, Data> data = null; // conversion deferred: ClickHouseWriter builds the map per record at write time
         return new Record(SchemaType.SCHEMA, new OffsetContainer(topic, partition, offset), struct.schema().fields(), data, database, sinkRecord);
     }
 }
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
index aa42ad6f..e7428177 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -699,6 +699,9 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
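+        // Convert only the first record up front; it is needed for schema validation below.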
+        Map<String, Data> dataRecord = StructToJsonMap.toJsonMap((Struct) first.getSinkRecord().value());
+        first.setJsonMap(dataRecord);
         if (!csc.isBypassSchemaValidation() && !validateDataSchema(table, first, false))
             throw new RuntimeException("Data schema validation failed.");
         // Let's test first record
@@ -722,12 +725,16 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         for (Record record : records) {
             if (record.getSinkRecord().value() != null) {
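+                // Convert this record's Struct on demand instead of holding maps for the whole batch.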
+                Map<String, Data> dataRecordTmp = StructToJsonMap.toJsonMap((Struct) record.getSinkRecord().value());
+                record.setJsonMap(dataRecordTmp);
                 for (Column col : table.getRootColumnsList()) {
                     LOGGER.debug("Writing column: {}", col.getName());
                     long beforePushStream = System.currentTimeMillis();
                     doWriteCol(record, col, stream, supportDefaults);
                     pushStreamTime += System.currentTimeMillis() - beforePushStream;
                 }
+                record.setJsonMap(null); // release the map so it can be GC'd before the next record
             }
         }
 
@@ -753,6 +760,10 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
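+        // Convert only the first record up front; it is needed for schema validation below.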
+        Map<String, Data> dataRecord = StructToJsonMap.toJsonMap((Struct) first.getSinkRecord().value());
+        first.setJsonMap(dataRecord);
+
         if (!csc.isBypassSchemaValidation() && !validateDataSchema(table, first, false))
             throw new RuntimeException("Data schema validation failed.");
         // Let's test first record
@@ -776,11 +787,15 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent
                 // write bytes into the piped stream
                 for (Record record : records) {
                     if (record.getSinkRecord().value() != null) {
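+                        // Convert this record's Struct on demand instead of holding maps for the whole batch.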
+                        Map<String, Data> dataRecordTmp = StructToJsonMap.toJsonMap((Struct) record.getSinkRecord().value());
+                        record.setJsonMap(dataRecordTmp);
                         for (Column col : table.getRootColumnsList()) {
                             long beforePushStream = System.currentTimeMillis();
                             doWriteCol(record, col, stream, supportDefaults);
                             pushStreamTime += System.currentTimeMillis() - beforePushStream;
                         }
+                        record.setJsonMap(null); // release the map so it can be GC'd before the next record
                     }
                 }
                 // We need to close the stream before getting a response
diff --git a/src/main/java/com/clickhouse/kafka/connect/util/Memory.java b/src/main/java/com/clickhouse/kafka/connect/util/Memory.java
new file mode 100644
index 00000000..9ce6f2ef
--- /dev/null
+++ b/src/main/java/com/clickhouse/kafka/connect/util/Memory.java
@@ -0,0 +1,20 @@
+package com.clickhouse.kafka.connect.util;
+
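+/**
+ * Point-in-time snapshot of JVM heap usage (in bytes), captured at construction, for debug logging.
+ */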
+public class Memory {
+    private final long free = Runtime.getRuntime().freeMemory();
+    private final long total = Runtime.getRuntime().totalMemory();
+    private final long max = Runtime.getRuntime().maxMemory();
+    private final long used = total - free;
+
+    public static Memory get() {
+        return new Memory();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("free:[%d] total:[%d] max:[%d] used:[%d]", free, total, max, used);
+    }
+}
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 8dd8ca01..13a454d1 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -5,6 +5,7 @@
 import com.clickhouse.kafka.connect.sink.helper.SchemaTestData;
 import com.clickhouse.kafka.connect.sink.junit.extension.FromVersionConditionExtension;
 import com.clickhouse.kafka.connect.sink.junit.extension.SinceClickHouseVersion;
+import com.clickhouse.kafka.connect.util.Memory;
 import com.clickhouse.kafka.connect.util.Utils;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -31,6 +32,7 @@ public class ClickHouseSinkTaskWithSchemaTest extends ClickHouseBase {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkTaskWithSchemaTest.class);
     @Test
     public void arrayTypesTest() {
+        LOGGER.debug(String.format("Memory: before test start: %s", Memory.get()));
         Map<String, String> props = createProps();
         ClickHouseHelperClient chc = createClient(props);
 
@@ -42,12 +44,12 @@ public void arrayTypesTest() {
                 "`arr_map` Array(Map(String, String))  ) Engine = MergeTree ORDER BY off16");
         // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98
         Collection<SinkRecord> sr = SchemaTestData.createArrayType(topic, 1);
+        LOGGER.debug(String.format("Memory: before test start: %s", Memory.get()));
 
         ClickHouseSinkTask chst = new ClickHouseSinkTask();
         chst.start(props);
         chst.put(sr);
         chst.stop();
-
         assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
         assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
     }