Improved metrics output consistency and removed references to Thread and Global variables. (#329)
pravinbhat authored Nov 18, 2024
1 parent e018ae2 commit 7944225
Showing 9 changed files with 220 additions and 172 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -21,7 +21,7 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
- **Java11** (minimum) as Spark binaries are compiled with it.
- **Spark `3.5.x` with Scala `2.13` and Hadoop `3.3`**
- Typically installed using [this binary](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. This simple setup is recommended for most one-time migrations.
- However we recommend a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
- However we recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.

Spark can be installed by running the following: -

@@ -150,7 +150,7 @@ spark-submit --properties-file cdm.properties \
- Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
- Validate migration accuracy and performance using a smaller randomized data-set
- Supports adding custom fixed `writetime` and/or `ttl`
- Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
- Track run information (start-time, end-time, run-metrics, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace

# Things to know
- Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace.
@@ -160,7 +160,7 @@ spark-submit --properties-file cdm.properties \
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
- When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) applies to individual Spark worker nodes. Hence this value should be set to `effective-rate-limit-you-need`/`number-of-spark-worker-nodes` . E.g. If you need an effective rate-limit of 10000, and the number of Spark worker nodes are 4, then you should set the above rate-limit params to a value of 2500.
- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) applies to individual Spark worker nodes. Hence this value should be set to the effective-rate-limit-you-need/number-of-spark-worker-nodes . E.g. If you need an effective rate-limit of 10000, and the number of Spark worker nodes are 4, then you should set the above rate-limit params to a value of 2500.
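
As a rough illustration of the rate-limit arithmetic in the bullet above, the per-worker value is the effective rate you need divided by the number of Spark worker nodes. The class and method names below are hypothetical and not part of CDM; only the two property names come from the README.

```java
public class RateLimitSketch {

    // Hypothetical helper (not part of CDM): splits a cluster-wide rate limit
    // evenly across Spark worker nodes, rounding down.
    public static long perWorkerRateLimit(long effectiveRateLimit, int sparkWorkerNodes) {
        if (sparkWorkerNodes <= 0) {
            throw new IllegalArgumentException("sparkWorkerNodes must be positive");
        }
        return effectiveRateLimit / sparkWorkerNodes;
    }

    public static void main(String[] args) {
        // The README's example: effective limit of 10000 across 4 worker nodes.
        long perWorker = perWorkerRateLimit(10_000, 4);
        System.out.println("spark.cdm.perfops.ratelimit.origin=" + perWorker);
        System.out.println("spark.cdm.perfops.ratelimit.target=" + perWorker);
    }
}
```

With the README's numbers this yields 2500 for both properties.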

# Performance recommendations
Below recommendations may only be useful when migrating large tables where the default performance is not good enough
@@ -175,7 +175,7 @@ Below recommendations may only be useful when migrating large tables where the d
- `ratelimit`: Default is `20000`, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle.
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` clusters
- Use a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
- We recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.

> [!NOTE]
> For additional performance tuning, refer to details mentioned in the [`cdm-detailed.properties` file here](./src/resources/cdm-detailed.properties)
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
# Release Notes
## [5.1.0] - 2024-11-15
- Improves metrics output by producing stats labels in an intuitive and consistent order
- Refactored JobCounter by removing any references to `thread` or `global` as CDM operations are now isolated within partition-ranges (`parts`). Each such `part` is then parallelly processed and aggregated by Spark.
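
To make the per-part model above concrete, here is a minimal sketch in which each `part` carries its own counters and the totals are obtained by summing them. It is an assumption-laden stand-in: plain Java collections replace Spark's aggregation, and `PartAggregationSketch`, its `CounterType` enum and `aggregate` are hypothetical names, not CDM's actual classes.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class PartAggregationSketch {

    // Hypothetical subset of counter types, for illustration only.
    public enum CounterType { READ, WRITE, SKIPPED }

    // Sums the per-part counters into one total per counter type.
    public static Map<CounterType, Long> aggregate(List<Map<CounterType, Long>> parts) {
        Map<CounterType, Long> totals = new EnumMap<>(CounterType.class);
        for (Map<CounterType, Long> part : parts) {
            part.forEach((type, value) -> totals.merge(type, value, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<CounterType, Long> part1 = Map.of(
                CounterType.READ, 100L, CounterType.WRITE, 95L, CounterType.SKIPPED, 5L);
        Map<CounterType, Long> part2 = Map.of(
                CounterType.READ, 50L, CounterType.WRITE, 50L);
        System.out.println(aggregate(List.of(part1, part2)));
    }
}
```

Because no counter is shared between parts, nothing in this model needs a thread-local or global distinction.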

## [5.0.0] - 2024-11-08
- CDM refactored to be fully Spark Native and more performant when deployed on a multi-node Spark Cluster
- `trackRun` feature has been expanded to record `run-info` for each part in the `CDM_RUN_DETAILS` table. Along with granular metrics, this information can be used to troubleshoot any unbalanced problematic partitions.
37 changes: 19 additions & 18 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -82,52 +82,53 @@ protected void processPartitionRange(PartitionRange range) {

for (Row originRow : resultSet) {
rateLimiterOrigin.acquire(1);
jobCounter.threadIncrement(JobCounter.CounterType.READ);
jobCounter.increment(JobCounter.CounterType.READ);

Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
jobCounter.increment(JobCounter.CounterType.SKIPPED);
continue;
}

for (Record r : pkFactory.toValidRecordList(record)) {
BoundStatement boundUpsert = bind(r);
if (null == boundUpsert) {
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
jobCounter.increment(JobCounter.CounterType.SKIPPED);
continue;
}

rateLimiterTarget.acquire(1);
batch = writeAsync(batch, writeResults, boundUpsert);
jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);
jobCounter.increment(JobCounter.CounterType.UNFLUSHED);

if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
flushAndClearWrites(batch, writeResults);
jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
jobCounter.increment(JobCounter.CounterType.WRITE,
jobCounter.getCount(JobCounter.CounterType.UNFLUSHED, true));
jobCounter.reset(JobCounter.CounterType.UNFLUSHED);
}
}
}

flushAndClearWrites(batch, writeResults);
jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
jobCounter.globalIncrement();
jobCounter.increment(JobCounter.CounterType.WRITE,
jobCounter.getCount(JobCounter.CounterType.UNFLUSHED, true));
jobCounter.reset(JobCounter.CounterType.UNFLUSHED);
jobCounter.flush();
if (null != trackRunFeature) {
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getThreadCounters(true));
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getMetrics());
}
} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
jobCounter.increment(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ, true)
- jobCounter.getCount(JobCounter.CounterType.WRITE, true)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED, true));
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max, e);
logger.error("Error stats " + jobCounter.getThreadCounters(false));
jobCounter.globalIncrement();
logger.error("Error stats " + jobCounter.getMetrics(true));
jobCounter.flush();
if (null != trackRunFeature) {
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getThreadCounters(true));
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getMetrics());
}
}
}
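
The `catch` block above derives the `ERROR` increment as the rows read minus those written and skipped. A small worked sketch of that arithmetic follows; the class name, method name and sample numbers are hypothetical, and it does not use CDM's `JobCounter`.

```java
public class ErrorCountSketch {

    // Rows that were read but neither written nor skipped are counted as errors,
    // mirroring READ - WRITE - SKIPPED in the catch block.
    public static long unaccountedRows(long read, long written, long skipped) {
        return read - written - skipped;
    }

    public static void main(String[] args) {
        // Illustrative numbers: 1000 rows read, 900 written and 40 skipped before the failure.
        System.out.println(unaccountedRows(1000, 900, 40)); // prints 60
    }
}
```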
29 changes: 15 additions & 14 deletions src/main/java/com/datastax/cdm/job/CounterUnit.java
@@ -20,30 +20,31 @@
public class CounterUnit implements Serializable {

private static final long serialVersionUID = 2194336948011681878L;
private long globalCounter = 0;
private long threadLocalCounter = 0;
private long count = 0;
private long interimCount = 0;

public void incrementThreadCounter(long incrementBy) {
threadLocalCounter += incrementBy;
public void increment(long incrementBy) {
interimCount += incrementBy;
}

public long getThreadCounter() {
return threadLocalCounter;
public long getInterimCount() {
return interimCount;
}

public void resetThreadCounter() {
threadLocalCounter = 0;
public void reset() {
interimCount = 0;
}

public void setGlobalCounter(long value) {
globalCounter = value;
public void setCount(long value) {
count = value;
}

public void addThreadToGlobalCounter() {
globalCounter += threadLocalCounter;
public void addToCount() {
count += interimCount;
reset();
}

public long getGlobalCounter() {
return globalCounter;
public long getCount() {
return count;
}
}
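
The sketch below shows how the renamed `CounterUnit` methods appear to interact: `increment` accumulates into the interim count, and `addToCount` folds that into the running count and resets the interim value. The nested class copies the post-change methods from the diff above; the wrapper class, the `main` method and the placeholder `serialVersionUID` are assumptions added for illustration.

```java
import java.io.Serializable;

public class CounterUnitSketch {

    // Copy of the post-change CounterUnit methods shown in the diff.
    public static class CounterUnit implements Serializable {
        private static final long serialVersionUID = 1L; // placeholder, not the real value
        private long count = 0;
        private long interimCount = 0;

        public void increment(long incrementBy) { interimCount += incrementBy; }
        public long getInterimCount() { return interimCount; }
        public void reset() { interimCount = 0; }
        public void setCount(long value) { count = value; }
        public void addToCount() { count += interimCount; reset(); }
        public long getCount() { return count; }
    }

    public static void main(String[] args) {
        CounterUnit reads = new CounterUnit();
        reads.increment(3);
        reads.increment(2);
        // Before folding: interim holds the increments, count is untouched.
        System.out.println(reads.getInterimCount() + " " + reads.getCount()); // prints "5 0"
        reads.addToCount();
        // After folding: interim is cleared and count holds the total.
        System.out.println(reads.getInterimCount() + " " + reads.getCount()); // prints "0 5"
    }
}
```

Note that `addToCount` now calls `reset()`, so calling it twice in a row does not double-count the same interim value.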
39 changes: 19 additions & 20 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -135,18 +135,18 @@ protected void processPartitionRange(PartitionRange range) {
StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> {
rateLimiterOrigin.acquire(1);
Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
jobCounter.threadIncrement(JobCounter.CounterType.READ);
jobCounter.increment(JobCounter.CounterType.READ);

if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
jobCounter.increment(JobCounter.CounterType.SKIPPED);
} else {
for (Record r : pkFactory.toValidRecordList(record)) {
rateLimiterTarget.acquire(1);
CompletionStage<AsyncResultSet> targetResult = targetSelectByPKStatement
.getAsyncResult(r.getPk());

if (null == targetResult) {
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
jobCounter.increment(JobCounter.CounterType.SKIPPED);
} else {
r.setAsyncTargetRow(targetResult);
recordsToDiff.add(r);
@@ -168,32 +168,31 @@ protected void processPartitionRange(PartitionRange range) {
.getCount(JobCounter.CounterType.CORRECTED_MISSING)
&& jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter
.getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) {
jobCounter.globalIncrement();
jobCounter.flush();
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF_CORRECTED,
jobCounter.getThreadCounters(true));
jobCounter.getMetrics());
} else {
jobCounter.globalIncrement();
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF,
jobCounter.getThreadCounters(true));
jobCounter.flush();
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF, jobCounter.getMetrics());
}
} else if (null != trackRunFeature) {
jobCounter.globalIncrement();
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getThreadCounters(true));
jobCounter.flush();
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getMetrics());
} else {
jobCounter.globalIncrement();
jobCounter.flush();
}
} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
jobCounter.increment(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.VALID)
- jobCounter.getCount(JobCounter.CounterType.MISSING)
- jobCounter.getCount(JobCounter.CounterType.MISMATCH)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max, e);
logger.error("Error stats " + jobCounter.getThreadCounters(false));
jobCounter.globalIncrement();
logger.error("Error stats " + jobCounter.getMetrics(true));
jobCounter.flush();
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getThreadCounters(true));
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getMetrics());
}
}

@@ -205,7 +204,7 @@ private boolean diffAndClear(List<Record> recordsToDiff, JobCounter jobCounter)

private boolean diff(Record record, JobCounter jobCounter) {
if (record.getTargetRow() == null) {
jobCounter.threadIncrement(JobCounter.CounterType.MISSING);
jobCounter.increment(JobCounter.CounterType.MISSING);
logger.error("Missing target row found for key: {}", record.getPk());
if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
logger.error("{} is true, but not Inserting as {} is not enabled; key : {}",
@@ -218,27 +217,27 @@ private boolean diff(Record record, JobCounter jobCounter) {
if (autoCorrectMissing) {
rateLimiterTarget.acquire(1);
targetSession.getTargetUpsertStatement().putRecord(record);
jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING);
jobCounter.increment(JobCounter.CounterType.CORRECTED_MISSING);
logger.error("Inserted missing row in target: {}", record.getPk());
}
return true;
}

String diffData = isDifferent(record);
if (!diffData.isEmpty()) {
jobCounter.threadIncrement(JobCounter.CounterType.MISMATCH);
jobCounter.increment(JobCounter.CounterType.MISMATCH);
logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData);

if (autoCorrectMismatch) {
rateLimiterTarget.acquire(1);
targetSession.getTargetUpsertStatement().putRecord(record);
jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH);
jobCounter.increment(JobCounter.CounterType.CORRECTED_MISMATCH);
logger.error("Corrected mismatch row in target: {}", record.getPk());
}

return true;
} else {
jobCounter.threadIncrement(JobCounter.CounterType.VALID);
jobCounter.increment(JobCounter.CounterType.VALID);
return false;
}
}
@@ -54,22 +54,22 @@ protected void processPartitionRange(PartitionRange range) {
String checkString;
for (Row originRow : resultSet) {
rateLimiterOrigin.acquire(1);
jobCounter.threadIncrement(JobCounter.CounterType.READ);
jobCounter.increment(JobCounter.CounterType.READ);

checkString = guardrailFeature.guardrailChecks(originRow);
if (checkString != null && !checkString.isEmpty()) {
jobCounter.threadIncrement(JobCounter.CounterType.LARGE);
jobCounter.increment(JobCounter.CounterType.LARGE);
logger.error("Guardrails failed for row {}", checkString);
} else {
jobCounter.threadIncrement(JobCounter.CounterType.VALID);
jobCounter.increment(JobCounter.CounterType.VALID);
}
}
} catch (Exception e) {
logger.error("Error occurred ", e);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max);
} finally {
jobCounter.globalIncrement();
jobCounter.flush();
}

ThreadContext.remove(THREAD_CONTEXT_LABEL);