From 3665573a204a80f1272557aefc1dc52413c4b5f6 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Wed, 27 Nov 2024 12:41:59 -0500
Subject: [PATCH] Fixed a connection issue that occurred when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB). (#334)

---
RELEASE.md | 3 +++
.../statement/TargetUpsertRunDetailsStatement.java | 9 ++++++---
src/main/scala/com/datastax/cdm/job/BaseJob.scala | 12 +++++-------
.../com/datastax/cdm/job/ConnectionFetcher.scala | 4 ++--
src/main/scala/com/datastax/cdm/job/DiffData.scala | 8 +++++---
.../scala/com/datastax/cdm/job/GuardrailCheck.scala | 5 +++--
src/main/scala/com/datastax/cdm/job/Migrate.scala | 10 ++++++----
.../com/datastax/cdm/job/ConnectionFetcherTest.java | 2 +-
8 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/RELEASE.md b/RELEASE.md
index 267605c8..3d90a804 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,7 @@ # Release Notes
+## [5.1.3] - 2024-11-27
+- Bug fix: Fixed a connection issue that occurred when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB).
+
## [5.1.2] - 2024-11-26
- Bug fix: SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause connection exception on that worker node. Added a static async SCB delete delay to address such issues.

diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
index 6028a7c6..6bd4c964 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
@@ -66,10 +66,13 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
// TODO: Remove this code block after a few releases, its only added for backward compatibility
try {
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
+ } catch (Exception e) { // ignore if column already exists
+ logger.debug("Column 'status' already exists in table {}", cdmKsTabInfo);
+ }
+ try {
this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
- } catch (Exception e) {
- // ignore if column already exists
- logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
+ } catch (Exception e) { // ignore if column already exists
+ logger.debug("Column 'run_info' already exists in table {}", cdmKsTabDetails);
}

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo

diff --git a/src/main/scala/com/datastax/cdm/job/BaseJob.scala b/src/main/scala/com/datastax/cdm/job/BaseJob.scala
index c2ac3111..15c99784 100644
--- a/src/main/scala/com/datastax/cdm/job/BaseJob.scala
+++ b/src/main/scala/com/datastax/cdm/job/BaseJob.scala
@@ -17,7 +17,7 @@ package com.datastax.cdm.job
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.spark.connector.cql.CassandraConnector
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
@@ -40,7 +40,6 @@ abstract class BaseJob[T: ClassTag] extends App {
var spark: SparkSession = _
var sContext: SparkContext = _
- var sc: SparkConf = _
var propertyHelper: PropertyHelper = _

var consistencyLevel: String = _
@@ -69,8 +68,7 @@ abstract class BaseJob[T: ClassTag] extends App {
.appName(jobName)
.getOrCreate() sContext = spark.sparkContext - sc = sContext.getConf - propertyHelper = PropertyHelper.getInstance(sc); + propertyHelper = PropertyHelper.getInstance(sContext.getConf); runId = propertyHelper.getLong(KnownProperties.RUN_ID) prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID) @@ -79,9 +77,9 @@ abstract class BaseJob[T: ClassTag] extends App { runId = System.nanoTime(); } consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL) - connectionFetcher = new ConnectionFetcher(sc, propertyHelper) - originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId) - targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId) + connectionFetcher = new ConnectionFetcher(propertyHelper) + originConnection = connectionFetcher.getConnection(sContext.getConf, Side.ORIGIN, consistencyLevel, runId) + targetConnection = connectionFetcher.getConnection(sContext.getConf, Side.TARGET, consistencyLevel, runId) val hasRandomPartitioner: Boolean = { val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName) diff --git a/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala b/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala index 2b723a9e..9bae3e48 100644 --- a/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala +++ b/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala @@ -23,7 +23,7 @@ import com.datastax.cdm.data.DataUtility.generateSCB import com.datastax.cdm.data.PKFactory.Side // TODO: CDM-31 - add localDC configuration support -class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) extends Serializable { +class ConnectionFetcher(propertyHelper: IPropertyHelper) extends Serializable { val logger: Logger = LoggerFactory.getLogger(this.getClass.getName) def getConnectionDetails(side: Side): ConnectionDetails = { @@ -63,7 +63,7 @@ class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) exte } } - def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = { + def getConnection(config: SparkConf, side: Side, consistencyLevel: String, runId: Long): CassandraConnector = { val connectionDetails = getConnectionDetails(side) logger.info("PARAM -- SSL Enabled: "+connectionDetails.sslEnabled); diff --git a/src/main/scala/com/datastax/cdm/job/DiffData.scala b/src/main/scala/com/datastax/cdm/job/DiffData.scala index c2bab7ff..c37153f1 100644 --- a/src/main/scala/com/datastax/cdm/job/DiffData.scala +++ b/src/main/scala/com/datastax/cdm/job/DiffData.scala @@ -17,7 +17,7 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.data.PKFactory.Side -import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} +import com.datastax.cdm.properties.KnownProperties import com.datastax.cdm.job.IJobSessionFactory.JobType object DiffData extends BasePartitionJob { @@ -34,6 +34,8 @@ object DiffData extends BasePartitionJob { var ma = new CDMMetricsAccumulator(jobType) sContext.register(ma, "CDMMetricsAccumulator") + val bcOriginConfig = sContext.broadcast(sContext.getConf) + val bcTargetConfig = sContext.broadcast(sContext.getConf) val bcConnectionFetcher = sContext.broadcast(connectionFetcher) val bcPropHelper = sContext.broadcast(propertyHelper) val bcJobFactory = sContext.broadcast(jobFactory) @@ -42,8 +44,8 @@ object DiffData extends BasePartitionJob { slices.foreach(slice => { if (null == originConnection) { - originConnection = 
bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) - targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value)) } originConnection.withSessionDo(originSession => diff --git a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala index 6c86e65c..810e2d35 100644 --- a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala +++ b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala @@ -16,7 +16,7 @@ package com.datastax.cdm.job import com.datastax.cdm.data.PKFactory.Side -import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} +import com.datastax.cdm.properties.KnownProperties import com.datastax.cdm.job.IJobSessionFactory.JobType object GuardrailCheck extends BasePartitionJob { @@ -32,13 +32,14 @@ object GuardrailCheck extends BasePartitionJob { var ma = new CDMMetricsAccumulator(jobType) sContext.register(ma, "CDMMetricsAccumulator") + val bcOriginConfig = sContext.broadcast(sContext.getConf) val bcConnectionFetcher = sContext.broadcast(connectionFetcher) val bcPropHelper = sContext.broadcast(propertyHelper) val bcJobFactory = sContext.broadcast(jobFactory) slices.foreach(slice => { if (null == originConnection) { - originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0) + originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0) } originConnection.withSessionDo(originSession => bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value) diff --git a/src/main/scala/com/datastax/cdm/job/Migrate.scala b/src/main/scala/com/datastax/cdm/job/Migrate.scala index c63d71ee..eb968825 100644 --- a/src/main/scala/com/datastax/cdm/job/Migrate.scala +++ b/src/main/scala/com/datastax/cdm/job/Migrate.scala @@ -18,7 +18,7 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun import com.datastax.cdm.job.CDMMetricsAccumulator import com.datastax.cdm.data.PKFactory.Side -import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} +import com.datastax.cdm.properties.KnownProperties import com.datastax.cdm.job.IJobSessionFactory.JobType object Migrate extends BasePartitionJob { @@ -34,7 +34,9 @@ object Migrate extends BasePartitionJob { jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, jobType))) var ma = new CDMMetricsAccumulator(jobType) sContext.register(ma, "CDMMetricsAccumulator") - + + val bcOriginConfig = sContext.broadcast(sContext.getConf) + val bcTargetConfig = sContext.broadcast(sContext.getConf) val bcConnectionFetcher = sContext.broadcast(connectionFetcher) val bcPropHelper = sContext.broadcast(propertyHelper) val bcJobFactory = sContext.broadcast(jobFactory) @@ -43,8 +45,8 @@ object Migrate extends BasePartitionJob { slices.foreach(slice => { if 
(null == originConnection) { - originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) - targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value)) } originConnection.withSessionDo(originSession => diff --git a/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java b/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java index ffc224ec..8b7d0d5a 100644 --- a/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java +++ b/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java @@ -45,7 +45,7 @@ public void setup() { commonSetupWithoutDefaultClassVariables(); MockitoAnnotations.openMocks(this); - cf = new ConnectionFetcher(conf, propertyHelper); + cf = new ConnectionFetcher(propertyHelper); } @Test
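
For reference, the driver-side call pattern introduced by this patch looks roughly like the sketch below. This is not part of the patch itself: it assumes the CDM classes touched above (ConnectionFetcher, Side, KnownProperties) and the bindings from BaseJob.scala (sContext, propertyHelper, runId); the local names cl and bcConf are illustrative only.

import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.KnownProperties

// The fetcher no longer captures a SparkConf at construction, so one
// serializable instance can serve origin and target clusters of different
// types (host/port for Cassandra/DSE, SCB for Astra).
val connectionFetcher = new ConnectionFetcher(propertyHelper)
val cl = propertyHelper.getString(KnownProperties.READ_CL)

// The conf is broadcast and handed to getConnection per call, so each side's
// contact settings are resolved at connection time instead of reusing
// whichever conf the fetcher happened to be built with.
val bcConf = sContext.broadcast(sContext.getConf)
val originConnection = connectionFetcher.getConnection(bcConf.value, Side.ORIGIN, cl, runId)
val targetConnection = connectionFetcher.getConnection(bcConf.value, Side.TARGET, cl, runId)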