Skip to content

Commit

Permalink
Minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pravinbhat committed Jul 16, 2024
1 parent 7d5e962 commit cf71c7e
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OriginSelectByPKStatement extends OriginSelectStatement {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public OriginSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
super(propertyHelper, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,10 @@
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;

public class OriginSelectByPartitionRangeStatement extends OriginSelectStatement {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public OriginSelectByPartitionRangeStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
super(propertyHelper, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletionStage;

public class TargetSelectByPKStatement extends BaseCdmStatement {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public TargetSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
super(propertyHelper, session);
this.statement = buildStatement();
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/datastax/cdm/data/PKFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.datastax.cdm.schema.CqlTable;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
1 change: 0 additions & 1 deletion src/main/java/com/datastax/cdm/data/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public enum Diff {
private Row originRow;
private Row targetRow;
private CompletionStage<AsyncResultSet> targetFutureRow;
private Diff diff = Diff.UNKNOWN;

public Record(EnhancedPK pk, Row originRow, Row targetRow, CompletionStage<AsyncResultSet> targetFutureRow) {
if (null == pk || (null == originRow && null == targetRow && null == targetFutureRow)) {
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/datastax/cdm/job/CopyPKJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ public class CopyPKJobSession extends AbstractJobSession<SplitPartitions.PKRows>

private final PKFactory pkFactory;
private final List<Class> originPKClasses;
private final boolean isCounterTable;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private OriginSelectByPKStatement originSelectByPKStatement;

protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc, true);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.MISSING);
pkFactory = this.originSession.getPKFactory();
isCounterTable = this.originSession.getCqlTable().isCounterTable();
originPKClasses = this.originSession.getCqlTable().getPKClasses();

logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPKStatement().getCQL());
Expand All @@ -56,6 +54,7 @@ public void processSlice(SplitPartitions.PKRows slice) {

public void getRowAndInsert(SplitPartitions.PKRows rowsList) {
originSelectByPKStatement = originSession.getOriginSelectByPKStatement();
jobCounter.threadReset();
for (String row : rowsList.getPkRows()) {
jobCounter.threadIncrement(JobCounter.CounterType.READ);
EnhancedPK pk = toEnhancedPK(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

public class GuardrailCheckJobSession extends AbstractJobSession<SplitPartitions.Partition> {

private static GuardrailCheckJobSession guardrailJobSession;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

private final PKFactory pkFactory;
Expand Down Expand Up @@ -60,6 +59,7 @@ public void guardrailCheck(BigInteger min, BigInteger max) {
OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession.getOriginSelectByPartitionRangeStatement();
ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max));
String checkString;
jobCounter.threadReset();
for (Row originRow : resultSet) {
rateLimiterOrigin.acquire(1);
jobCounter.threadIncrement(JobCounter.CounterType.READ);
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/com/datastax/cdm/schema/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ public String getKeyspaceName() {
public String getTableName() {
return this.tableName;
}

public String getRunInfoTableName() {
return "cdm_run_info";
}

public String getRunDetailsTableName() {
return "cdm_run_details";
}

public String getKeyspaceTable() {
return this.keyspaceName + "." + this.tableName;
Expand Down
14 changes: 3 additions & 11 deletions src/main/java/com/datastax/cdm/schema/CqlTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.datastax.cdm.properties.IPropertyHelper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +38,6 @@
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.WritetimeTTL;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class CqlTable extends BaseTable {
private final Long defaultForMissingTimestamp;
private final String defaultForMissingString;

public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession session) {
public CqlTable(IPropertyHelper propertyHelper, boolean isOrigin, CqlSession session) {
super(propertyHelper, isOrigin);
this.keyspaceName = unFormatName(keyspaceName);
this.tableName = unFormatName(tableName);
Expand Down Expand Up @@ -139,14 +139,6 @@ public String getKeyspaceTable() {
return formatName(this.keyspaceName) + "." + formatName(this.tableName);
}

public String getRunInfoTable() {
return formatName(this.keyspaceName) + "." + formatName(getRunInfoTableName());
}

public String getRunDetailTable() {
return formatName(this.keyspaceName) + "." + formatName(getRunDetailsTableName());
}

public void setFeatureMap(Map<Featureset, Feature> featureMap) { this.featureMap = featureMap; }
public Feature getFeature(Featureset featureEnum) { return featureMap.get(featureEnum); }

Expand Down Expand Up @@ -449,7 +441,7 @@ public boolean hasUnfrozenList() {
.anyMatch(columnMetadata -> !CqlData.isFrozen(columnMetadata.getType()));
}

private static ConsistencyLevel mapToConsistencyLevel(String level) {
protected static ConsistencyLevel mapToConsistencyLevel(String level) {
ConsistencyLevel retVal = ConsistencyLevel.LOCAL_QUORUM;
if (StringUtils.isNotEmpty(level)) {
switch (level.toUpperCase()) {
Expand Down
9 changes: 9 additions & 0 deletions src/resources/primary_key_rows.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# This is a sample input file for job: MigrateRowsFromFile
# list of primary-key fields separated by ' %% '
-1000154815969456717 %% 0 %% 10 %% 1021
-1000154815969456717 %% 0 %% 10 %% 1022
-1000154815969456717 %% 0 %% 10 %% 1023
-1000154815969456717 %% 0 %% 10 %% 1024
-1000154815969456717 %% 0 %% 10 %% 1025
-1000154815969456717 %% 0 %% 10 %% 1026
-1000154815969456717 %% 0 %% 10 %% 1027
-1000154815969456717 %% 0 %% 10 %% 1028
-1000154815969456717 %% 0 %% 10 %% 1029
-1000154815969456717 %% 0 %% 10 %% 1030

6 changes: 6 additions & 0 deletions src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ void getSubPartitionsFromFileTest() throws IOException {
assertEquals(25, partitions.size());
}

@Test
void getRowPartsFromFileTest() throws IOException {
List<SplitPartitions.PKRows> partitions = SplitPartitions.getRowPartsFromFile(5, "./src/resources/primary_key_rows.csv");
assertEquals(6, partitions.size());
}

@Test
void getSubPartitionsFromHighNumPartTest() throws IOException {
List<SplitPartitions.Partition> partitions = SplitPartitions.getSubPartitionsFromFile(1000, "./src/resources/partitions.csv");
Expand Down
13 changes: 13 additions & 0 deletions src/test/java/com/datastax/cdm/schema/BaseTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@ public void useOriginWhenTargetAbsent() {
BaseTable bt = new BaseTable(propertyHelper, false);

assertAll(
() -> assertEquals(false, bt.isOrigin()),
() -> assertEquals("origin_ks", bt.getKeyspaceName()),
() -> assertEquals("origin_table", bt.getTableName()),
() -> assertEquals("origin_ks.origin_table", bt.getKeyspaceTable())
);
}

@Test
public void useKSAbsent() {
when(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE)).thenReturn("origin_table");
BaseTable bt = new BaseTable(propertyHelper, false);

assertAll(
() -> assertEquals("", bt.getKeyspaceName()),
() -> assertEquals("origin_table", bt.getTableName())
);
}
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/com/datastax/cdm/schema/CqlTableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.datastax.cdm.schema;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import com.datastax.cdm.cql.CommonMocks;
import com.datastax.oss.driver.api.core.ConsistencyLevel;

class CqlTableTest extends CommonMocks {

@Test
void testCL() {
assertEquals(CqlTable.mapToConsistencyLevel("LOCAL_QUORUM"), ConsistencyLevel.LOCAL_QUORUM);
assertEquals(CqlTable.mapToConsistencyLevel("any"), ConsistencyLevel.ANY);
assertEquals(CqlTable.mapToConsistencyLevel("one"), ConsistencyLevel.ONE);
assertEquals(CqlTable.mapToConsistencyLevel("two"), ConsistencyLevel.TWO);
assertEquals(CqlTable.mapToConsistencyLevel("three"), ConsistencyLevel.THREE);
assertEquals(CqlTable.mapToConsistencyLevel("QUORUM"), ConsistencyLevel.QUORUM);
assertEquals(CqlTable.mapToConsistencyLevel("Local_one"), ConsistencyLevel.LOCAL_ONE);
assertEquals(CqlTable.mapToConsistencyLevel("EACH_quorum"), ConsistencyLevel.EACH_QUORUM);
assertEquals(CqlTable.mapToConsistencyLevel("serial"), ConsistencyLevel.SERIAL);
assertEquals(CqlTable.mapToConsistencyLevel("local_serial"), ConsistencyLevel.LOCAL_SERIAL);
assertEquals(CqlTable.mapToConsistencyLevel("all"), ConsistencyLevel.ALL);
}

}

0 comments on commit cf71c7e

Please sign in to comment.