Skip to content

Commit

Permalink
Add support for parameterized test to test db override functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Oct 19, 2024
1 parent bbe062d commit f1add2d
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 58 deletions.
2 changes: 1 addition & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>3.0.0.Final</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.testcontainers>1.19.1</version.testcontainers>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
updateMetrics(DDL, writer);
}

/**
* Function to get the database name from the SourceRecord.
* If the database name is not present in the SourceRecord, then
* the database name is set to "system".
* Also if a database is overridden in the configuration, then
* the database name is set to the overridden database name.
* @param sr
* @return
*/
private String getDatabaseName(SourceRecord sr) {
if (sr != null && sr.key() instanceof Struct) {
String recordDbName = (String) ((Struct) sr.key()).get("databaseName");
if (recordDbName != null && !recordDbName.isEmpty()) {

return recordDbName;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.databaseName = sourceToDestinationMap.get(databaseName);
} else {
this.databaseName = databaseName;
}
this.databaseName = overrideDatabaseName(databaseName);

this.query = transformedQuery;
this.tableName = tableName;
Expand All @@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
this.userProvidedTimeZone = parseTimeZone();
}

/**
* Function to override the database name.
* @param databaseName
* @return
*/
private String overrideDatabaseName(String databaseName) {

// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
return sourceToDestinationMap.get(databaseName);
}
return databaseName;
}

public ZoneId parseTimeZone() {
String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables
Expand Down Expand Up @@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}
if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}

String overrideDatabaseName = overrideDatabaseName(tree.getText());
this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("ddl.retry", "true");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
Expand All @@ -22,6 +23,7 @@
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -60,15 +62,19 @@ public void startContainers() throws InterruptedException {
clickHouseContainer.start();
}
@ParameterizedTest
@CsvSource({
"clickhouse/clickhouse-server:latest",
"clickhouse/clickhouse-server:22.3"
})
@ValueSource(booleans = {
false,
true}
)
@DisplayName("Test that validates DDL(Create, ALTER, RENAME)")
public void testTableOperations(String clickHouseServerVersion) throws Exception {
public void testTableOperations(boolean databaseOverride) throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
if(databaseOverride) {
props.setProperty("database.override.map", "employees:ch_employees, datatypes:ch_datatypes, public:ch_public, project:ch_project");
}
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
Expand Down Expand Up @@ -131,24 +137,7 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception

// Validate table created with partitions.
String membersResult = writer.executeQuery("show create table members");

if(clickHouseServerVersion.contains("latest")) {
Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
"(\n" +
" `firstname` String,\n" +
" `lastname` String,\n" +
" `username` String,\n" +
" `email` Nullable(String),\n" +
" `joined` Date32,\n" +
" `_version` UInt64,\n" +
" `is_deleted` UInt8\n" +
")\n" +
"ENGINE = ReplacingMergeTree(_version, is_deleted)\n" +
"PARTITION BY joined\n" +
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
} else {
Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
Assert.assertTrue(membersResult.equalsIgnoreCase("CREATE TABLE employees.members\n" +
"(\n" +
" `firstname` String,\n" +
" `lastname` String,\n" +
Expand All @@ -162,12 +151,10 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"PARTITION BY joined\n" +
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
}

String rcxResult = writer.executeQuery("show create table rcx");

if(clickHouseServerVersion.contains("latest")) {
Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" +
Assert.assertTrue(rcxResult.equalsIgnoreCase("CREATE TABLE employees.rcx\n" +
"(\n" +
" `a` Int32,\n" +
" `b` Nullable(Int32),\n" +
Expand All @@ -180,8 +167,6 @@ public void testTableOperations(String clickHouseServerVersion) throws Exception
"PARTITION BY (a, d, c)\n" +
"ORDER BY tuple()\n" +
"SETTINGS index_granularity = 8192"));
}


if(engine.get() != null) {
engine.get().stop();
Expand Down
2 changes: 1 addition & 1 deletion sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>3.0.0.Final</version>
<version>2.7.2.Final</version>
</dependency>

<dependency>
Expand Down

0 comments on commit f1add2d

Please sign in to comment.