Skip to content

Commit

Permalink
Added logic to detect connector class(MySQL or PostgreSQL) and change…
Browse files Browse the repository at this point in the history
… log messages.
  • Loading branch information
subkanthi committed Jan 22, 2025
1 parent 5908bed commit 9422d67
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.altinity.clickhouse.sink.connector.executor;

import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.common.ConnectorType;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -74,7 +75,7 @@ public static Pair<Long, Long> calculateMinMaxTimestampFromBatch(List<ClickHouse
/**
* Function to check if there are inflight requests that are within the range of the
* current batch.
* @param batch
* @param currentBatch
*/
static boolean checkIfThereAreInflightRequests(List<ClickHouseStruct> currentBatch) {
boolean result = false;
Expand Down Expand Up @@ -130,7 +131,7 @@ static synchronized public boolean checkIfBatchCanBeCommitted(List<ClickHouseStr
return result;
}

static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch) throws InterruptedException {
static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch, ConnectorType connectorType ) throws InterruptedException {
// Acknowledge the records.

// acknowledge records
Expand All @@ -140,15 +141,18 @@ static synchronized void acknowledgeRecords(List<ClickHouseStruct> batch) throws
if (record.getCommitter() != null && record.getSourceRecord() != null) {

record.getCommitter().markProcessed(record.getSourceRecord());
// log.debug("***** Record successfully marked as processed ****" + "Binlog file:" +
// record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
// + "Sequence Number: " + record.getSequenceNumber() + "Debezium Timestamp: " + record.getDebezium_ts_ms());

if(record.isLastRecordInBatch()) {
record.getCommitter().markBatchFinished();
log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +

if(ConnectorType.MYSQL.getValue().equalsIgnoreCase(connectorType.getValue())) {
log.info("***** BATCH marked as processed to debezium ****" + "Binlog file:" +
record.getFile() + " Binlog position: " + record.getPos() + " GTID: " + record.getGtid()
+ " Sequence Number: " + record.getSequenceNumber() + " Debezium Timestamp: " + record.getDebezium_ts_ms());
} else if(ConnectorType.POSTGRES.getValue().equalsIgnoreCase(connectorType.getValue())) {
log.info("***** BATCH marked as processed to debezium ****" + "LSN: " + record.getLsn()
+ "Debezium Timestamp: " + record.getDebezium_ts_ms());
}
}
}
}
Expand Down

0 comments on commit 9422d67

Please sign in to comment.