Skip to content

Commit

Permalink
Updated setup signature removed ddlservice dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Oct 20, 2024
1 parent f1add2d commit 699ab46
Show file tree
Hide file tree
Showing 48 changed files with 84 additions and 116 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
4 changes: 2 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.7.0.Beta2</version>
<version>2.7.2.Final</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -326,7 +326,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception {

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props);

embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);

try {
DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties);
Expand Down Expand Up @@ -141,8 +140,7 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector

Thread.sleep(500);
// embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true);
return null;
});

Expand All @@ -151,15 +149,15 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector


public static void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
Properties props, boolean forceStart) throws Exception {

if(forceStart == true) {
// Reload the configuration file.
log.info(String.format("******* Reloading configuration file (%s) from disk ******", configurationFile));
loadPropertiesFile(configurationFile);
}
debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);
debeziumChangeEventCapture.setup(props, recordParserService, forceStart);
}

public static void stop() throws IOException {
Expand Down Expand Up @@ -210,8 +208,7 @@ public void run() {
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
start(injector.getInstance(DebeziumRecordParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector,
try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
log.error("Client - Error deleting schema history", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
Expand Down Expand Up @@ -421,8 +423,8 @@ private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties pro
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING +
JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

Expand Down Expand Up @@ -615,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);
// Get topic.prefix from properies
String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
// String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
// Jdbc adds the database name to the table name, so we need to remove it
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer);

}
/**
Expand Down Expand Up @@ -780,7 +783,7 @@ public void connectorStopped() {
* @param debeziumRecordParserService
*/
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {
boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;

import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand Down Expand Up @@ -32,6 +34,7 @@ public class DebeziumOffsetStorage {
public static final String SOURCE_PASSWORD = "source_password";


private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class);

public String getOffsetKey(Properties props) {
String connectorName = props.getProperty("name");
Expand Down Expand Up @@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("ddl.retry", "true");
//defaultProps.setProperty("ddl.retry", "true");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testMultipleDatabases() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception {
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public void testMultipleSchemaReplication() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ public void testReplicatedRMTAutoCreate(String clickHouseServerVersion) throws E
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public void testRESTAPI(String clickHouseServerVersion) throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService()
,false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public void testBatchRetryOnCHFailure() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class) , props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ public void testClickHouseDelayedStart() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
Expand Down Expand Up @@ -136,8 +135,7 @@ public void debeziumStorageView() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 699ab46

Please sign in to comment.