Skip to content

Commit

Permalink
EIP-61 Added properties to specify offset and history storage
Browse files Browse the repository at this point in the history
  • Loading branch information
wluyima committed Nov 19, 2021
1 parent 325d389 commit 48630d0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ public void configure() {
logger.info("Starting debezium...");

RouteDefinition routeDef = from(
"debezium-mysql:extract?databaseServerId={{debezium.db.serverId}}&databaseServerName={{debezium.db.serverName}}&databaseHostname={{openmrs.db.host}}&databasePort={{openmrs.db.port}}&databaseUser={{debezium.db.user}}&databasePassword={{debezium.db.password}}&databaseWhitelist={{openmrs.db.name}}&offsetStorageFileName={{debezium.offsetFilename}}&databaseHistoryFileFilename={{debezium.historyFilename}}&tableWhitelist={{debezium.tablesToSync}}&offsetFlushIntervalMs=0&snapshotMode={{debezium.snapshotMode}}&snapshotFetchSize=1000&snapshotLockingMode={{debezium.snapshotLockingMode}}&includeSchemaChanges=false&maxBatchSize={{debezium.reader.maxBatchSize}}&offsetStorage=org.openmrs.eip.mysql.watcher.CustomFileOffsetBackingStore&offsetCommitTimeoutMs=15000")
.routeId(DEBEZIUM_ROUTE_ID);
"debezium-mysql:extract?databaseServerId={{debezium.db.serverId}}&databaseServerName={{debezium.db.serverName}}&databaseHostname={{openmrs.db.host}}&databasePort={{openmrs.db.port}}&databaseUser={{debezium.db.user}}&databasePassword={{debezium.db.password}}&databaseWhitelist={{openmrs.db.name}}&offsetStorageFileName={{debezium.offsetFilename}}&databaseHistoryFileFilename={{debezium.historyFilename}}&tableWhitelist={{debezium.tablesToSync}}&offsetFlushIntervalMs=0&snapshotMode={{debezium.snapshotMode}}&snapshotFetchSize=1000&snapshotLockingMode={{debezium.snapshotLockingMode}}&includeSchemaChanges=false&maxBatchSize={{debezium.reader.maxBatchSize}}&offsetStorage={{"
+ WatcherConstants.PROP_DBZM_OFFSET_STORAGE_CLASS + "}}&databaseHistory={{"
+ WatcherConstants.PROP_DBZM_OFFSET_HISTORY_CLASS + "}}&offsetCommitTimeoutMs=15000")
.routeId(DEBEZIUM_ROUTE_ID);

logger.info("Setting debezium route handler to: " + WatcherConstants.SHUTDOWN_HANDLER_REF);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class WatcherConstants {

public static final String PROP_URI_EVENT_PROCESSOR = "watcher.uri.event.processor";

public static final String PROP_DBZM_OFFSET_STORAGE_CLASS = "debezium.offsetStorage";

public static final String PROP_DBZM_OFFSET_HISTORY_CLASS = "debezium.databaseHistory";

public static final String PROP_URI_ERROR_HANDLER = "watcher.uri.error.handler";

public static final String URI_EVENT_PROCESSOR = "direct:db-event-processor";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.StringUtils;
import org.openmrs.eip.EIPException;
import org.openmrs.eip.Utils;
import org.openmrs.eip.mysql.watcher.CustomFileOffsetBackingStore;
import org.openmrs.eip.mysql.watcher.WatcherConstants;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -25,6 +26,8 @@
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;

import io.debezium.relational.history.FileDatabaseHistory;

@Configuration
@ComponentScan("org.openmrs.eip.mysql.watcher")
public class WatcherConfig {
Expand Down Expand Up @@ -69,6 +72,14 @@ public PropertySource getCustomPropertySource(ConfigurableEnvironment env) {
}

Map<String, Object> props = new HashMap();
if (StringUtils.isBlank(env.getProperty(WatcherConstants.PROP_DBZM_OFFSET_STORAGE_CLASS))) {
props.put(WatcherConstants.PROP_DBZM_OFFSET_STORAGE_CLASS, CustomFileOffsetBackingStore.class.getName());
}

if (StringUtils.isBlank(env.getProperty(WatcherConstants.PROP_DBZM_OFFSET_HISTORY_CLASS))) {
props.put(WatcherConstants.PROP_DBZM_OFFSET_HISTORY_CLASS, FileDatabaseHistory.class.getName());
}

props.put("debezium.tablesToSync", StringUtils.join(tables, ","));
props.put(WatcherConstants.PROP_URI_EVENT_PROCESSOR, WatcherConstants.URI_EVENT_PROCESSOR);
props.put(PROP_URI_ERROR_HANDLER, WatcherConstants.URI_ERROR_HANDLER);
Expand Down

0 comments on commit 48630d0

Please sign in to comment.