diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ae2e328..6760331e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ +# 1.2.0 +* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse + # 1.1.4 * Bugfix to address field value to column name mapping for Tuples -* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse # 1.1.3 * Update to java-client 0.6.3 diff --git a/VERSION b/VERSION index c6412202..79127d85 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.1.4 +v1.2.0 diff --git a/build.gradle.kts b/build.gradle.kts index fed24033..c2df048c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -7,8 +7,6 @@ */ import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar -import java.io.ByteArrayOutputStream -import java.net.URI import java.time.LocalDateTime import java.time.format.DateTimeFormatter import org.gradle.api.tasks.testing.logging.TestExceptionFormat @@ -34,7 +32,6 @@ plugins { signing // checkstyle id("com.github.gmazzo.buildconfig") version "5.3.5" - //id("com.github.spotbugs") version "4.7.9" id("com.diffplug.spotless") version "6.25.0" id("com.github.johnrengelman.shadow") version "7.1.2" } @@ -50,7 +47,6 @@ repositories { } extra.apply { - set("clickHouseDriverVersion", "0.6.3") set("kafkaVersion", "2.7.0") set("avroVersion", "1.9.2") diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index 2c82ba31..2362d057 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -262,7 +262,7 @@ public ClickHouseSinkConfig(Map props) { } } } - this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V2"); + this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1"); LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}", hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java index cb936fc0..970b3490 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java @@ -91,20 +91,19 @@ protected ClickHouseHelperClient createClient(Map props, boolean .setRetry(csc.getRetry()) .build(); - if (withDatabase) { - createDatabase(this.database, tmpChc); - props.put(ClickHouseSinkConnector.DATABASE, this.database); - ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort()) - .setDatabase(this.database) + createDatabase(ClickHouseBase.database, tmpChc); + props.put(ClickHouseSinkConnector.DATABASE, ClickHouseBase.database); + tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort()) + .setDatabase(ClickHouseBase.database) .setUsername(username) .setPassword(password) .sslEnable(sslEnabled) .setTimeout(timeout) .setRetry(csc.getRetry()) .build(); - return chc; } + chc = tmpChc; return chc; } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java index 139b3b2c..5c7b6d20 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java @@ -8,7 +8,6 @@ import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.*; import org.testcontainers.clickhouse.ClickHouseContainer; @@ -16,9 +15,7 @@ import org.testcontainers.containers.ToxiproxyContainer; import java.io.IOException; -import java.math.BigDecimal; import java.util.*; -import java.util.stream.LongStream; import static org.junit.jupiter.api.Assertions.*; diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java index 03042299..199b6965 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java @@ -62,7 +62,7 @@ private Map getTestProperties() { @Test public void proxyPingTest() throws IOException { - ClickHouseHelperClient chc = createClient(getTestProperties(), false); + ClickHouseHelperClient chc = createClient(getTestProperties()); assertTrue(chc.ping()); proxy.disable(); assertFalse(chc.ping()); @@ -72,7 +72,7 @@ public void proxyPingTest() throws IOException { @Test public void arrayTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "array_string_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -92,7 +92,7 @@ public void arrayTypesTest() { @Test public void mapTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "map_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -114,7 +114,7 @@ public void mapTypesTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/33 public void materializedViewsBug() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "m_array_string_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -135,7 +135,7 @@ public void materializedViewsBug() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "special-char-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -157,7 +157,7 @@ public void specialCharTableNameTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/62 public void nullValueDataTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "null-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -178,7 +178,7 @@ public void nullValueDataTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/57 public void supportDatesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "support-dates-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -197,7 +197,7 @@ public void supportDatesTest() { @Test public void detectUnsupportedDataConversions() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "support-unsupported-dates-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -218,7 +218,7 @@ public void detectUnsupportedDataConversions() { @Test public void withEmptyDataRecordsTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props); String topic = "schema_empty_records_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -235,7 +235,7 @@ public void withEmptyDataRecordsTest() { @Test public void withLowCardinalityTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props, true); String topic = "schema_empty_records_lc_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -252,7 +252,7 @@ public void withLowCardinalityTest() { @Test public void withUUIDTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props, true); String topic = "schema_empty_records_lc_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -269,7 +269,7 @@ public void withUUIDTest() { @Test public void schemaWithDefaultsTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props, true); String topic = "default-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -288,7 +288,7 @@ public void schemaWithDefaultsTest() { @Test public void schemaWithDecimalTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props, true); String topic = "decimal-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -307,7 +307,7 @@ public void schemaWithDecimalTest() { @Test public void schemaWithBytesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props, false); + ClickHouseHelperClient chc = createClient(props, true); String topic = "bytes-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (`string` String) Engine = MergeTree ORDER BY `string`"); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index 48f454b4..55384ba8 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -134,7 +134,8 @@ public void materializedViewsBug() { ClickHouseTestHelpers.dropTable(chc, topic + "mate"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16"); - ClickHouseTestHelpers.createTable(chc, topic + "mate", "CREATE MATERIALIZED VIEW %s ( `off16` Int16 ) Engine = MergeTree ORDER BY `off16` POPULATE AS SELECT off16 FROM " + topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE MATERIALIZED VIEW %s_mv TO " + topic + "_mate AS SELECT off16 FROM " + topic); + ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s_mate ( `off16` Int16 ) Engine = Null"); Collection sr = SchemaTestData.createArrayType(topic, 1); ClickHouseSinkTask chst = new ClickHouseSinkTask(); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index 9ffa415a..5209624d 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -5,10 +5,7 @@ import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.api.query.QuerySettings; import com.clickhouse.client.api.query.Records; -import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseFormat; -import com.clickhouse.data.ClickHouseRecord; -import com.clickhouse.data.ClickHouseValue; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.db.mapping.Column; @@ -26,12 +23,9 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public class ClickHouseTestHelpers { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTestHelpers.class); @@ -74,6 +68,7 @@ public static void dropTable(ClickHouseHelperClient chc, String tableName) { } } public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) { + LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery); OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>()); if (isCloud()) { try { @@ -92,7 +87,7 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta settings.setOption(entry.getKey(), entry.getValue()); } try { - Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(10, java.util.concurrent.TimeUnit.SECONDS); + Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS); return records.getMetrics(); } catch (Exception e) { throw new RuntimeException(e); @@ -141,7 +136,7 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) { String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName); try { - Records records = chc.getClient().queryRecords(queryCount).get(10, TimeUnit.SECONDS); + Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS); // Note we probrbly need asInteger() here String value = records.iterator().next().getString(1); return Integer.parseInt(value);