From 5f892f5513fea14429b5b8abf3ffba929c842265 Mon Sep 17 00:00:00 2001
From: Paultagoras <paul.moore@clickhouse.com>
Date: Wed, 21 Feb 2024 18:15:59 -0500
Subject: [PATCH 1/2] Adding String support for FixedString columns and tweaking validation

---
 .../kafka/connect/sink/db/ClickHouseWriter.java        | 10 +++++-----
 .../connect/sink/ClickHouseSinkTaskWithSchemaTest.java | 10 ++++++----
 .../kafka/connect/sink/helper/SchemaTestData.java      |  6 ++++--
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
index 99e6db1d..8c7499ec 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -201,10 +201,11 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
                         case "DateTime":
                         case "DateTime64":
                         case "UUID":
+                        case "FIXED_STRING":
                             break;//I notice we just break here, rather than actually validate the type
                         default:
                             if (!colTypeName.equals(dataTypeName)) {
-                                if (!((colTypeName.equals("STRING") || colTypeName.equalsIgnoreCase("FIXED_STRING")) && dataTypeName.equals("BYTES"))) {
+                                if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) {
                                     LOGGER.debug("Data schema name: {}", objSchema.name());
                                     if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) {
                                         validSchema = false;
@@ -404,11 +405,10 @@ private void doWriteFixedString(Type columnType, ClickHousePipedOutputStream str
         }
 
         if (Objects.requireNonNull(columnType) == Type.FIXED_STRING) {
-            if (value instanceof byte[]) {
+            if (value instanceof String) {
+                BinaryStreamUtils.writeFixedString(stream, (String) value, length, StandardCharsets.UTF_8);
+            } else if (value instanceof byte[]) {
                 byte[] bytes = (byte[]) value;
-                if (bytes.length != length) {
-                    throw new DataException(String.format("Fixed string length mismatch: expected %d, got %d", length, bytes.length));
-                }
                 BinaryStreamUtils.writeFixedString(stream, new String(bytes, StandardCharsets.UTF_8), length, StandardCharsets.UTF_8);
             } else {
                 String msg = String.format("Not implemented conversion from %s to %s", value.getClass(), columnType);
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 87628414..8a444bad 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -422,7 +422,9 @@ public void schemaWithFixedStringTest() {
         int fixedStringSize = RandomUtils.nextInt(1, 100);
         ClickHouseTestHelpers.dropTable(chc, topic);
         ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
-                "`fixed_string` FixedString("+fixedStringSize+") ) Engine = MergeTree ORDER BY off16");
+                "`fixed_string_string` FixedString("+fixedStringSize+"), " +
+                "`fixed_string_bytes` FixedString("+fixedStringSize+")" +
+                ") Engine = MergeTree ORDER BY off16");
 
         Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
         ClickHouseSinkTask chst = new ClickHouseSinkTask();
@@ -439,10 +441,10 @@ public void schemaWithFixedStringMismatchTest() {
         ClickHouseHelperClient chc = createClient(props);
 
         String topic = "fixed-string-mismatch-table-test";
-        int fixedStringSize = RandomUtils.nextInt(1, 100);
+        int fixedStringSize = RandomUtils.nextInt(2, 100);
         ClickHouseTestHelpers.dropTable(chc, topic);
         ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
-                "`fixed_string` FixedString(" + (fixedStringSize + 1 ) + ") ) Engine = MergeTree ORDER BY off16");
+                "`fixed_string_string` FixedString(" + (fixedStringSize - 1 ) + ") ) Engine = MergeTree ORDER BY off16");
 
         Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
         ClickHouseSinkTask chst = new ClickHouseSinkTask();
@@ -450,7 +452,7 @@ public void schemaWithFixedStringMismatchTest() {
         try {
             chst.put(sr);
         } catch (RuntimeException e) {
-            assertInstanceOf(DataException.class, Utils.getRootCause(e), "Size mismatch for FixedString");
+            assertInstanceOf(IllegalArgumentException.class, Utils.getRootCause(e), "Could not detect size mismatch for FixedString");
         }
         chst.stop();
     }
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
index 3242dd64..a0c3661d 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java
@@ -735,7 +735,8 @@ public static Collection<SinkRecord> createFixedStringData(String topic, int par
 
         Schema NESTED_SCHEMA = SchemaBuilder.struct()
                 .field("off16", Schema.INT16_SCHEMA)
-                .field("fixed_string", Schema.BYTES_SCHEMA)
+                .field("fixed_string_string", Schema.STRING_SCHEMA)
+                .field("fixed_string_bytes", Schema.BYTES_SCHEMA)
                 .build();
 
 
@@ -743,7 +744,8 @@ public static Collection<SinkRecord> createFixedStringData(String topic, int par
         LongStream.range(0, totalRecords).forEachOrdered(n -> {
             Struct value_struct = new Struct(NESTED_SCHEMA)
                     .put("off16", (short)n)
-                    .put("fixed_string", RandomStringUtils.random(fixedSize, true, true).getBytes());
+                    .put("fixed_string_string", RandomStringUtils.random(fixedSize, true, true))
+                    .put("fixed_string_bytes", RandomStringUtils.random(fixedSize, true, true).getBytes());
 
 
             SinkRecord sr = new SinkRecord(

From 5b195867472c45ed1953c6f3163a3dffe4fbb55f Mon Sep 17 00:00:00 2001
From: Paultagoras <paul.moore@clickhouse.com>
Date: Mon, 26 Feb 2024 11:19:32 -0500
Subject: [PATCH 2/2] Update ClickHouseSinkTaskWithSchemaTest.java to log the FixedString size

---
 .../kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 8a444bad..ffeef298 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -11,6 +11,8 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.clickhouse.ClickHouseContainer;
 import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
 
@@ -19,6 +21,7 @@
 import static org.junit.jupiter.api.Assertions.*;
 
 public class ClickHouseSinkTaskWithSchemaTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkTaskWithSchemaTest.class);
 
     private static ClickHouseContainer db = null;
     private static ClickHouseHelperClient chc = null;
@@ -420,6 +423,7 @@ public void schemaWithFixedStringTest() {
 
         String topic = "fixed-string-value-table-test";
         int fixedStringSize = RandomUtils.nextInt(1, 100);
+        LOGGER.info("FixedString size: " + fixedStringSize);
         ClickHouseTestHelpers.dropTable(chc, topic);
         ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
                 "`fixed_string_string` FixedString("+fixedStringSize+"), " +
@@ -442,6 +446,7 @@ public void schemaWithFixedStringMismatchTest() {
 
         String topic = "fixed-string-mismatch-table-test";
         int fixedStringSize = RandomUtils.nextInt(2, 100);
+        LOGGER.info("FixedString size: " + fixedStringSize);
         ClickHouseTestHelpers.dropTable(chc, topic);
         ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
                 "`fixed_string_string` FixedString(" + (fixedStringSize - 1 ) + ") ) Engine = MergeTree ORDER BY off16");