Merge pull request #469 from ClickHouse/trim-excess-fields
Trim excess fields
Paultagoras authored Nov 14, 2024
2 parents ab75a94 + 3631b8d commit 556c2bb
Showing 3 changed files with 34 additions and 6 deletions.
CHANGELOG.md: 1 change (1 addition & 0 deletions)
@@ -1,5 +1,6 @@
# 1.2.5
* Remove unused avro property from `build.gradle.kts`
* Trim schemaless data to only pass the fields that are in the table

# 1.2.4
* Adjusting underlying client version to 0.7.0
@@ -841,7 +841,7 @@ protected void doInsertJsonV1(List<Record> records, Table table, QueryIdentifier
break;
}
long beforeSerialize = System.currentTimeMillis();
String gsonString = gson.toJson(data, gsonType);
String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
LOGGER.trace("topic {} partition {} offset {} payload {}",
record.getTopic(),
@@ -914,7 +914,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
break;
}
long beforeSerialize = System.currentTimeMillis();
String gsonString = gson.toJson(data, gsonType);
String gsonString = gson.toJson(cleanupExtraFields(data, table), gsonType);
dataSerializeTime += System.currentTimeMillis() - beforeSerialize;
LOGGER.trace("topic {} partition {} offset {} payload {}",
record.getTopic(),
@@ -935,6 +935,17 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
s3 = System.currentTimeMillis();
LOGGER.info("batchSize: {} serialization ms: {} data ms: {} send ms: {} (QueryId: [{}])", records.size(), dataSerializeTime, s2 - s1, s3 - s2, queryId.getQueryId());
}

protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
Map<String, Object> cleaned = new HashMap<>();
for (Column c : t.getRootColumnsList()) {
if (m.containsKey(c.getName())) {
cleaned.put(c.getName(), m.get(c.getName()));
}
}
return cleaned;
}

protected void doInsertString(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
if(chc.isUseClientV2()) {
doInsertStringV2(records, table, queryId);
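Both doInsertJsonV1 and doInsertJsonV2 now route the row map through cleanupExtraFields(data, table) before serializing it with Gson, so keys that have no matching column in the destination table are dropped from the JSON payload. Below is a minimal, self-contained sketch of the same idea, assuming only plain Java and Gson; the class name TrimSketch and the use of a List<String> in place of the connector's Table/Column types are illustrative stand-ins, not the connector's actual API.

// Hedged sketch: trim a schemaless row to the destination table's columns before serializing.
// TrimSketch and the List<String> of column names are hypothetical stand-ins for illustration.
import com.google.gson.Gson;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TrimSketch {

    // Keep only the keys that match a column of the destination table.
    static Map<String, Object> cleanupExtraFields(Map<String, Object> row, List<String> tableColumns) {
        Map<String, Object> cleaned = new HashMap<>();
        for (String column : tableColumns) {
            if (row.containsKey(column)) {
                cleaned.put(column, row.get(column));
            }
        }
        return cleaned;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("off16", 1);
        row.put("str", "hello");
        row.put("p_int8", 7);
        row.put("extra_field", "no matching column in the table");

        List<String> tableColumns = List.of("off16", "str", "p_int8");

        Gson gson = new Gson();
        // Prints {"off16":1,"str":"hello","p_int8":7} -- "extra_field" is dropped before serialization.
        System.out.println(gson.toJson(cleanupExtraFields(row, tableColumns)));
    }
}

Note that fields defined in the table but missing from the record are simply omitted rather than defaulted, matching the containsKey check in cleanupExtraFields above.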
@@ -1,18 +1,16 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;

import java.util.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ClickHouseSinkTaskSchemalessTest extends ClickHouseBase {
@@ -36,6 +34,24 @@ public void primitiveTypesTest() {
assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
}

@Test
public void primitiveTypesSubsetTest() {
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
// The table below intentionally defines only a subset of the fields produced by createPrimitiveTypes; the rest should be trimmed before insert
String topic = createTopicName("schemaless_primitive_types_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = SchemalessTestData.createPrimitiveTypes(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
assertFalse(ClickHouseTestHelpers.validateRows(chc, topic, sr));
}

@Test
public void withEmptyDataRecordsTest() {
Map<String, String> props = createProps();
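The new primitiveTypesSubsetTest creates a table that keeps only three of the fields produced by SchemalessTestData.createPrimitiveTypes, so the insert itself is expected to succeed (assertEquals on countRows), while validateRows is expected to return false, presumably because the stored rows no longer carry every field of the original records. A hedged sketch of that expectation, using hypothetical row maps rather than the actual test helpers; matchesOnColumns is illustrative and not part of ClickHouseTestHelpers:

// Hedged sketch of why the two assertions differ after trimming; the maps and the
// matchesOnColumns helper are hypothetical, for illustration only.
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class SubsetValidationSketch {

    // Compare only the columns the destination table actually has.
    static boolean matchesOnColumns(Map<String, Object> original,
                                    Map<String, Object> stored,
                                    Set<String> tableColumns) {
        for (String column : tableColumns) {
            if (!Objects.equals(original.get(column), stored.get(column))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The produced record carries more fields than the table defines.
        Map<String, Object> original = Map.of("off16", 1, "str", "a", "p_int8", 7, "p_int16", 16);
        // After trimming, only the table's columns are stored.
        Map<String, Object> stored = Map.of("off16", 1, "str", "a", "p_int8", 7);

        System.out.println(original.equals(stored));                                              // false: full comparison fails
        System.out.println(matchesOnColumns(original, stored, Set.of("off16", "str", "p_int8"))); // true: subset still matches
    }
}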