Skip to content

Commit

Permalink
Added a static async SCB delete delay to address intermittent connect…
Browse files Browse the repository at this point in the history
…ion issues on spark worker nodes.
  • Loading branch information
pravinbhat committed Nov 25, 2024
1 parent 0729d97 commit ba1cd32
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 30 deletions.
39 changes: 21 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz
```

> [!CAUTION]
> If the above Spark and Scala version does not match, you may see an exception similar like below when running the CDM jobs,
> If the above Spark and Scala version does not match, you may see an exception like below when running the CDM jobs,
```
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
```
Expand All @@ -41,24 +41,24 @@ Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.re
# Steps for Data-Migration:

1. `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to be `cdm.properties`.
> * A simplified sample properties file configuration can be found here as [cdm.properties](./src/resources/cdm.properties)
> * A complete sample properties file configuration can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties)
1. `cdm.properties` file needs to be configured as applicable for the environment. The file can have any name, it does not need to be `cdm.properties`.
> * A sample properties file with default values can be found here as [cdm.properties](./src/resources/cdm.properties)
> * A complete reference properties file with default values can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties)
2. Place the properties file where it can be accessed while running the job via spark-submit.
3. Run the below job using `spark-submit` command as shown below:
3. Run the job using `spark-submit` command as shown below:

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

**Note:**
- Above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
- Update the memory options (driver & executor memory) based on your use-case
- To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true`
- To filter and migrate data only in a specific token range, you can pass the below two additional params to the `Migration` or `Validation` jobs
- To track details of a run (recorded on the `target` keyspace), pass param `--conf spark.cdm.trackRun=true`
- To filter records only for a specific token range, pass the below two additional params to the `Migration` OR `Validation` job

```
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
Expand All @@ -73,7 +73,7 @@ spark-submit --properties-file cdm.properties \
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

- Validation job will report differences as “ERRORS” in the log file as shown below.
Expand All @@ -95,13 +95,13 @@ spark-submit --properties-file cdm.properties \
--conf spark.executor.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--conf spark.driver.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

- The Validation job can also be run in an AutoCorrect mode. This mode can
- Add any missing records from origin to target
- Update any mismatched records between origin and target (makes target same as origin).
- Enable/disable this feature using one or both of the below setting in the config file
- Add any missing records from `origin` to `target`
- Update any mismatched records between `origin` and `target`
- Enable/disable this feature using one or both of the below params in the properties file
```
spark.cdm.autocorrect.missing false|true
spark.cdm.autocorrect.mismatch false|true
Expand All @@ -117,24 +117,27 @@ spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

# Perform large-field Guardrail violation checks
- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field), use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below
- This mode can help identify large fields on an `origin` table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field), use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below

```
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

> [!NOTE]
> This mode only operates on one database i.e. `origin`, there is no `target` in this mode
# Features
- Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
- Including counter table [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
- Rerun job from where the previous job had stopped for any reason (killed, had exceptions, etc.)
- Rerun/Resume a previous job that may have stopped for any reason (killed, had exceptions, etc.)
- If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
- Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
- Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
Expand Down Expand Up @@ -184,7 +187,7 @@ Below recommendations may only be useful when migrating large tables where the d
1. Clone this repo
2. Move to the repo folder `cd cassandra-data-migrator`
3. Run the build `mvn clean package` (Needs Maven 3.9.x)
4. The fat jar (`cassandra-data-migrator-4.x.x.jar`) file should now be present in the `target` folder
4. The fat jar (`cassandra-data-migrator-5.x.x.jar`) file should now be present in the `target` folder

# Contributors
Checkout all our wonderful contributors [here](./CONTRIBUTING.md#contributors).
Expand Down
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [5.1.2] - 2024-11-26
- Bug fix: SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause connection exception on that worker node. Added a static async SCB delete delay to address such issues.

## [5.1.1] - 2024-11-22
- Bug fix: Writetime filter does not work as expected when custom writetimestamp is also used (issue #327).
- Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public boolean shouldFilterRecord(Record record) {
}
if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
if (logger.isInfoEnabled())
logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", record.getPk(),
originWriteTimeStamp);
logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}",
record.getPk(), originWriteTimeStamp);
return true;
}
}
Expand Down
32 changes: 24 additions & 8 deletions src/main/java/com/datastax/cdm/data/DataUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

Expand All @@ -39,6 +42,7 @@ public class DataUtility {
public static final Logger logger = LoggerFactory.getLogger(DataUtility.class.getName());

protected static final String SCB_FILE_NAME = "_temp_cdm_scb_do_not_touch.zip";
protected static final int SCB_DELETE_DELAY = 5;

public static boolean diff(Object obj1, Object obj2) {
if (obj1 == null && obj2 == null) {
Expand Down Expand Up @@ -157,15 +161,27 @@ public static String getMyClassMethodLine(Exception e) {
return "Unknown";
}

public static void deleteGeneratedSCB(long runId, int waitSeconds) {
CompletableFuture.runAsync(() -> {
try {
File originFile = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME);
File targetFile = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME);

if (originFile.exists() || targetFile.exists()) {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(waitSeconds));
if (originFile.exists())
originFile.delete();
if (targetFile.exists())
targetFile.delete();
}
} catch (Exception e) {
logger.error("Unable to delete generated SCB files: {}", e.getMessage());
}
});
}

public static void deleteGeneratedSCB(long runId) {
File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME);
if (file.exists()) {
file.delete();
}
file = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME);
if (file.exists()) {
file.delete();
}
deleteGeneratedSCB(runId, SCB_DELETE_DELAY);
}

public static File generateSCB(String host, String port, String trustStorePassword, String trustStorePath,
Expand Down
10 changes: 8 additions & 2 deletions src/test/java/com/datastax/cdm/data/DataUtilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -171,7 +173,9 @@ public void generateSCBOrigin() throws IOException {
File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
assertTrue(file.exists());

DataUtility.deleteGeneratedSCB(0);
DataUtility.deleteGeneratedSCB(0, 0);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));

assertFalse(file.exists());
}

Expand All @@ -183,7 +187,9 @@ public void generateSCBTarget() throws IOException {
File file = new File(PKFactory.Side.TARGET + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
assertTrue(file.exists());

DataUtility.deleteGeneratedSCB(0);
DataUtility.deleteGeneratedSCB(0, 0);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));

assertFalse(file.exists());
}
}

0 comments on commit ba1cd32

Please sign in to comment.