Merge pull request #408 from ClickHouse/add-rowbinary-option-flag
Adding a flag to allow bypassing RowBinary and also fixed a bug aroun…
Paultagoras authored Jun 27, 2024
2 parents c9d2a86 + 162b054 commit 532e750
Showing 7 changed files with 57 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@
* Bugfix to address string encoding issue
* Bugfix to address issue with nested types and flatten_nested setting conflict
* Bugfix to avoid storing keeper state in same column name if virtual topic is enabled
* Added a flag to allow bypassing RowBinary and RowBinaryWithDefaults format for schema insertions
* Bugfix to remove erroneous error messages about complex type handling

## 1.1.0
* Updated java-client to 0.6.0-patch4
@@ -94,6 +94,8 @@ public enum InsertFormats {
JSON
}

private boolean bypassRowBinary = false;

private InsertFormats insertFormat = InsertFormats.NONE;
public static class UTF8String implements ConfigDef.Validator {

@@ -242,6 +244,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
this.enableDbTopicSplit = Boolean.parseBoolean(props.getOrDefault(ENABLE_DB_TOPIC_SPLIT, "false"));
this.dbTopicSplitChar = props.getOrDefault(DB_TOPIC_SPLIT_CHAR, "");
this.keeperOnCluster = props.getOrDefault(KEEPER_ON_CLUSTER, "");
this.bypassRowBinary = Boolean.parseBoolean(props.getOrDefault("bypassRowBinary", "false"));

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
@@ -509,6 +512,15 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Keeper on cluster"
);
configDef.define("bypassRowBinary",
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Bypass RowBinary format, sometimes needed with data gaps. default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Bypass RowBinary format.");
return configDef;
}
}
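The new `bypassRowBinary` option is read like any other connector property: if the key is absent it defaults to `false`, and any string other than `"true"` parses to `false`. A minimal, self-contained sketch of that parsing, where only the `bypassRowBinary` key is taken from the diff and everything else is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of how the new flag is read from connector properties.
// This is not a full connector configuration, just the parsing pattern.
public class BypassRowBinaryExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("bypassRowBinary", "true"); // omit the key to keep the default of false

        // Same pattern the sink config uses: missing key -> "false" -> boolean false.
        boolean bypassRowBinary = Boolean.parseBoolean(props.getOrDefault("bypassRowBinary", "false"));
        System.out.println("bypassRowBinary = " + bypassRowBinary);
    }
}
```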
36 changes: 11 additions & 25 deletions src/main/java/com/clickhouse/kafka/connect/sink/data/Record.java
@@ -6,6 +6,7 @@
import com.clickhouse.kafka.connect.sink.data.convert.SchemaRecordConvertor;
import com.clickhouse.kafka.connect.sink.data.convert.StringRecordConvertor;
import com.clickhouse.kafka.connect.sink.kafka.OffsetContainer;
import lombok.Getter;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -18,12 +19,18 @@
import java.util.Map;

public class Record {
@Getter
private OffsetContainer recordOffsetContainer = null;
private Object value;
@Getter
private Map<String, Data> jsonMap = null;
@Getter
private List<Field> fields = null;
@Getter
private SchemaType schemaType;
@Getter
private SinkRecord sinkRecord = null;
@Getter
private String database = null;

public Record(SchemaType schemaType, OffsetContainer recordOffsetContainer, List<Field> fields, Map<String, Data> jsonMap, String database, SinkRecord sinkRecord) {
@@ -39,35 +46,14 @@ public String getTopicAndPartition() {
return recordOffsetContainer.getTopicAndPartitionKey();
}

public OffsetContainer getRecordOffsetContainer() {
return recordOffsetContainer;
}

public Map<String, Data> getJsonMap() {
return jsonMap;
}

public List<Field> getFields() {
return fields;
}

public SinkRecord getSinkRecord() {
return sinkRecord;
}

public String getTopic() {
return recordOffsetContainer.getTopic();
}

public SchemaType getSchemaType() {
return this.schemaType;
}
public String getDatabase() { return this.database; }

private static RecordConvertor schemaRecordConvertor = new SchemaRecordConvertor();
private static RecordConvertor schemalessRecordConvertor = new SchemalessRecordConvertor();
private static RecordConvertor emptyRecordConvertor = new EmptyRecordConvertor();
private static RecordConvertor stringRecordConvertor = new StringRecordConvertor();
private static final RecordConvertor schemaRecordConvertor = new SchemaRecordConvertor();
private static final RecordConvertor schemalessRecordConvertor = new SchemalessRecordConvertor();
private static final RecordConvertor emptyRecordConvertor = new EmptyRecordConvertor();
private static final RecordConvertor stringRecordConvertor = new StringRecordConvertor();
private static RecordConvertor getConvertor(Schema schema, Object data) {
if (data == null ) {
return emptyRecordConvertor;
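The hand-written accessors removed above are replaced by Lombok's `@Getter`, which generates an equivalent `getX()` method for each annotated field at compile time. A minimal sketch of the pattern, assuming Lombok is on the classpath with annotation processing enabled; the class is a stand-in and only the field name is borrowed from `Record`:

```java
import lombok.Getter;

// Stand-in showing the @Getter refactor applied to Record:
// the annotation generates getDatabase(), matching the removed hand-written getter.
public class RecordGetterSketch {
    @Getter
    private String database;

    public static void main(String[] args) {
        RecordGetterSketch r = new RecordGetterSketch();
        r.database = "default";              // private access is fine inside the same class
        System.out.println(r.getDatabase()); // accessor generated by Lombok
    }
}
```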
@@ -168,7 +168,11 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte

switch (first.getSchemaType()) {
case SCHEMA:
doInsertRawBinary(records, table, queryId, table.hasDefaults());
if (csc.isBypassRowBinary()) {
doInsertJson(records, table, queryId);
} else {
doInsertRawBinary(records, table, queryId, table.hasDefaults());
}
break;
case SCHEMA_LESS:
doInsertJson(records, table, queryId);
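With the flag in place, schema-bearing records can be routed through the JSON insert path instead of RowBinary. A minimal sketch of the resulting dispatch, with the connector's `Record`/`Table` types reduced to plain parameters; the returned format labels are illustrative, and in the real writer `doInsertJson`/`doInsertRawBinary` perform the actual inserts:

```java
// Sketch of the new doInsert dispatch for schema vs. schemaless records.
public class InsertDispatchSketch {
    enum SchemaType { SCHEMA, SCHEMA_LESS }

    static String chooseFormat(SchemaType type, boolean bypassRowBinary, boolean tableHasDefaults) {
        switch (type) {
            case SCHEMA:
                if (bypassRowBinary) {
                    return "JSON insert path";                                    // doInsertJson
                }
                return tableHasDefaults ? "RowBinaryWithDefaults" : "RowBinary";  // doInsertRawBinary
            case SCHEMA_LESS:
            default:
                return "JSON insert path";                                        // doInsertJson
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseFormat(SchemaType.SCHEMA, true, false));  // JSON insert path
        System.out.println(chooseFormat(SchemaType.SCHEMA, false, true));  // RowBinaryWithDefaults
    }
}
```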
@@ -13,7 +13,6 @@
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -151,7 +151,11 @@ private void updateParent(Column parent, Column child) {
parent.getTupleFields().add(child);
return;
default:
LOGGER.error("Unsupported complex parent type: {}", parent.getType());
if (child.getName().endsWith(".null")) {
LOGGER.debug("Ignoring complex column: {}", child);
} else {
LOGGER.warn("Unsupported complex parent type: {}", parent.getType());
}
}
}

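The erroneous error message removed here was triggered by `Nullable` columns: when a table is described column-by-column, each `Nullable(T)` column also surfaces a null-mask subcolumn named `<column>.null`, which has no complex parent type to attach to. Such children are now skipped quietly at debug level, and only genuinely unsupported parents are reported, downgraded from error to warning. A minimal sketch of the rule, with the logger replaced by `System.out` and the parent-type labels purely illustrative:

```java
// Sketch of the new handling in updateParent: ".null" children (the Nullable
// null-mask subcolumn) are ignored quietly, anything else still gets a warning.
public class NullSubcolumnSketch {
    static void handleUnsupportedChild(String parentType, String childName) {
        if (childName.endsWith(".null")) {
            System.out.println("DEBUG Ignoring complex column: " + childName);
        } else {
            System.out.println("WARN Unsupported complex parent type: " + parentType);
        }
    }

    public static void main(String[] args) {
        handleUnsupportedChild("DATE", "date_number.null");        // ignored at debug level
        handleUnsupportedChild("FIXED_STRING", "payload.values");  // still warned
    }
}
```

This is also why the new extractNullables test below expects two root columns but three columns overall: `off16`, `date_number`, and the generated `date_number.null`.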
@@ -1,20 +1,24 @@
package com.clickhouse.kafka.connect.sink.db.mapping;

import com.clickhouse.kafka.connect.sink.ClickHouseBase;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers.newDescriptor;
import static org.junit.jupiter.api.Assertions.*;

class TableTest {
class TableTest extends ClickHouseBase {

@Test
public void extractMapOfPrimitives() {
Table table = new Table("t");

Column map = Column.extractColumn(newDescriptor("map", "Map(String, Decimal(5)"));
Column map = Column.extractColumn(newDescriptor("map", "Map(String, Decimal(5))"));
Column mapValues = Column.extractColumn(newDescriptor("map.values", "Array(Decimal(5))"));

assertEquals(Type.MAP, map.getType());
@@ -28,6 +32,22 @@ public void extractMapOfPrimitives() {
assertEquals(5, mapValueType.getPrecision());
}

@Test
public void extractNullables() {
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);

String tableName = createTopicName("extract-table-test");
ClickHouseTestHelpers.dropTable(chc, tableName);
ClickHouseTestHelpers.createTable(chc, tableName, "CREATE TABLE `%s` (`off16` Int16, date_number Nullable(Date)) Engine = MergeTree ORDER BY off16");

Table table = chc.describeTable(tableName);
assertNotNull(table);
assertEquals(table.getRootColumnsList().size(), 2);
assertEquals(table.getAllColumnsList().size(), 3);
ClickHouseTestHelpers.dropTable(chc, tableName);
}

@Test
public void extractMapWithComplexValue() {
Table table = new Table("t");