diff --git a/SIT/Makefile b/SIT/Makefile index 8c7bb655..c821c993 100644 --- a/SIT/Makefile +++ b/SIT/Makefile @@ -21,6 +21,8 @@ test_features: reset test_features_cmd test_features_cmd: ./test.sh -p features +test: test_smoke test_regression_cmd test_features_cmd + # Local tests are not included in automated tests, but provide a means # to use the test harness to validate project-specific work test_local: reset test_local_cmd diff --git a/SIT/cdm-assert.sh b/SIT/cdm-assert.sh index 998b9792..8d79b873 100644 --- a/SIT/cdm-assert.sh +++ b/SIT/cdm-assert.sh @@ -1,6 +1,6 @@ #!/bin/bash -assertCmd="egrep 'JobSession.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'" +assertCmd="egrep 'JobCounter.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'" _usage() { cat <<EOF LINE MISSEDCOUNT - 3052 + 3073 diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 0894141e..4830b68d 100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -20,6 +20,8 @@ public abstract class AbstractJobSession<T> extends BaseJobSession { protected Guardrail guardrailFeature; protected boolean guardrailEnabled; protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper); + protected JobCounter jobCounter; + protected Long printStatsAfter; protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { this(originSession, targetSession, sc, false); @@ -32,12 +34,13 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, return; } - printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER); - if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1)) { + this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER); + if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) { logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. 
Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER)); propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER)); - printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER); + printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER); } + this.jobCounter = new JobCounter(printStatsAfter, propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART)); rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN)); rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET)); @@ -77,5 +80,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, public abstract void processSlice(T slice); - public abstract void printCounts(boolean isFinal); + public synchronized void printCounts(boolean isFinal) { + if (isFinal) { + jobCounter.printFinal(); + } else { + jobCounter.printProgress(); + } + } } diff --git a/src/main/java/com/datastax/cdm/job/BaseJobSession.java b/src/main/java/com/datastax/cdm/job/BaseJobSession.java index cbcaf449..3853821a 100644 --- a/src/main/java/com/datastax/cdm/job/BaseJobSession.java +++ b/src/main/java/com/datastax/cdm/job/BaseJobSession.java @@ -32,7 +32,6 @@ public abstract class BaseJobSession { protected RateLimiter rateLimiterOrigin; protected RateLimiter rateLimiterTarget; protected Integer maxRetries = 10; - protected Integer printStatsAfter = 100000; protected BaseJobSession(SparkConf sc) { propertyHelper.initializeSparkConf(sc); diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 3891b264..f60c6b16 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -17,25 +17,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicLong; public class CopyJobSession extends AbstractJobSession { - private static CopyJobSession copyJobSession; private final PKFactory pkFactory; private final boolean isCounterTable; private final Integer fetchSize; private final Integer batchSize; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - protected AtomicLong readCounter = new AtomicLong(0); - protected AtomicLong skippedCounter = new AtomicLong(0); - protected AtomicLong writeCounter = new AtomicLong(0); - protected AtomicLong errorCounter = new AtomicLong(0); private TargetUpsertStatement targetUpsertStatement; private TargetSelectByPKStatement targetSelectByPKStatement; protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { super(originSession, targetSession, sc); + this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR, JobCounter.CounterType.UNFLUSHED); pkFactory = this.originSession.getPKFactory(); isCounterTable = this.originSession.getCqlTable().isCounterTable(); @@ -60,11 +55,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) { int maxAttempts = maxRetries + 1; String guardrailCheck; for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) { - long readCnt = 0; - long flushedWriteCnt = 0; - long skipCnt = 0; - long errCnt = 0; - long unflushedWrites = 0; + 
jobCounter.threadReset(); + try { OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession.getOriginSelectByPartitionRangeStatement(); targetUpsertStatement = this.targetSession.getTargetUpsertStatement(); @@ -74,14 +66,11 @@ public void getDataAndInsert(BigInteger min, BigInteger max) { for (Row originRow : resultSet) { rateLimiterOrigin.acquire(1); - readCnt++; - if (readCnt % printStatsAfter == 0) { - printCounts(false); - } + jobCounter.threadIncrement(JobCounter.CounterType.READ); Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) { - skipCnt++; + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); continue; } @@ -90,66 +79,48 @@ public void getDataAndInsert(BigInteger min, BigInteger max) { guardrailCheck = guardrailFeature.guardrailChecks(r); if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) { logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck); - skipCnt++; + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); continue; } } BoundStatement boundUpsert = bind(r); if (null == boundUpsert) { - skipCnt++; // TODO: this previously skipped, why not errCnt? + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); // TODO: this previously skipped, why not errCnt? continue; } rateLimiterTarget.acquire(1); batch = writeAsync(batch, writeResults, boundUpsert); - unflushedWrites++; + jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED); - if (unflushedWrites > fetchSize) { + if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) { flushAndClearWrites(batch, writeResults); - flushedWriteCnt += unflushedWrites; - unflushedWrites = 0; + jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); + jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); } } } flushAndClearWrites(batch, writeResults); - flushedWriteCnt += unflushedWrites; - - readCounter.addAndGet(readCnt); - writeCounter.addAndGet(flushedWriteCnt); - skippedCounter.addAndGet(skipCnt); + jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); + jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); done = true; } catch (Exception e) { if (attempts == maxAttempts) { - readCounter.addAndGet(readCnt); - writeCounter.addAndGet(flushedWriteCnt); - skippedCounter.addAndGet(skipCnt); - errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt); + jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED)); logFailedPartitionsInFile(partitionFile, min, max); } logger.error("Error occurred during Attempt#: {}", attempts, e); logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}", Thread.currentThread().getId(), min, max, attempts); - logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, flushedWriteCnt, skipCnt, (readCnt - flushedWriteCnt - skipCnt)); + logger.error("Error stats " + jobCounter.getThreadCounters(false)); + } + finally { + jobCounter.globalIncrement(); + printCounts(false); } - } - } - - @Override - public synchronized void printCounts(boolean isFinal) { - String msg = "ThreadID: " + Thread.currentThread().getId(); - if (isFinal) { - msg += " Final"; - 
logger.info("################################################################################################"); - } - logger.info("{} Read Record Count: {}", msg, readCounter.get()); - logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get()); - logger.info("{} Write Record Count: {}", msg, writeCounter.get()); - logger.info("{} Error Record Count: {}", msg, errorCounter.get()); - if (isFinal) { - logger.info("################################################################################################"); } } diff --git a/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java b/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java index 0b486e1f..eff0c107 100644 --- a/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java @@ -15,23 +15,18 @@ import java.beans.PropertyEditorManager; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; public class CopyPKJobSession extends AbstractJobSession { - private static CopyPKJobSession copyJobSession; private final PKFactory pkFactory; private final List originPKClasses; private final boolean isCounterTable; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - protected AtomicLong readCounter = new AtomicLong(0); - protected AtomicLong missingCounter = new AtomicLong(0); - protected AtomicLong skipCounter = new AtomicLong(0); - protected AtomicLong writeCounter = new AtomicLong(0); private OriginSelectByPKStatement originSelectByPKStatement; protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { super(originSession, targetSession, sc, true); + this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.MISSING); pkFactory = this.originSession.getPKFactory(); isCounterTable = this.originSession.getCqlTable().isCounterTable(); originPKClasses = this.originSession.getCqlTable().getPKClasses(); @@ -47,10 +42,10 @@ public void processSlice(SplitPartitions.PKRows slice) { public void getRowAndInsert(SplitPartitions.PKRows rowsList) { originSelectByPKStatement = originSession.getOriginSelectByPKStatement(); for (String row : rowsList.getPkRows()) { - readCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.READ); EnhancedPK pk = toEnhancedPK(row); if (null == pk || pk.isError()) { - missingCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.MISSING); logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? 
"null" : pk.getMessages())); return; } @@ -58,7 +53,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) { rateLimiterOrigin.acquire(1); Record recordFromOrigin = originSelectByPKStatement.getRecord(pk); if (null == recordFromOrigin) { - missingCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.MISSING); logger.error("Could not find origin row with primary-key: {}", row); return; } @@ -66,7 +61,7 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) { Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); if (originSelectByPKStatement.shouldFilterRecord(record)) { - skipCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); return; } @@ -74,37 +69,22 @@ public void getRowAndInsert(SplitPartitions.PKRows rowsList) { String guardrailCheck = guardrailFeature.guardrailChecks(record); if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) { logger.error("Guardrails failed for PrimaryKey {}; {}", record.getPk(), guardrailCheck); - skipCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); return; } } rateLimiterTarget.acquire(1); targetSession.getTargetUpsertStatement().putRecord(record); - writeCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.WRITE); - if (readCounter.get() % printStatsAfter == 0) { - printCounts(false); - } + jobCounter.globalIncrement(); + printCounts(false); } printCounts(true); } - @Override - public void printCounts(boolean isFinal) { - if (isFinal) { - logger.info("################################################################################################"); - } - logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get()); - logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get()); - logger.info("ThreadID: {} Skipped Record Count: {}", Thread.currentThread().getId(), skipCounter.get()); - logger.info("ThreadID: {} Inserted Record Count: {}", Thread.currentThread().getId(), writeCounter.get()); - if (isFinal) { - logger.info("################################################################################################"); - } - } - private EnhancedPK toEnhancedPK(String rowString) { String[] pkFields = rowString.split(" %% "); List values = new ArrayList<>(originPKClasses.size()); diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index 3ea2dfd1..e0fb1f3a 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -27,22 +27,13 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class DiffJobSession extends CopyJobSession { - private static DiffJobSession diffJobSession; protected final Boolean autoCorrectMissing; protected final Boolean autoCorrectMismatch; - private final AtomicLong readCounter = new AtomicLong(0); - private final AtomicLong mismatchCounter = new AtomicLong(0); - private final AtomicLong missingCounter = new AtomicLong(0); - private final AtomicLong correctedMissingCounter = new AtomicLong(0); - private final AtomicLong correctedMismatchCounter = new AtomicLong(0); - private final AtomicLong validCounter = new 
AtomicLong(0); - private final AtomicLong skippedCounter = new AtomicLong(0); private final boolean isCounterTable; private final boolean forceCounterWhenMissing; private final List targetColumnNames; @@ -57,6 +48,7 @@ public class DiffJobSession extends CopyJobSession { public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { super(originSession, targetSession, sc); + this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.VALID, JobCounter.CounterType.MISMATCH, JobCounter.CounterType.CORRECTED_MISMATCH, JobCounter.CounterType.MISSING, JobCounter.CounterType.CORRECTED_MISSING, JobCounter.CounterType.SKIPPED); autoCorrectMissing = propertyHelper.getBoolean(KnownProperties.AUTOCORRECT_MISSING); logger.info("PARAM -- Autocorrect Missing: {}", autoCorrectMissing); @@ -108,6 +100,8 @@ public void getDataAndDiff(BigInteger min, BigInteger max) { int maxAttempts = maxRetries + 1; for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) { try { + jobCounter.threadReset(); + PKFactory pkFactory = originSession.getPKFactory(); OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = originSession.getOriginSelectByPartitionRangeStatement(); ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max)); @@ -118,21 +112,18 @@ public void getDataAndDiff(BigInteger min, BigInteger max) { StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> { rateLimiterOrigin.acquire(1); Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); + jobCounter.threadIncrement(JobCounter.CounterType.READ); if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) { - readCounter.incrementAndGet(); - skippedCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); } else { - if (readCounter.incrementAndGet() % printStatsAfter == 0) { - printCounts(false); - } for (Record r : pkFactory.toValidRecordList(record)) { if (guardrailEnabled) { String guardrailCheck = guardrailFeature.guardrailChecks(r); if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) { logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck); - skippedCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); continue; } } @@ -141,7 +132,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) { CompletionStage targetResult = targetSelectByPKStatement.getAsyncResult(r.getPk()); if (null == targetResult) { - skippedCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); } else { r.setAsyncTargetRow(targetResult); recordsToDiff.add(r); @@ -160,6 +151,9 @@ public void getDataAndDiff(BigInteger min, BigInteger max) { if (attempts == maxAttempts) { logFailedPartitionsInFile(partitionFile, min, max); } + } finally { + jobCounter.globalIncrement(); + printCounts(false); } } } @@ -175,32 +169,13 @@ private void diffAndClear(List recordsToDiff) { recordsToDiff.clear(); } - @Override - public synchronized void printCounts(boolean isFinal) { - String msg = "ThreadID: " + Thread.currentThread().getId(); - if (isFinal) { - msg += " Final"; - logger.info("################################################################################################"); - } - logger.info("{} Read Record Count: {}", msg, readCounter.get()); - logger.info("{} Mismatch Record Count: {}", msg, mismatchCounter.get()); - logger.info("{} Corrected 
Mismatch Record Count: {}", msg, correctedMismatchCounter.get()); - logger.info("{} Missing Record Count: {}", msg, missingCounter.get()); - logger.info("{} Corrected Missing Record Count: {}", msg, correctedMissingCounter.get()); - logger.info("{} Valid Record Count: {}", msg, validCounter.get()); - logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get()); - if (isFinal) { - logger.info("################################################################################################"); - } - } - private void diff(Record record) { EnhancedPK originPK = record.getPk(); Row originRow = record.getOriginRow(); Row targetRow = record.getTargetRow(); if (targetRow == null) { - missingCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.MISSING); logger.error("Missing target row found for key: {}", record.getPk()); if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) { logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk()); @@ -211,7 +186,7 @@ private void diff(Record record) { if (autoCorrectMissing) { rateLimiterTarget.acquire(1); targetSession.getTargetUpsertStatement().putRecord(record); - correctedMissingCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING); logger.error("Inserted missing row in target: {}", record.getPk()); } return; @@ -219,17 +194,17 @@ private void diff(Record record) { String diffData = isDifferent(originPK, originRow, targetRow); if (!diffData.isEmpty()) { - mismatchCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.MISMATCH); logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData); if (autoCorrectMismatch) { rateLimiterTarget.acquire(1); targetSession.getTargetUpsertStatement().putRecord(record); - correctedMismatchCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH); logger.error("Corrected mismatch row in target: {}", record.getPk()); } } else { - validCounter.incrementAndGet(); + jobCounter.threadIncrement(JobCounter.CounterType.VALID); } } diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index e1c55f8c..665ef149 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -11,21 +11,17 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; -import java.util.concurrent.atomic.AtomicLong; public class GuardrailCheckJobSession extends AbstractJobSession { private static GuardrailCheckJobSession guardrailJobSession; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - protected AtomicLong readCounter = new AtomicLong(0); - protected AtomicLong validCounter = new AtomicLong(0); - protected AtomicLong skippedCounter = new AtomicLong(0); - protected AtomicLong largeRowCounter = new AtomicLong(0); private final PKFactory pkFactory; protected GuardrailCheckJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { super(originSession, targetSession, sc); + this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.VALID, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.LARGE); pkFactory = this.originSession.getPKFactory(); @@ -51,26 +47,22 @@ public void guardrailCheck(BigInteger min, 
BigInteger max) { String checkString; for (Row originRow : resultSet) { rateLimiterOrigin.acquire(1); - readCounter.addAndGet(1); - - if (readCounter.get() % printStatsAfter == 0) { - printCounts(false); - } + jobCounter.threadIncrement(JobCounter.CounterType.READ); Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) { - skippedCounter.addAndGet(1); + jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); continue; } for (Record r : pkFactory.toValidRecordList(record)) { checkString = guardrailFeature.guardrailChecks(r); if (checkString != null && !checkString.isEmpty()) { - largeRowCounter.addAndGet(1); + jobCounter.threadIncrement(JobCounter.CounterType.LARGE); logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), checkString); } else { - validCounter.addAndGet(1); + jobCounter.threadIncrement(JobCounter.CounterType.VALID); } } } @@ -78,23 +70,11 @@ public void guardrailCheck(BigInteger min, BigInteger max) { logger.error("Error occurred ", e); logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); + } finally { + jobCounter.globalIncrement(); + printCounts(false); } ThreadContext.remove(THREAD_CONTEXT_LABEL); } - - @Override - public synchronized void printCounts(boolean isFinal) { - String msg = "ThreadID: " + Thread.currentThread().getId(); - if (isFinal) { - msg += " Final"; - logger.info("################################################################################################"); - } - logger.info("{} Read Record Count: {}", msg, readCounter.get()); - logger.info("{} Valid Record Count: {}", msg, validCounter.get()); - logger.info("{} Large Record Count: {}", msg, largeRowCounter.get()); - if (isFinal) { - logger.info("################################################################################################"); - } - } } diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java new file mode 100644 index 00000000..92e29bb9 --- /dev/null +++ b/src/main/java/com/datastax/cdm/job/JobCounter.java @@ -0,0 +1,178 @@ +package com.datastax.cdm.job; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobCounter { + + // Enumeration for counter types + public enum CounterType { + READ, WRITE, VALID, ERROR, MISMATCH, MISSING, CORRECTED_MISSING, CORRECTED_MISMATCH, SKIPPED, UNFLUSHED, LARGE + } + + // Logger instance + private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); + + // Internal class to handle atomic counting operations + private static class CounterUnit { + private final AtomicLong globalCounter = new AtomicLong(0); + private final ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L); + + public void incrementThreadCounter(long incrementBy) { + threadLocalCounter.set(threadLocalCounter.get() + incrementBy); + } + + public long getThreadCounter() { + return threadLocalCounter.get(); + } + + public void resetThreadCounter() { + threadLocalCounter.set(0L); + } + + public void setGlobalCounter(long value) { + globalCounter.set(value); + } + + public void addThreadToGlobalCounter() { + globalCounter.addAndGet(threadLocalCounter.get()); + } + + public long getGlobalCounter() { + return globalCounter.get(); + } + } + + // Declare individual counters for different operations + private final 
HashMap<CounterType, CounterUnit> counterMap = new HashMap<>(); + + // Variables to hold lock objects and registered types + private final Object globalLock = new Object(); + private final boolean printPerThread; + private final long printStatsAfter; + private final CounterUnit printCounter = new CounterUnit(); + + // Constructor + public JobCounter(long printStatsAfter, boolean printStatsPerPart) { + this.printStatsAfter = printStatsAfter; + this.printPerThread = printStatsPerPart; + } + + // Allows setting the registered counter types. + public void setRegisteredTypes(CounterType... registeredTypes) { + counterMap.clear(); + for (CounterType type : registeredTypes) { + counterMap.put(type, new CounterUnit()); + } + } + + // Utility method to fetch the appropriate counter unit based on type + private CounterUnit getCounterUnit(CounterType counterType) { + if (!counterMap.containsKey(counterType)) { + throw new IllegalArgumentException("CounterType " + counterType + " is not registered"); + } + return (counterMap.get(counterType)); + } + + // Method to get a counter's value + public long getCount(CounterType counterType, boolean global) { + return global ? getCounterUnit(counterType).getGlobalCounter() : getCounterUnit(counterType).getThreadCounter(); + } + + // Method to get a thread counter's value + public long getCount(CounterType counterType) { + return getCount(counterType, false); + } + + // Method to reset thread-specific counters for given type + public void threadReset(CounterType counterType) { + getCounterUnit(counterType).resetThreadCounter(); + } + + // Method to reset thread-specific counters for all registered types + public void threadReset() { + for (CounterType type : counterMap.keySet()) { + threadReset(type); + } + } + + // Method to increment thread-specific counters by a given value + public void threadIncrement(CounterType counterType, long incrementBy) { + getCounterUnit(counterType).incrementThreadCounter(incrementBy); + } + + // Method to increment thread-specific counters by 1 + public void threadIncrement(CounterType counterType) { + threadIncrement(counterType, 1); + } + + // Method to increment global counters based on thread-specific counters + public void globalIncrement() { + synchronized (globalLock) { + for (CounterType type : counterMap.keySet()) { + getCounterUnit(type).addThreadToGlobalCounter(); + } + } + } + + // Method to get current counts (both thread-specific and global) as a formatted string + public String getThreadCounters(boolean global) { + StringBuilder sb = new StringBuilder(); + for (CounterType type : counterMap.keySet()) { + long value = global ? 
getCounterUnit(type).getGlobalCounter() : getCounterUnit(type).getThreadCounter(); + sb.append(type.name()).append("=").append(value).append(", "); + } + // Remove the trailing comma and space + if (sb.length() > 2) { + sb.setLength(sb.length() - 2); + } + return sb.toString(); + } + + public void printProgress() { + if (printPerThread) { + printAndLogProgress("Thread Counts: ", false); + } else if (shouldPrintGlobalProgress()) { + printAndLogProgress("Progress Counts: ", true); + } + } + + // Determines if it's the right time to print global progress + protected boolean shouldPrintGlobalProgress() { + if (!counterMap.containsKey(CounterType.READ)) { + return false; + } + long globalReads = counterMap.get(CounterType.READ).getGlobalCounter(); + long expectedPrintCount = globalReads - globalReads % printStatsAfter; + if (expectedPrintCount > printCounter.getGlobalCounter()) { + printCounter.setGlobalCounter(expectedPrintCount); + return true; + } + return false; + } + + // Prints and logs the progress + protected void printAndLogProgress(String message, boolean global) { + String fullMessage = message + getThreadCounters(global); + logger.info(fullMessage); + } + + public void printFinal() { + logger.info("################################################################################################"); + if (counterMap.containsKey(CounterType.READ)) logger.info("Final Read Record Count: {}", counterMap.get(CounterType.READ).getGlobalCounter()); + if (counterMap.containsKey(CounterType.MISMATCH)) logger.info("Final Mismatch Record Count: {}", counterMap.get(CounterType.MISMATCH).getGlobalCounter()); + if (counterMap.containsKey(CounterType.CORRECTED_MISMATCH)) logger.info("Final Corrected Mismatch Record Count: {}", counterMap.get(CounterType.CORRECTED_MISMATCH).getGlobalCounter()); + if (counterMap.containsKey(CounterType.MISSING)) logger.info("Final Missing Record Count: {}", counterMap.get(CounterType.MISSING).getGlobalCounter()); + if (counterMap.containsKey(CounterType.CORRECTED_MISSING)) logger.info("Final Corrected Missing Record Count: {}", counterMap.get(CounterType.CORRECTED_MISSING).getGlobalCounter()); + if (counterMap.containsKey(CounterType.VALID)) logger.info("Final Valid Record Count: {}", counterMap.get(CounterType.VALID).getGlobalCounter()); + if (counterMap.containsKey(CounterType.SKIPPED)) logger.info("Final Skipped Record Count: {}", counterMap.get(CounterType.SKIPPED).getGlobalCounter()); + if (counterMap.containsKey(CounterType.WRITE)) logger.info("Final Write Record Count: {}", counterMap.get(CounterType.WRITE).getGlobalCounter()); + if (counterMap.containsKey(CounterType.ERROR)) logger.info("Final Error Record Count: {}", counterMap.get(CounterType.ERROR).getGlobalCounter()); + if (counterMap.containsKey(CounterType.LARGE)) logger.info("Final Large Record Count: {}", counterMap.get(CounterType.LARGE).getGlobalCounter()); + logger.info("################################################################################################"); + } + +} diff --git a/src/main/java/com/datastax/cdm/properties/IPropertyHelper.java b/src/main/java/com/datastax/cdm/properties/IPropertyHelper.java index eb0fe9c1..1270d13e 100644 --- a/src/main/java/com/datastax/cdm/properties/IPropertyHelper.java +++ b/src/main/java/com/datastax/cdm/properties/IPropertyHelper.java @@ -29,5 +29,5 @@ public interface IPropertyHelper { boolean isSparkConfFullyLoaded(); - boolean meetsMinimum(String valueName, Integer testValue, Integer minimumValue); + boolean meetsMinimum(String valueName, Long 
testValue, Long minimumValue); } diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java index 3b24a465..4aaa3f90 100644 --- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java +++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java @@ -103,6 +103,7 @@ public enum PropertyType { public static final String PERF_FETCH_SIZE = "spark.cdm.perfops.fetchSizeInRows"; public static final String MAX_RETRIES = "spark.cdm.perfops.errorLimit"; public static final String PRINT_STATS_AFTER = "spark.cdm.perfops.printStatsAfter"; + public static final String PRINT_STATS_PER_PART = "spark.cdm.perfops.printStatsPerPart"; static { types.put(AUTOCORRECT_MISSING, PropertyType.BOOLEAN); @@ -127,6 +128,8 @@ public enum PropertyType { defaults.put(WRITE_CL, "LOCAL_QUORUM"); types.put(PRINT_STATS_AFTER, PropertyType.NUMBER); defaults.put(PRINT_STATS_AFTER, "100000"); + types.put(PRINT_STATS_PER_PART, PropertyType.BOOLEAN); + defaults.put(PRINT_STATS_PER_PART, "false"); types.put(PERF_FETCH_SIZE, PropertyType.NUMBER); defaults.put(PERF_FETCH_SIZE, "1000"); types.put(MAX_RETRIES, PropertyType.NUMBER); diff --git a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java index fefc7157..7b382aed 100644 --- a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java +++ b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java @@ -337,8 +337,7 @@ public boolean isSparkConfFullyLoaded() { return sparkConfFullyLoaded; } - @Override - public boolean meetsMinimum(String valueName, Integer testValue, Integer minimumValue) { + public boolean meetsMinimum(String valueName, Long testValue, Long minimumValue) { if (null != minimumValue && null != testValue && testValue >= minimumValue) return true; logger.warn(valueName + " must be greater than or equal to " + minimumValue + ". Current value does not meet this requirement: " + testValue); diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index c7b31ae3..109e7435 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -171,7 +171,8 @@ spark.cdm.autocorrect.mismatch false # .write : Default is LOCAL_QUORUM. Write consistency to Target. # .printStatsAfter : Default is 100000. Number of rows of processing after which a progress log # entry will be made. -# .fetchSizeInRows : Default is 1000. This affects the frequency of reads from Origin, and also the +# .printStatsPerPart : Default is false. Print statistics for each part after it is processed. +# .fetchSizeInRows : Default is 1000. This affects the frequency of reads from Origin, and also the # frequency of flushes to Target. # .errorLimit : Default is 0. Controls how many errors a thread may encounter during Migrate # and DiffData operations before failing. 
It is recommended to set this to a non- @@ -185,6 +186,7 @@ spark.cdm.perfops.ratelimit.target 40000 #spark.cdm.perfops.consistency.read LOCAL_QUORUM #spark.cdm.perfops.consistency.write LOCAL_QUORUM #spark.cdm.perfops.printStatsAfter 100000 +#spark.cdm.perfops.printStatsPerPart false #spark.cdm.perfops.fetchSizeInRows 1000 #spark.cdm.perfops.errorLimit 0 diff --git a/src/test/java/com/datastax/cdm/job/JobCounterTest.java b/src/test/java/com/datastax/cdm/job/JobCounterTest.java new file mode 100644 index 00000000..ca55ea4e --- /dev/null +++ b/src/test/java/com/datastax/cdm/job/JobCounterTest.java @@ -0,0 +1,125 @@ +package com.datastax.cdm.job; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class JobCounterTest { + + private JobCounter jobCounter; + + @BeforeEach + public void setUp() { + jobCounter = new JobCounter(10, true); // Changed to true to test printPerThread + jobCounter.setRegisteredTypes(JobCounter.CounterType.values()); + } + + @Test + public void testThreadIncrement() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + assertEquals(5, jobCounter.getCount(JobCounter.CounterType.READ)); + } + + @Test + public void testGlobalIncrement() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.globalIncrement(); + assertEquals(5, jobCounter.getCount(JobCounter.CounterType.READ, true)); + } + + @Test + public void testThreadResetForSpecificType() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.threadReset(JobCounter.CounterType.READ); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.READ)); + } + + @Test + public void testThreadResetForAllTypes() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.threadIncrement(JobCounter.CounterType.WRITE, 5); + jobCounter.threadReset(); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.READ)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.WRITE)); + } + + @Test + public void testUnregisteredCounterType() { + JobCounter localJobCounter = new JobCounter(10, true); + localJobCounter.setRegisteredTypes(JobCounter.CounterType.READ); + assertThrows(IllegalArgumentException.class, () -> localJobCounter.threadIncrement(JobCounter.CounterType.WRITE, 5)); + } + + @Test + public void testShouldPrintGlobalProgress() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 11); + jobCounter.globalIncrement(); + assertTrue(jobCounter.shouldPrintGlobalProgress()); // assuming printStatsAfter is set to 10 + } + + @Test + public void testPrintProgressForGlobalAndThread() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 11); + jobCounter.globalIncrement(); + // You may use mocking to capture logger outputs + jobCounter.printProgress(); + } + + @Test + public void testPrintFinal() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.globalIncrement(); + // You may use mocking to capture logger outputs + jobCounter.printFinal(); + } + + @Test + public void testGetCountGlobal() { + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.globalIncrement(); + assertEquals(5, jobCounter.getCount(JobCounter.CounterType.READ, true)); + } + + @Test + public void threadIncrementByOne() { + 
jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.threadIncrement(JobCounter.CounterType.READ); + assertEquals(6, jobCounter.getCount(JobCounter.CounterType.READ)); + } + + @Test + public void testShouldPrintGlobalProgressWithSufficientReads() { + // Increment global READ counter to go beyond the printStatsAfter threshold (assume it's 10) + jobCounter.threadIncrement(JobCounter.CounterType.READ, 11); + jobCounter.globalIncrement(); + + // shouldPrintGlobalProgress should return true because there are enough READs + assertTrue(jobCounter.shouldPrintGlobalProgress()); + } + + @Test + public void testShouldPrintGlobalProgressWithInsufficientReads() { + // Increment global READ counter to remain less than printStatsAfter threshold (assume it's 10) + jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); + jobCounter.globalIncrement(); + + // shouldPrintGlobalProgress should return false because there are not enough READs + assertFalse(jobCounter.shouldPrintGlobalProgress()); + } + + @Test + public void testShouldPrintGlobalProgressWithUnregisteredRead() { + jobCounter = new JobCounter(10, true); // fresh counter with no registered types yet + + // Set only WRITE as the registered type + jobCounter.setRegisteredTypes(JobCounter.CounterType.WRITE); + + // shouldPrintGlobalProgress should return false because READ is not registered + assertFalse(jobCounter.shouldPrintGlobalProgress()); + } + +} diff --git a/src/test/java/com/datastax/cdm/properties/PropertyHelperTest.java b/src/test/java/com/datastax/cdm/properties/PropertyHelperTest.java index 2b5cbbc7..f0102992 100644 --- a/src/test/java/com/datastax/cdm/properties/PropertyHelperTest.java +++ b/src/test/java/com/datastax/cdm/properties/PropertyHelperTest.java @@ -498,12 +498,12 @@ public void test_valid_connection_noUserPassword() { @Test public void meetsMinimum_true() { - assertTrue(helper.meetsMinimum("a", 100,0)); + assertTrue(helper.meetsMinimum("a", 100L, 0L)); } @Test public void meetsMinimum_false() { - assertFalse(helper.meetsMinimum("a", 1,100)); + assertFalse(helper.meetsMinimum("a", 1L, 100L)); } private void setValidSparkConf() {
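
Reviewer note: a minimal usage sketch of the new JobCounter lifecycle follows (illustrative only, not part of this diff; the pool size, part count, and row counts are made-up values). Each worker thread accumulates into thread-local counters via threadIncrement(), folds them into the global totals with globalIncrement() when its part completes, and printProgress() rate-limits logging through the printStatsAfter threshold. This lifecycle is also why SIT/cdm-assert.sh now greps for 'JobCounter.* Final ': the final counts are logged by JobCounter.printFinal() rather than by each job session's printCounts().

    // JobCounterUsageSketch.java -- illustrative sketch, not part of this diff
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.datastax.cdm.job.JobCounter;

    public class JobCounterUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            // printStatsAfter=1000: log global progress at each 1000-READ boundary;
            // printStatsPerPart=false: do not log per-thread counts after every part
            JobCounter jobCounter = new JobCounter(1000L, false);
            jobCounter.setRegisteredTypes(JobCounter.CounterType.READ,
                    JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED);

            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int part = 0; part < 8; part++) {
                pool.submit(() -> {
                    jobCounter.threadReset();     // clear this thread's counters for the new part
                    for (int row = 0; row < 500; row++) {
                        jobCounter.threadIncrement(JobCounter.CounterType.READ);
                        if (row % 10 == 0) {
                            jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
                        } else {
                            jobCounter.threadIncrement(JobCounter.CounterType.WRITE);
                        }
                    }
                    jobCounter.globalIncrement(); // fold thread-local counts into the global totals
                    jobCounter.printProgress();   // logs only when global READ crosses a printStatsAfter boundary
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            jobCounter.printFinal();              // banner-delimited "Final ... Record Count" lines
        }
    }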
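
Configuration note: the new flag pairs with the existing printStatsAfter knob in cdm-detailed.properties. A sketch of the two settings together (values are examples, not recommendations):

    spark.cdm.perfops.printStatsAfter    100000
    spark.cdm.perfops.printStatsPerPart  true

With printStatsPerPart=true, printProgress() logs a "Thread Counts: ..." line for the current thread after every part; with the default of false, only "Progress Counts: ..." lines appear, and only when the global READ count crosses a printStatsAfter boundary.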