Skip to content

Commit

Permalink
Merge pull request #327 from ClickHouse/update-fixed-strings
Browse files Browse the repository at this point in the history
Adding String support and tweaking validation
  • Loading branch information
Paultagoras authored Feb 26, 2024
2 parents c362270 + 5b19586 commit 92bdb7a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
case "DateTime":
case "DateTime64":
case "UUID":
case "FIXED_STRING":
break;//I notice we just break here, rather than actually validate the type
default:
if (!colTypeName.equals(dataTypeName)) {
if (!((colTypeName.equals("STRING") || colTypeName.equalsIgnoreCase("FIXED_STRING")) && dataTypeName.equals("BYTES"))) {
if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) {
LOGGER.debug("Data schema name: {}", objSchema.name());
if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) {
validSchema = false;
Expand Down Expand Up @@ -404,11 +405,10 @@ private void doWriteFixedString(Type columnType, ClickHousePipedOutputStream str
}

if (Objects.requireNonNull(columnType) == Type.FIXED_STRING) {
if (value instanceof byte[]) {
if (value instanceof String) {
BinaryStreamUtils.writeFixedString(stream, (String) value, length, StandardCharsets.UTF_8);
} else if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
if (bytes.length != length) {
throw new DataException(String.format("Fixed string length mismatch: expected %d, got %d", length, bytes.length));
}
BinaryStreamUtils.writeFixedString(stream, new String(bytes, StandardCharsets.UTF_8), length, StandardCharsets.UTF_8);
} else {
String msg = String.format("Not implemented conversion from %s to %s", value.getClass(), columnType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;

Expand All @@ -19,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.*;

public class ClickHouseSinkTaskWithSchemaTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkTaskWithSchemaTest.class);

private static ClickHouseContainer db = null;
private static ClickHouseHelperClient chc = null;
Expand Down Expand Up @@ -420,9 +423,12 @@ public void schemaWithFixedStringTest() {

String topic = "fixed-string-value-table-test";
int fixedStringSize = RandomUtils.nextInt(1, 100);
LOGGER.info("FixedString size: " + fixedStringSize);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
"`fixed_string` FixedString("+fixedStringSize+") ) Engine = MergeTree ORDER BY off16");
"`fixed_string_string` FixedString("+fixedStringSize+"), " +
"`fixed_string_bytes` FixedString("+fixedStringSize+")" +
") Engine = MergeTree ORDER BY off16");

Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
ClickHouseSinkTask chst = new ClickHouseSinkTask();
Expand All @@ -439,18 +445,19 @@ public void schemaWithFixedStringMismatchTest() {
ClickHouseHelperClient chc = createClient(props);

String topic = "fixed-string-mismatch-table-test";
int fixedStringSize = RandomUtils.nextInt(1, 100);
int fixedStringSize = RandomUtils.nextInt(2, 100);
LOGGER.info("FixedString size: " + fixedStringSize);
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " +
"`fixed_string` FixedString(" + (fixedStringSize + 1 ) + ") ) Engine = MergeTree ORDER BY off16");
"`fixed_string_string` FixedString(" + (fixedStringSize - 1 ) + ") ) Engine = MergeTree ORDER BY off16");

Collection<SinkRecord> sr = SchemaTestData.createFixedStringData(topic, 1, fixedStringSize);
ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
try {
chst.put(sr);
} catch (RuntimeException e) {
assertInstanceOf(DataException.class, Utils.getRootCause(e), "Size mismatch for FixedString");
assertInstanceOf(IllegalArgumentException.class, Utils.getRootCause(e), "Could not detect size mismatch for FixedString");
}
chst.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,15 +735,17 @@ public static Collection<SinkRecord> createFixedStringData(String topic, int par

Schema NESTED_SCHEMA = SchemaBuilder.struct()
.field("off16", Schema.INT16_SCHEMA)
.field("fixed_string", Schema.BYTES_SCHEMA)
.field("fixed_string_string", Schema.STRING_SCHEMA)
.field("fixed_string_bytes", Schema.BYTES_SCHEMA)
.build();


List<SinkRecord> array = new ArrayList<>();
LongStream.range(0, totalRecords).forEachOrdered(n -> {
Struct value_struct = new Struct(NESTED_SCHEMA)
.put("off16", (short)n)
.put("fixed_string", RandomStringUtils.random(fixedSize, true, true).getBytes());
.put("fixed_string_string", RandomStringUtils.random(fixedSize, true, true))
.put("fixed_string_bytes", RandomStringUtils.random(fixedSize, true, true).getBytes());


SinkRecord sr = new SinkRecord(
Expand Down

0 comments on commit 92bdb7a

Please sign in to comment.