fixing progress counter bug #197

Merged: 7 commits, Oct 25, 2023
Changes from 5 commits
2 changes: 2 additions & 0 deletions SIT/Makefile
@@ -21,6 +21,8 @@ test_features: reset test_features_cmd
test_features_cmd:
./test.sh -p features

test: test_smoke test_regression_cmd test_features_cmd

# Local tests are not included in automated tests, but provide a means
# to use the test harness to validate project-specific work
test_local: reset test_local_cmd
2 changes: 1 addition & 1 deletion SIT/cdm-assert.sh
@@ -1,6 +1,6 @@
#!/bin/bash

assertCmd="egrep 'JobSession.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'"
assertCmd="egrep 'JobCounter.* Final ' \${OUTPUT_FILE} | sed 's/^.*Final //'"

_usage() {
cat <<EOF
1 change: 1 addition & 0 deletions SIT/features/05_guardrail/cdm.guardrailCheck.assert
@@ -1,3 +1,4 @@
Read Record Count: 4
Valid Record Count: 10
Skipped Record Count: 0
Large Record Count: 6
3 changes: 3 additions & 0 deletions SIT/regression/03_performance/migrate.properties
@@ -6,3 +6,6 @@ spark.cdm.schema.target.keyspaceTable target.regression_performance

spark.cdm.perfops.numParts 32
spark.cdm.perfops.batchSize 1

spark.cdm.perfops.printStatsAfter 450
Review comment (Collaborator):
LGTM. Let's please coordinate with the docs crew to publish this on the https://docs.datastax.com/en/astra-serverless/docs/migrate/cassandra-data-migrator.html page

spark.cdm.perfops.printStatsPerPart true
17 changes: 13 additions & 4 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -20,6 +20,8 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected Guardrail guardrailFeature;
protected boolean guardrailEnabled;
protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
protected JobCounter jobCounter;
protected Long printStatsAfter;

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
this(originSession, targetSession, sc, false);
@@ -32,12 +34,13 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
return;
}

printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1)) {
this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
}
this.jobCounter = new JobCounter(printStatsAfter, propertyHelper.getBoolean(KnownProperties.PRINT_STATS_PER_PART));

rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
@@ -77,5 +80,11 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

public abstract void processSlice(T slice);

public abstract void printCounts(boolean isFinal);
public synchronized void printCounts(boolean isFinal) {
if (isFinal) {
jobCounter.printFinal();
} else {
jobCounter.printProgress();
}
}
}
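
Note: the JobCounter class these call sites depend on is added elsewhere in this PR and is not part of the excerpt above. As orientation only, a minimal sketch of the interface the changed code relies on (setRegisteredTypes, threadIncrement, threadReset, getCount, globalIncrement, getThreadCounters, printProgress, printFinal) could look like the following; the internals, the exact enum members, and the printing rules are assumptions, not the repository's actual implementation.

package com.datastax.cdm.job;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch of JobCounter: each worker thread accumulates its own
// tallies, which are later folded into global totals via globalIncrement().
public class JobCounter {
    public enum CounterType { READ, WRITE, SKIPPED, ERROR, MISSING, UNFLUSHED }

    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    private final long printStatsAfter;        // print progress roughly every N reads
    private final boolean printStatsPerPart;   // also print a summary when a part finishes

    private final Map<CounterType, AtomicLong> globalCounters = new EnumMap<>(CounterType.class);
    private final ThreadLocal<Map<CounterType, Long>> threadCounters =
            ThreadLocal.withInitial(() -> new EnumMap<>(CounterType.class));

    public JobCounter(long printStatsAfter, boolean printStatsPerPart) {
        this.printStatsAfter = printStatsAfter;
        this.printStatsPerPart = printStatsPerPart;
    }

    // Declares which counters a given job type tracks (see the subclasses in this PR).
    public void setRegisteredTypes(CounterType... types) {
        globalCounters.clear();
        for (CounterType t : types) globalCounters.put(t, new AtomicLong(0));
    }

    public void threadIncrement(CounterType type) { threadIncrement(type, 1L); }

    public void threadIncrement(CounterType type, long delta) {
        threadCounters.get().merge(type, delta, Long::sum);
    }

    // Current thread's tally for one counter.
    public long getCount(CounterType type) { return threadCounters.get().getOrDefault(type, 0L); }

    public void threadReset() { threadCounters.get().clear(); }

    public void threadReset(CounterType type) { threadCounters.get().remove(type); }

    // Folds the current thread's tallies into the global totals.
    public synchronized void globalIncrement() {
        if (printStatsPerPart) logger.info("Part complete: {}", getThreadCounters(false));
        for (Map.Entry<CounterType, Long> e : threadCounters.get().entrySet()) {
            AtomicLong g = globalCounters.get(e.getKey());
            if (g != null) g.addAndGet(e.getValue());
        }
    }

    // global=true reports global totals, global=false the current thread's tallies.
    public String getThreadCounters(boolean global) {
        StringBuilder sb = new StringBuilder();
        for (CounterType t : globalCounters.keySet()) {
            long v = global ? globalCounters.get(t).get() : getCount(t);
            sb.append(t).append(": ").append(v).append("; ");
        }
        return sb.toString();
    }

    public void printProgress() {
        long reads = globalCounters.containsKey(CounterType.READ) ? globalCounters.get(CounterType.READ).get() : 0L;
        if (printStatsAfter > 0 && reads > 0 && reads % printStatsAfter == 0) {
            logger.info("Progress: {}", getThreadCounters(true));
        }
    }

    public void printFinal() {
        logger.info("Final: {}", getThreadCounters(true));
    }
}

This also lines up with the SIT change above: cdm-assert.sh now greps for log lines from a logger named JobCounter containing "Final " and strips everything up to that token, so the asserted counts come from printFinal-style output.
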
1 change: 0 additions & 1 deletion src/main/java/com/datastax/cdm/job/BaseJobSession.java
@@ -32,7 +32,6 @@ public abstract class BaseJobSession {
protected RateLimiter rateLimiterOrigin;
protected RateLimiter rateLimiterTarget;
protected Integer maxRetries = 10;
protected Integer printStatsAfter = 100000;

protected BaseJobSession(SparkConf sc) {
propertyHelper.initializeSparkConf(sc);
67 changes: 19 additions & 48 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -17,25 +17,20 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;

public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {

private static CopyJobSession copyJobSession;
private final PKFactory pkFactory;
private final boolean isCounterTable;
private final Integer fetchSize;
private final Integer batchSize;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong skippedCounter = new AtomicLong(0);
protected AtomicLong writeCounter = new AtomicLong(0);
protected AtomicLong errorCounter = new AtomicLong(0);
private TargetUpsertStatement targetUpsertStatement;
private TargetSelectByPKStatement targetSelectByPKStatement;

protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR, JobCounter.CounterType.UNFLUSHED);

pkFactory = this.originSession.getPKFactory();
isCounterTable = this.originSession.getCqlTable().isCounterTable();
@@ -60,11 +55,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
int maxAttempts = maxRetries + 1;
String guardrailCheck;
for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
long readCnt = 0;
long flushedWriteCnt = 0;
long skipCnt = 0;
long errCnt = 0;
long unflushedWrites = 0;
jobCounter.threadReset();

try {
OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession.getOriginSelectByPartitionRangeStatement();
targetUpsertStatement = this.targetSession.getTargetUpsertStatement();
@@ -74,14 +66,11 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {

for (Row originRow : resultSet) {
rateLimiterOrigin.acquire(1);
readCnt++;
if (readCnt % printStatsAfter == 0) {
printCounts(false);
}
jobCounter.threadIncrement(JobCounter.CounterType.READ);

Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) {
skipCnt++;
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
continue;
}

@@ -90,66 +79,48 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
guardrailCheck = guardrailFeature.guardrailChecks(r);
if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
skipCnt++;
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
continue;
}
}

BoundStatement boundUpsert = bind(r);
if (null == boundUpsert) {
skipCnt++; // TODO: this previously skipped, why not errCnt?
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); // TODO: this previously skipped, why not errCnt?
continue;
}

rateLimiterTarget.acquire(1);
batch = writeAsync(batch, writeResults, boundUpsert);
unflushedWrites++;
jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);

if (unflushedWrites > fetchSize) {
if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
flushAndClearWrites(batch, writeResults);
flushedWriteCnt += unflushedWrites;
unflushedWrites = 0;
jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
}
}
}

flushAndClearWrites(batch, writeResults);
flushedWriteCnt += unflushedWrites;

readCounter.addAndGet(readCnt);
writeCounter.addAndGet(flushedWriteCnt);
skippedCounter.addAndGet(skipCnt);
jobCounter.threadIncrement(JobCounter.CounterType.WRITE, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
done = true;

} catch (Exception e) {
if (attempts == maxAttempts) {
readCounter.addAndGet(readCnt);
writeCounter.addAndGet(flushedWriteCnt);
skippedCounter.addAndGet(skipCnt);
errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt);
jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
logFailedPartitionsInFile(partitionFile, min, max);
}
logger.error("Error occurred during Attempt#: {}", attempts, e);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
Thread.currentThread().getId(), min, max, attempts);
logger.error("Error stats Read#: {}, Wrote#: {}, Skipped#: {}, Error#: {}", readCnt, flushedWriteCnt, skipCnt, (readCnt - flushedWriteCnt - skipCnt));
logger.error("Error stats " + jobCounter.getThreadCounters(false));
}
finally {
jobCounter.globalIncrement();
printCounts(false);
}
}
}

@Override
public synchronized void printCounts(boolean isFinal) {
String msg = "ThreadID: " + Thread.currentThread().getId();
if (isFinal) {
msg += " Final";
logger.info("################################################################################################");
}
logger.info("{} Read Record Count: {}", msg, readCounter.get());
logger.info("{} Skipped Record Count: {}", msg, skippedCounter.get());
logger.info("{} Write Record Count: {}", msg, writeCounter.get());
logger.info("{} Error Record Count: {}", msg, errorCounter.get());
if (isFinal) {
logger.info("################################################################################################");
}
}

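For readers following the rewritten getDataAndInsert above, the per-attempt counter lifecycle is: threadReset at the top, threadIncrement for READ/SKIPPED/UNFLUSHED while iterating, UNFLUSHED promoted to WRITE whenever a flush happens, ERROR derived as READ - WRITE - SKIPPED on the final failed attempt, and globalIncrement plus a progress print in the finally block. Below is a small, self-contained walk-through of that flow, written against the hypothetical JobCounter sketch earlier in this page; the row loop and fetchSize are stand-ins, and no Cassandra I/O is performed.

package com.datastax.cdm.job;

// Self-contained walk-through of the per-attempt counter lifecycle used above.
// The loop and fetchSize below are stand-ins; a real run reads rows from Cassandra.
public class CounterLifecycleDemo {
    public static void main(String[] args) {
        JobCounter jobCounter = new JobCounter(450L, true);
        jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE,
                JobCounter.CounterType.SKIPPED, JobCounter.CounterType.UNFLUSHED);
        int fetchSize = 3;

        jobCounter.threadReset();                                   // clean tallies at the start of an attempt
        for (int row = 0; row < 10; row++) {
            jobCounter.threadIncrement(JobCounter.CounterType.READ);
            if (row % 4 == 0) {                                     // stand-in for a filtered or guardrailed record
                jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
                continue;
            }
            jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED);
            if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) {
                // a real run flushes the async batch here, then promotes UNFLUSHED to WRITE
                jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
                        jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
                jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
            }
        }
        // final flush: promote whatever is still unflushed
        jobCounter.threadIncrement(JobCounter.CounterType.WRITE,
                jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
        jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);

        jobCounter.globalIncrement();                               // fold this thread's tallies into the global totals
        jobCounter.printFinal();                                    // logs the final totals, e.g. READ: 10; WRITE: 7; SKIPPED: 3
    }
}
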
38 changes: 9 additions & 29 deletions src/main/java/com/datastax/cdm/job/CopyPKJobSession.java
@@ -15,23 +15,18 @@
import java.beans.PropertyEditorManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CopyPKJobSession extends AbstractJobSession<SplitPartitions.PKRows> {

private static CopyPKJobSession copyJobSession;
private final PKFactory pkFactory;
private final List<Class> originPKClasses;
private final boolean isCounterTable;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong missingCounter = new AtomicLong(0);
protected AtomicLong skipCounter = new AtomicLong(0);
protected AtomicLong writeCounter = new AtomicLong(0);
private OriginSelectByPKStatement originSelectByPKStatement;

protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc, true);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.MISSING);
pkFactory = this.originSession.getPKFactory();
isCounterTable = this.originSession.getCqlTable().isCounterTable();
originPKClasses = this.originSession.getCqlTable().getPKClasses();
@@ -47,64 +42,49 @@ public void processSlice(SplitPartitions.PKRows slice) {
public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
originSelectByPKStatement = originSession.getOriginSelectByPKStatement();
for (String row : rowsList.getPkRows()) {
readCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.READ);
EnhancedPK pk = toEnhancedPK(row);
if (null == pk || pk.isError()) {
missingCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.MISSING);
logger.error("Could not build PK object with value <{}>; error is: {}", row, (null == pk ? "null" : pk.getMessages()));
return;
}

rateLimiterOrigin.acquire(1);
Record recordFromOrigin = originSelectByPKStatement.getRecord(pk);
if (null == recordFromOrigin) {
missingCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.MISSING);
logger.error("Could not find origin row with primary-key: {}", row);
return;
}
Row originRow = recordFromOrigin.getOriginRow();

Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null);
if (originSelectByPKStatement.shouldFilterRecord(record)) {
skipCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
return;
}

if (guardrailEnabled) {
String guardrailCheck = guardrailFeature.guardrailChecks(record);
if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
logger.error("Guardrails failed for PrimaryKey {}; {}", record.getPk(), guardrailCheck);
skipCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
return;
}
}

rateLimiterTarget.acquire(1);
targetSession.getTargetUpsertStatement().putRecord(record);
writeCounter.incrementAndGet();
jobCounter.threadIncrement(JobCounter.CounterType.WRITE);

if (readCounter.get() % printStatsAfter == 0) {
printCounts(false);
}
jobCounter.globalIncrement();
printCounts(false);
}

printCounts(true);
}

@Override
public void printCounts(boolean isFinal) {
if (isFinal) {
logger.info("################################################################################################");
}
logger.info("ThreadID: {} Read Record Count: {}", Thread.currentThread().getId(), readCounter.get());
logger.info("ThreadID: {} Missing Record Count: {}", Thread.currentThread().getId(), missingCounter.get());
logger.info("ThreadID: {} Skipped Record Count: {}", Thread.currentThread().getId(), skipCounter.get());
logger.info("ThreadID: {} Inserted Record Count: {}", Thread.currentThread().getId(), writeCounter.get());
if (isFinal) {
logger.info("################################################################################################");
}
}

private EnhancedPK toEnhancedPK(String rowString) {
String[] pkFields = rowString.split(" %% ");
List<Object> values = new ArrayList<>(originPKClasses.size());