fixing progress counter bug (#197)
* fixing progress counter bug; consolidating count printing logic into a single class
* need to reset unflushed counter after flushing
* refactor to HashMap and change names for clearer code
* implementing spacing suggestion
* address review comments to consolidate the printProgress function

---------

Co-authored-by: Phil Miesle <[email protected]>
Co-authored-by: Madhavan Sridharan <[email protected]>
3 people authored Oct 25, 2023
1 parent 5f0222f commit 1fc5fc6
Showing 18 changed files with 385 additions and 158 deletions.
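
The JobCounter class that the call sites below rely on is one of the 18 changed files, but its diff is not shown on this page. The following is a rough, hypothetical sketch inferred only from those call sites (setRegisteredTypes, threadIncrement, threadReset, getCount, globalIncrement, getThreadCounters, printProgress, printFinal) and from the commit note about a HashMap-based refactor; field names, throttling logic, and log formats are assumptions, not the actual file from the commit.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch only; the real JobCounter added by this commit may differ.
public class JobCounter {

    public enum CounterType { READ, WRITE, SKIPPED, ERROR, UNFLUSHED, MISSING }

    // Each counter keeps a per-thread running value plus a global total.
    private static class CounterUnit {
        final AtomicLong globalCounter = new AtomicLong(0);
        final ThreadLocal<Long> threadLocalCounter = ThreadLocal.withInitial(() -> 0L);
    }

    private final Map<CounterType, CounterUnit> counterMap = new HashMap<>();
    private final long printStatsAfter;
    private final boolean printStatsPerPart;
    private final AtomicLong printCounter = new AtomicLong(0);
    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    public JobCounter(long printStatsAfter, boolean printStatsPerPart) {
        this.printStatsAfter = printStatsAfter;
        this.printStatsPerPart = printStatsPerPart;
    }

    // Each job session registers only the counters it actually uses.
    public void setRegisteredTypes(CounterType... types) {
        counterMap.clear();
        for (CounterType type : types) counterMap.put(type, new CounterUnit());
    }

    // Per-thread increment; global totals are untouched until globalIncrement().
    public void threadIncrement(CounterType type, long incrementBy) {
        CounterUnit unit = counterMap.get(type);
        unit.threadLocalCounter.set(unit.threadLocalCounter.get() + incrementBy);
    }

    public void threadIncrement(CounterType type) { threadIncrement(type, 1); }

    // Current per-thread value, e.g. how many writes are still unflushed.
    public long getCount(CounterType type) {
        return counterMap.get(type).threadLocalCounter.get();
    }

    // Reset one per-thread counter (e.g. UNFLUSHED after a flush)...
    public void threadReset(CounterType type) {
        counterMap.get(type).threadLocalCounter.set(0L);
    }

    // ...or all of them at the start of a new attempt.
    public void threadReset() {
        for (CounterType type : counterMap.keySet()) threadReset(type);
    }

    // Fold the per-thread values into the global totals once a partition is done.
    public void globalIncrement() {
        for (Map.Entry<CounterType, CounterUnit> e : counterMap.entrySet()) {
            e.getValue().globalCounter.addAndGet(e.getValue().threadLocalCounter.get());
        }
    }

    // Render either the per-thread or the global counters as a single log-friendly string.
    public String getThreadCounters(boolean global) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<CounterType, CounterUnit> e : counterMap.entrySet()) {
            long value = global ? e.getValue().globalCounter.get()
                                : e.getValue().threadLocalCounter.get();
            sb.append(e.getKey()).append(": ").append(value).append("; ");
        }
        return sb.toString();
    }

    // Progress lines are throttled by printStatsAfter unless per-partition printing is enabled.
    public void printProgress() {
        if (printStatsPerPart || printCounter.incrementAndGet() % printStatsAfter == 0) {
            logger.info("Progress: {}", getThreadCounters(true));
        }
    }

    public void printFinal() {
        logger.info("################################################################################################");
        logger.info("Final {}", getThreadCounters(true));
        logger.info("################################################################################################");
    }
}
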
2 changes: 2 additions & 0 deletions SIT/Makefile
@@ -21,6 +21,8 @@ test_features: reset test_features_cmd
test_features_cmd:
./test.sh -p features

test: test_smoke test_regression_cmd test_features_cmd

# Local tests are not included in automated tests, but provide a means
# to use the test harness to validate project-specific work
test_local: reset test_local_cmd
2 changes: 1 addition & 1 deletion SIT/cdm-assert.sh
@@ -1,6 +1,6 @@
#!/bin/bash

assertCmd="egrep 'JobSession.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'"
assertCmd="egrep 'JobCounter.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'"

_usage() {
cat <<EOF
1 change: 1 addition & 0 deletions SIT/features/05_guardrail/cdm.guardrailCheck.assert
@@ -1,3 +1,4 @@
Read Record Count: 4
Valid Record Count: 10
Skipped Record Count: 0
Large Record Count: 6
3 changes: 3 additions & 0 deletions SIT/regression/03_performance/migrate.properties
@@ -6,3 +6,6 @@ spark.cdm.schema.target.keyspaceTable target.regression_performance

spark.cdm.perfops.numParts 32
spark.cdm.perfops.batchSize 1

spark.cdm.perfops.printStatsAfter 450
spark.cdm.perfops.printStatsPerPart true
2 changes: 1 addition & 1 deletion pom.xml
@@ -326,7 +326,7 @@
<counter>LINE</counter>
<value>MISSEDCOUNT</value>
<!-- <maximum>1544</maximum>-->
<maximum>3052</maximum>
<maximum>3073</maximum>
</limit>
</limits>
</rule>
17 changes: 13 additions & 4 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -20,6 +20,8 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected Guardrail guardrailFeature;
protected boolean guardrailEnabled;
protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
protected JobCounter jobCounter;
protected Long printStatsAfter;

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
this(originSession, targetSession, sc, false);
@@ -32,12 +34,13 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
return;
}

printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1)) {
this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
}
this.jobCounter = new JobCounter(printStatsAfter, propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART));

rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
@@ -77,5 +80,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

public abstract void processSlice(T slice);

public abstract void printCounts(boolean isFinal);
public synchronized void printCounts(boolean isFinal) {
if (isFinal) {
jobCounter.printFinal();
} else {
jobCounter.printProgress();
}
}
}
1 change: 0 additions & 1 deletion src/main/java/com/datastax/cdm/job/BaseJobSession.java
@@ -32,7 +32,6 @@ public abstract class BaseJobSession {
protected RateLimiter rateLimiterOrigin;
protected RateLimiter rateLimiterTarget;
protected Integer maxRetries = 10;
protected Integer printStatsAfter = 100000;

protected BaseJobSession(SparkConf sc) {
propertyHelper.initializeSparkConf(sc);
67 changes: 19 additions & 48 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -17,25 +17,20 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;

public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {

private static CopyJobSession copyJobSession;
private final PKFactory pkFactory;
private final boolean isCounterTable;
private final Integer fetchSize;
private final Integer batchSize;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong skippedCounter = new AtomicLong(0);
protected AtomicLong writeCounter = new AtomicLong(0);
protected AtomicLong errorCounter = new AtomicLong(0);
private TargetUpsertStatement targetUpsertStatement;
private TargetSelectByPKStatement targetSelectByPKStatement;

protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR, JobCounter.CounterType.UNFLUSHED);

pkFactory = this.originSession.getPKFactory();
isCounterTable = this.originSession.getCqlTable().isCounterTable();
@@ -60,11 +55,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
int maxAttempts = maxRetries + 1;
String guardrailCheck;
for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
long readCnt = 0;
long flushedWriteCnt = 0;
long skipCnt = 0;
long errCnt = 0;
long unflushedWrites = 0;
jobCounter.threadReset();

try {
OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession.getOriginSelectByPartitionRangeStatement();
targetUpsertStatement = this.targetSession.getTargetUpsertStatement();
@@ -74,14 +66,11 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {

for (Row originRow : resultSet) {
rateLimiterOrigin.acquire(1);
readCnt++;
if (readCnt % printStatsAfter == 0) {
printCounts(false);
}
jobCounter.threadIncrement(JobCounter.CounterType.READ);

Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
skipCnt++;
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
continue;
}

@@ -90,66 +79,48 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
guardrailCheck = guardrailFeature.guardrailChecks(r);
if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
skipCnt++;
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
continue;
}
}

BoundStatement boundUpsert = bind(r);
if (null == boundUpsert) {
skipCnt++; // TODO: this previously skipped, why not errCnt?
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); // TODO: this previously skipped, why not errCnt?
continue;
}

rateLimiterTarget.acquire(1);
batch = writeAsync(batch, writeResults, boundUpsert);
unflushedWrites++;
jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);

if (unflushedWrites > fetchSize) {
if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
flushAndClearWrites(batch, writeResults);
flushedWriteCnt += unflushedWrites;
unflushedWrites = 0;
jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
}
}
}

flushAndClearWrites(batch, writeResults);
flushedWriteCnt += unflushedWrites;

readCounter.addAndGet(readCnt);
writeCounter.addAndGet(flushedWriteCnt);
skippedCounter.addAndGet(skipCnt);
jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
done = true;

} catch (Exception e) {
if (attempts == maxAttempts) {
readCounter.addAndGet(readCnt);
writeCounter.addAndGet(flushedWriteCnt);
skippedCounter.addAndGet(skipCnt);
errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt);
jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
logFailedPartitionsInFile(partitionFile, min, max);
}
logger.error("Error occurred during Attempt#: {}", attempts, e);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
Thread.currentThread().getId(), min, max, attempts);
logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, flushedWriteCnt, skipCnt, (readCnt - flushedWriteCnt - skipCnt));
logger.error("Error stats " + jobCounter.getThreadCounters(false));
}
finally {
jobCounter.globalIncrement();
printCounts(false);
}
}
}

@Override
public synchronized void printCounts(boolean isFinal) {
String msg = "ThreadID: " + Thread.currentThread().getId();
if (isFinal) {
msg += " Final";
logger.info("################################################################################################");
}
logger.info("{} Read Record Count: {}", msg, readCounter.get());
logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
logger.info("{} Write Record Count: {}", msg, writeCounter.get());
logger.info("{} Error Record Count: {}", msg, errorCounter.get());
if (isFinal) {
logger.info("################################################################################################");
}
}

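
The counter bug itself is visible in the CopyJobSession hunks above: each time the write batch is flushed, the UNFLUSHED tally must be folded into WRITE and then reset, otherwise the same writes are counted again on the next flush (the "need to reset unflushed counter after flushing" note in the commit message). Below is a small, self-contained demo of that pattern against the hypothetical JobCounter sketch near the top of this page; JobCounterFlushDemo, the counter values, and the stubbed-out flush call are illustrative only and not part of the commit.

// Demo of the flush-then-reset pattern, using the hypothetical JobCounter sketch above.
public class JobCounterFlushDemo {
    public static void main(String[] args) {
        JobCounter jobCounter = new JobCounter(1L, true);
        jobCounter.setRegisteredTypes(JobCounter.CounterType.READ,
                JobCounter.CounterType.WRITE, JobCounter.CounterType.UNFLUSHED);

        long fetchSize = 3;
        for (int i = 0; i < 10; i++) {
            jobCounter.threadIncrement(JobCounter.CounterType.READ);
            jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);
            if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
                // flushAndClearWrites(batch, writeResults) would run here in CopyJobSession
                jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
                        jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
                jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); // the reset the commit adds
            }
        }
        // final flush of whatever is still unflushed, then fold into the global totals
        jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
                jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
        jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
        jobCounter.globalIncrement();
        jobCounter.printFinal();   // WRITE should now equal READ (10)
    }
}
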
38 changes: 9 additions & 29 deletions src/main/java/com/datastax/cdm/job/CopyPKJobSession.java
@@ -15,23 +15,18 @@
import java.beans.PropertyEditorManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CopyPKJobSession extends AbstractJobSession<SplitPartitions.PKRows> {

private static CopyPKJobSession copyJobSession;
private final PKFactory pkFactory;
private final List<Class> originPKClasses;
private final boolean isCounterTable;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong missingCounter = new AtomicLong(0);
protected AtomicLong skipCounter = new AtomicLong(0);
protected AtomicLong writeCounter = new AtomicLong(0);
private OriginSelectByPKStatement originSelectByPKStatement;

protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc, true);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.MISSING);
pkFactory = this.originSession.getPKFactory();
isCounterTable = this.originSession.getCqlTable().isCounterTable();
originPKClasses = this.originSession.getCqlTable().getPKClasses();
@@ -47,64 +42,49 @@ public void processSlice(SplitPartitions.PKRows slice) {
public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
originSelectByPKStatement = originSession.getOriginSelectByPKStatement();
for (String row : rowsList.getPkRows()) {
readCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.READ);
EnhancedPK pk = toEnhancedPK(row);
if (null == pk || pk.isError()) {
missingCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.MISSING);
logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
return;
}

rateLimiterOrigin.acquire(1);
Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
if (null == recordFromOrigin) {
missingCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.MISSING);
logger.error("Could not find origin row with primary-key: {}", row);
return;
}
Row originRow = recordFromOrigin.getOriginRow();

Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
if (originSelectByPKStatement.shouldFilterRecord(record)) {
skipCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
return;
}

if (guardrailEnabled) {
String guardrailCheck = guardrailFeature.guardrailChecks(record);
if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
logger.error("Guardrails failed for PrimaryKey {}; {}", record.getPk(), guardrailCheck);
skipCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
return;
}
}

rateLimiterTarget.acquire(1);
targetSession.getTargetUpsertStatement().putRecord(record);
writeCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.WRITE);

if (readCounter.get() % printStatsAfter == 0) {
printCounts(false);
}
jobCounter.globalIncrement();
printCounts(false);
}

printCounts(true);
}

@Override
public void printCounts(boolean isFinal) {
if (isFinal) {
logger.info("################################################################################################");
}
logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
logger.info("ThreadID: {} Skipped Record Count: {}", Thread.currentThread().getId(), skipCounter.get());
logger.info("ThreadID: {} Inserted Record Count: {}", Thread.currentThread().getId(), writeCounter.get());
if (isFinal) {
logger.info("################################################################################################");
}
}

private EnhancedPK toEnhancedPK(String rowString) {
String[] pkFields = rowString.split(" %% ");
List<Object> values = new ArrayList<>(originPKClasses.size());