Support msk (#46)
* refactor escapeTopicName to a Utils class

* change confluent version to 6.1.0 in integration tests

* adding gson to package

* remove unused code

mzitnik authored Jan 31, 2023
1 parent b4e546a commit e7a8f71
Showing 7 changed files with 71 additions and 70 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
@@ -38,7 +38,7 @@ plugins {
}

group = "com.clickhouse.kafka"
version = "0.0.4"
version = "0.0.5"
description = "The official ClickHouse Apache Kafka Connect Connector."

repositories {
@@ -84,7 +84,7 @@ dependencies {
clickhouseDependencies("io.lettuce:lettuce-core:6.2.0.RELEASE")
clickhouseDependencies("com.clickhouse:clickhouse-client:${project.extra["clickHouseDriverVersion"]}")
clickhouseDependencies("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")

clickhouseDependencies("com.google.code.gson:gson:2.10")

// Unit Tests
testImplementation(platform("org.junit:junit-bom:${project.extra["junitJupiterVersion"]}"))
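The gson dependency added above corresponds to the "adding gson to package" commit note; later in this commit ClickHouseWriter starts importing com.google.gson.Gson for its JSON insert path. As a minimal, hedged sketch (the class and helper names below are illustrative, not the connector's actual code), Gson can turn a record's field map into a single JSON line of the kind a JSONEachRow insert expects:

import com.google.gson.Gson;

import java.util.LinkedHashMap;
import java.util.Map;

public class GsonJsonLineSketch {
    private static final Gson GSON = new Gson();

    // Serialize one record's fields into a single JSON line.
    static String toJsonLine(Map<String, Object> fields) {
        return GSON.toJson(fields);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("side", "BUY");
        record.put("symbol", "ZTEST");
        System.out.println(toJsonLine(record)); // {"side":"BUY","symbol":"ZTEST"}
    }
}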
@@ -7,6 +7,7 @@
import org.junit.Ignore;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.ConfluentPlatform;
@@ -178,123 +179,121 @@ private void sleep(long l) {
}
}


@Test
@Description("stockGenSingleTaskSchemalessTest")
public void stockGenSingleTaskSchemalessTest() throws IOException {
@Order(1)
@Description("stockGenSingleTask")
public void stockGenSingleTaskTest() throws IOException {
dropStateTable();
// Create KeeperMap table
//createStateTable();

String topicName = "stock_gen_topic_single_schemaless_task";
String flatTableName = String.format("%s_flat", topicName);
String topicName = "stock_gen_topic_single_task";
int parCount = 1;
String payloadDataGen = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/stock_gen.json")));

confluentPlatform.createTopic(topicName, 1);
confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single_Schemaless", "DatagenConnectorConnector_Single_Schemaless", parCount, topicName));
sleep(5 * 1000);
String ksqlCreateStreamPayload = String.format("{\"ksql\": \"CREATE STREAM tmp_%s (side STRING, symbol STRING, userid STRING) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT = 'AVRO');\"}", topicName, topicName);
System.out.println(ksqlCreateStreamPayload);
confluentPlatform.runKsql(ksqlCreateStreamPayload);
sleep(5 * 1000);
String ksqlCreateStreamJSONPayload = String.format("{\"ksql\": \"CREATE STREAM %s_flat WITH (KAFKA_TOPIC='%s_flat', VALUE_FORMAT = 'JSON') AS SELECT side, symbol FROM tmp_%s EMIT CHANGES;\"}", topicName, topicName, topicName);
System.out.println(ksqlCreateStreamJSONPayload);
confluentPlatform.runKsql(ksqlCreateStreamJSONPayload);
sleep(5 * 1000);

confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single", "DatagenConnectorConnector_Single", parCount, topicName));

// Now let's create the correct table & configure Sink to insert data to ClickHouse
dropFlatTable(topicName);
createFlatTable(topicName);
String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink_json.json")));
dropTable(topicName);
createTable(topicName);
String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink.json")));

sleep(5 * 1000);

confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single_Schemaless", "ClickHouseSinkConnectorConnector_Single_Schemaless", parCount, flatTableName, hostname, port, password));
confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single", "ClickHouseSinkConnectorConnector_Single", parCount, topicName, hostname, port, password));

long count = 0;
while (count < 10000) {
sleep(2*1000);
sleep(5*1000);
long endOffset = confluentPlatform.getOffset(topicName, 0 );
if (endOffset % 100 == 0)
System.out.println(endOffset);
if (endOffset >= 10000 / 4) {
if (endOffset == 10000) {
break;
}
count+=1;
}
// TODO : see the progress of the offset currently it is 1 min
sleep(2 * 1000);
sleep(30 * 1000);


count = countRows(flatTableName);
count = countRows(topicName);
System.out.println(count);
while (count < 10000 / 10) {
long tmpCount = countRows(flatTableName);
while (count < 10000) {
long tmpCount = countRows(topicName);
System.out.println(tmpCount);
sleep(2 * 1000);
if (tmpCount > count)
count = tmpCount;
}
assertTrue(countRows(flatTableName) >= 1000);
//assertEquals(10000, countRows(flatTableName));
assertEquals(10000, countRows(topicName));

}

@Ignore

@Test
@Description("stockGenSingleTask")
public void stockGenSingleTaskTest() throws IOException {
@Order(2)
@Description("stockGenSingleTaskSchemalessTest")
public void stockGenSingleTaskSchemalessTest() throws IOException {
dropStateTable();
// Create KeeperMap table
//createStateTable();

String topicName = "stock_gen_topic_single_task";
String topicName = "stock_gen_topic_single_schemaless_task";
String flatTableName = String.format("%s_flat", topicName);
int parCount = 1;
String payloadDataGen = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/stock_gen.json")));

confluentPlatform.createTopic(topicName, 1);
confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single", "DatagenConnectorConnector_Single", parCount, topicName));
confluentPlatform.createConnect(String.format(payloadDataGen, "DatagenConnectorConnector_Single_Schemaless", "DatagenConnectorConnector_Single_Schemaless", parCount, topicName));
sleep(5 * 1000);
String ksqlCreateStreamPayload = String.format("{\"ksql\": \"CREATE STREAM tmp_%s (side STRING, symbol STRING, userid STRING) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT = 'AVRO');\"}", topicName, topicName);
System.out.println(ksqlCreateStreamPayload);
confluentPlatform.runKsql(ksqlCreateStreamPayload);
sleep(5 * 1000);
String ksqlCreateStreamJSONPayload = String.format("{\"ksql\": \"CREATE STREAM %s_flat WITH (KAFKA_TOPIC='%s_flat', VALUE_FORMAT = 'JSON') AS SELECT side, symbol FROM tmp_%s EMIT CHANGES;\"}", topicName, topicName, topicName);
System.out.println(ksqlCreateStreamJSONPayload);
confluentPlatform.runKsql(ksqlCreateStreamJSONPayload);
sleep(5 * 1000);


// Now let's create the correct table & configure Sink to insert data to ClickHouse
dropTable(topicName);
createTable(topicName);
String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink.json")));
dropFlatTable(topicName);
createFlatTable(topicName);
String payloadClickHouseSink = String.join("", Files.readAllLines(Paths.get("src/integrationTest/resources/clickhouse_sink_json.json")));

sleep(5 * 1000);

confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single", "ClickHouseSinkConnectorConnector_Single", parCount, topicName, hostname, port, password));
confluentPlatform.createConnect(String.format(payloadClickHouseSink, "ClickHouseSinkConnectorConnector_Single_Schemaless", "ClickHouseSinkConnectorConnector_Single_Schemaless", parCount, flatTableName, hostname, port, password));

long count = 0;
while (count < 10000) {
sleep(5*1000);
sleep(2*1000);
long endOffset = confluentPlatform.getOffset(topicName, 0 );
if (endOffset % 100 == 0)
System.out.println(endOffset);
if (endOffset == 10000) {
if (endOffset >= 10000 / 4) {
break;
}
count+=1;
}
// TODO : see the progress of the offset currently it is 1 min
sleep(30 * 1000);
sleep(2 * 1000);


count = countRows(topicName);
count = countRows(flatTableName);
System.out.println(count);
while (count < 10000) {
long tmpCount = countRows(topicName);
while (count < 10000 / 10) {
long tmpCount = countRows(flatTableName);
System.out.println(tmpCount);
sleep(2 * 1000);
if (tmpCount > count)
count = tmpCount;
}
assertEquals(10000, countRows(topicName));

assertTrue(countRows(flatTableName) >= 1000);
//assertEquals(10000, countRows(flatTableName));
}

@Test
@Ignore
@Description("stockMultiTask")
public void stockGenMultiTaskTest() throws IOException {
dropStateTable();
@@ -331,8 +330,8 @@ public void stockGenMultiTaskTest() throws IOException {

}

@Test
@Ignore
// @Test
// @Ignore
@Description("stockMultiTaskTopic")
public void stockGenMultiTaskTopicTest() throws IOException {
dropStateTable();
@@ -3,6 +3,7 @@
import com.clickhouse.client.*;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.clickhouse.kafka.connect.util.Utils;
import jdk.jfr.Description;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.ClickHouseContainer;
@@ -43,7 +44,6 @@ public void CreateTableTest() {
assertEquals(1, tables.size());
assertEquals(name, tables.get(0));
}

@Test
@Order(2)
@Description("DescTableTest")
@@ -52,7 +52,7 @@ public void DescTableTest() {
String createTable = String.format("CREATE TABLE %s ( `off` Int8 , `str` String , `double` DOUBLE, `arr` Array(Int8), `bool` BOOLEAN) Engine = MergeTree ORDER BY off", name);
chc.query(createTable);
Table table = chc.describeTable(name);
assertEquals(name, table.getName());
assertEquals(Utils.escapeTopicName(name), table.getName());
assertEquals(Type.INT8, table.getColumnByName("off").getType());
assertEquals(Type.STRING, table.getColumnByName("str").getType());
assertEquals(Type.FLOAT64, table.getColumnByName("double").getType());
@@ -19,7 +19,7 @@

public class ConfluentPlatform {

private static final String CONFLUENT_VERSION = "7.2.1";
private static final String CONFLUENT_VERSION = "6.1.0";
private static final DockerImageName KAFKA_REST_IMAGE = DockerImageName.parse(
"confluentinc/cp-kafka-rest:" + CONFLUENT_VERSION
);
@@ -35,9 +35,9 @@ public class ConfluentPlatform {
private static final DockerImageName CP_SCHEMA_REGISTRY = DockerImageName.parse(
"confluentinc/cp-schema-registry:" + CONFLUENT_VERSION
);

// 0.4.0-6.0.1
private static final DockerImageName CP_DATA_GEN = DockerImageName.parse(
"cnfldemos/cp-server-connect-datagen:0.5.3-" + CONFLUENT_VERSION
"cnfldemos/cp-server-connect-datagen:0.4.0-" + CONFLUENT_VERSION
);

private static final DockerImageName CP_CONTROL_CENTER = DockerImageName.parse(
@@ -14,6 +14,7 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.clickhouse.kafka.connect.util.Mask;

import com.clickhouse.kafka.connect.util.Utils;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.kafka.connect.data.Field;
@@ -29,7 +30,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ClickHouseWriter implements DBWriter{

private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseWriter.class);
@@ -95,9 +95,6 @@ public void setBinary(boolean binary) {
isBinary = binary;
}

private String escapeTopicName(String topic) {
return String.format("`%s`", topic);
}
// TODO: we need to refactor that
private String convertHelper(Object v) {
if (v instanceof List) {
@@ -266,7 +263,7 @@ public void doInsertRawBinary(List<Record> records) {
Record first = records.get(0);
String topic = first.getTopic();
LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
Table table = this.mapping.get(escapeTopicName(topic));
Table table = this.mapping.get(Utils.escapeTopicName(topic));
if (table == null) {
//TODO to pick the correct exception here
throw new RuntimeException(String.format("Table %s does not exists", topic));
@@ -340,7 +337,7 @@ public void doInsertJson(List<Record> records) {
Record first = records.get(0);
String topic = first.getTopic();
LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
Table table = this.mapping.get(escapeTopicName(topic));
Table table = this.mapping.get(Utils.escapeTopicName(topic));
if (table == null) {
//TODO to pick the correct exception here
throw new RuntimeException(String.format("Table %s does not exists", topic));
@@ -349,11 +346,6 @@ public void doInsertJson(List<Record> records) {
if ( !validateDataSchema(table, first, true) )
throw new RuntimeException();






try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
ClickHouseRequest.Mutation request = client.connect(chc.getServer())
.write()
@@ -426,7 +418,7 @@ public void doInsertSimple(List<Record> records) {
LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic));
// Build the insert SQL
StringBuffer sb = new StringBuffer();
sb.append(String.format("INSERT INTO %s ", escapeTopicName(topic)));
sb.append(String.format("INSERT INTO %s ", Utils.escapeTopicName(topic)));
sb.append(extractFields(first.getFields(), "(", ")", ",", ""));
sb.append(" VALUES ");
LOGGER.debug("sb {}", sb);
@@ -1,5 +1,7 @@
package com.clickhouse.kafka.connect.sink.db.mapping;

import com.clickhouse.kafka.connect.util.Utils;

import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
@@ -17,7 +19,7 @@ public Table(String name) {
}

public String getName() {
return String.format("`%s`", name);
return Utils.escapeTopicName(name);
}

public void addColumn(Column column) {
8 changes: 8 additions & 0 deletions src/main/java/com/clickhouse/kafka/connect/util/Utils.java
@@ -0,0 +1,8 @@
package com.clickhouse.kafka.connect.util;

public class Utils {

public static String escapeTopicName(String topic) {
return String.format("`%s`", topic);
}
}
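Utils.escapeTopicName above is the shared helper that replaces the private escapeTopicName method removed from ClickHouseWriter and the inline backtick formatting removed from Table.getName(). A minimal usage sketch (the surrounding class is illustrative; only Utils.escapeTopicName itself comes from this commit) shows how the escaped name serves both as the table-mapping key and as the identifier in the generated INSERT statement:

import com.clickhouse.kafka.connect.util.Utils;

public class EscapeTopicNameSketch {
    public static void main(String[] args) {
        String topic = "stock_gen_topic_single_task";

        // Wraps the topic name in backticks: `stock_gen_topic_single_task`
        String escaped = Utils.escapeTopicName(topic);

        // ClickHouseWriter now looks up tables and builds INSERTs through the same
        // helper, so the mapping key and the SQL identifier always agree.
        String insert = String.format("INSERT INTO %s (side, symbol) VALUES", escaped);
        System.out.println(insert);
    }
}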
