diff --git a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPKStatement.java b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPKStatement.java index 35b1e63a..14dbb8e0 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPKStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPKStatement.java @@ -23,12 +23,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class OriginSelectByPKStatement extends OriginSelectStatement { - public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - public OriginSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) { super(propertyHelper, session); } diff --git a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java index 3b0e7d0e..1c8ea04e 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectByPartitionRangeStatement.java @@ -23,14 +23,10 @@ import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.math.BigInteger; public class OriginSelectByPartitionRangeStatement extends OriginSelectStatement { - public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - public OriginSelectByPartitionRangeStatement(IPropertyHelper propertyHelper, EnhancedSession session) { super(propertyHelper, session); } diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetSelectByPKStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetSelectByPKStatement.java index 2152ac37..96673d6e 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetSelectByPKStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetSelectByPKStatement.java @@ -26,14 +26,10 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.CompletionStage; public class TargetSelectByPKStatement extends BaseCdmStatement { - public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - public TargetSelectByPKStatement(IPropertyHelper propertyHelper, EnhancedSession session) { super(propertyHelper, session); this.statement = buildStatement(); diff --git a/src/main/java/com/datastax/cdm/data/PKFactory.java b/src/main/java/com/datastax/cdm/data/PKFactory.java index e2d9f85b..b31ccf3a 100644 --- a/src/main/java/com/datastax/cdm/data/PKFactory.java +++ b/src/main/java/com/datastax/cdm/data/PKFactory.java @@ -19,7 +19,6 @@ import com.datastax.cdm.schema.CqlTable; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Row; -import com.datastax.cdm.properties.KnownProperties; import com.datastax.cdm.properties.PropertyHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/datastax/cdm/data/Record.java b/src/main/java/com/datastax/cdm/data/Record.java index 95ca6445..b27e6f6f 100644 --- a/src/main/java/com/datastax/cdm/data/Record.java +++ b/src/main/java/com/datastax/cdm/data/Record.java @@ -32,7 +32,6 @@ public enum Diff { private Row originRow; private Row targetRow; private CompletionStage targetFutureRow; - private Diff diff = Diff.UNKNOWN; public Record(EnhancedPK pk, Row originRow, Row targetRow, CompletionStage targetFutureRow) { if (null == pk || (null == originRow && null == targetRow && null == targetFutureRow)) { diff --git a/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java b/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java index cbee4ac9..ccf9b4cc 100644 --- a/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyPKJobSession.java @@ -35,7 +35,6 @@ public class CopyPKJobSession extends AbstractJobSession private final PKFactory pkFactory; private final List originPKClasses; - private final boolean isCounterTable; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); private OriginSelectByPKStatement originSelectByPKStatement; @@ -43,7 +42,6 @@ protected CopyPKJobSession(CqlSession originSession, CqlSession targetSession, S super(originSession, targetSession, sc, true); this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE, JobCounter.CounterType.SKIPPED, JobCounter.CounterType.MISSING); pkFactory = this.originSession.getPKFactory(); - isCounterTable = this.originSession.getCqlTable().isCounterTable(); originPKClasses = this.originSession.getCqlTable().getPKClasses(); logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPKStatement().getCQL()); @@ -56,6 +54,7 @@ public void processSlice(SplitPartitions.PKRows slice) { public void getRowAndInsert(SplitPartitions.PKRows rowsList) { originSelectByPKStatement = originSession.getOriginSelectByPKStatement(); + jobCounter.threadReset(); for (String row : rowsList.getPkRows()) { jobCounter.threadIncrement(JobCounter.CounterType.READ); EnhancedPK pk = toEnhancedPK(row); diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index 20370ac9..a0cce186 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -29,7 +29,6 @@ public class GuardrailCheckJobSession extends AbstractJobSession { - private static GuardrailCheckJobSession guardrailJobSession; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); private final PKFactory pkFactory; @@ -60,6 +59,7 @@ public void guardrailCheck(BigInteger min, BigInteger max) { OriginSelectByPartitionRangeStatement originSelectByPartitionRangeStatement = this.originSession.getOriginSelectByPartitionRangeStatement(); ResultSet resultSet = originSelectByPartitionRangeStatement.execute(originSelectByPartitionRangeStatement.bind(min, max)); String checkString; + jobCounter.threadReset(); for (Row originRow : resultSet) { rateLimiterOrigin.acquire(1); jobCounter.threadIncrement(JobCounter.CounterType.READ); diff --git a/src/main/java/com/datastax/cdm/schema/BaseTable.java b/src/main/java/com/datastax/cdm/schema/BaseTable.java index 7df83062..b5cfa4f5 100644 --- a/src/main/java/com/datastax/cdm/schema/BaseTable.java +++ b/src/main/java/com/datastax/cdm/schema/BaseTable.java @@ -71,14 +71,6 @@ public String getKeyspaceName() { public String getTableName() { return this.tableName; } - - public String getRunInfoTableName() { - return "cdm_run_info"; - } - - public String getRunDetailsTableName() { - return "cdm_run_details"; - } public String getKeyspaceTable() { return this.keyspaceName + "." + this.tableName; diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java index fdccca70..903668a4 100644 --- a/src/main/java/com/datastax/cdm/schema/CqlTable.java +++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.datastax.cdm.properties.IPropertyHelper; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +38,6 @@ import com.datastax.cdm.feature.Featureset; import com.datastax.cdm.feature.WritetimeTTL; import com.datastax.cdm.properties.KnownProperties; -import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; @@ -84,7 +84,7 @@ public class CqlTable extends BaseTable { private final Long defaultForMissingTimestamp; private final String defaultForMissingString; - public CqlTable(PropertyHelper propertyHelper, boolean isOrigin, CqlSession session) { + public CqlTable(IPropertyHelper propertyHelper, boolean isOrigin, CqlSession session) { super(propertyHelper, isOrigin); this.keyspaceName = unFormatName(keyspaceName); this.tableName = unFormatName(tableName); @@ -139,14 +139,6 @@ public String getKeyspaceTable() { return formatName(this.keyspaceName) + "." + formatName(this.tableName); } - public String getRunInfoTable() { - return formatName(this.keyspaceName) + "." + formatName(getRunInfoTableName()); - } - - public String getRunDetailTable() { - return formatName(this.keyspaceName) + "." + formatName(getRunDetailsTableName()); - } - public void setFeatureMap(Map featureMap) { this.featureMap = featureMap; } public Feature getFeature(Featureset featureEnum) { return featureMap.get(featureEnum); } @@ -449,7 +441,7 @@ public boolean hasUnfrozenList() { .anyMatch(columnMetadata -> !CqlData.isFrozen(columnMetadata.getType())); } - private static ConsistencyLevel mapToConsistencyLevel(String level) { + protected static ConsistencyLevel mapToConsistencyLevel(String level) { ConsistencyLevel retVal = ConsistencyLevel.LOCAL_QUORUM; if (StringUtils.isNotEmpty(level)) { switch (level.toUpperCase()) { diff --git a/src/resources/primary_key_rows.csv b/src/resources/primary_key_rows.csv index 20ce1ab7..b7e731b6 100644 --- a/src/resources/primary_key_rows.csv +++ b/src/resources/primary_key_rows.csv @@ -1,4 +1,13 @@ # This is a sample input file for job: MigrateRowsFromFile # list of primary-key fields separated by ' %% ' +-1000154815969456717 %% 0 %% 10 %% 1021 +-1000154815969456717 %% 0 %% 10 %% 1022 +-1000154815969456717 %% 0 %% 10 %% 1023 -1000154815969456717 %% 0 %% 10 %% 1024 -1000154815969456717 %% 0 %% 10 %% 1025 +-1000154815969456717 %% 0 %% 10 %% 1026 +-1000154815969456717 %% 0 %% 10 %% 1027 +-1000154815969456717 %% 0 %% 10 %% 1028 +-1000154815969456717 %% 0 %% 10 %% 1029 +-1000154815969456717 %% 0 %% 10 %% 1030 + diff --git a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java index bbe1fdbd..8b9c8720 100644 --- a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java +++ b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java @@ -64,6 +64,12 @@ void getSubPartitionsFromFileTest() throws IOException { assertEquals(25, partitions.size()); } + @Test + void getRowPartsFromFileTest() throws IOException { + List partitions = SplitPartitions.getRowPartsFromFile(5, "./src/resources/primary_key_rows.csv"); + assertEquals(6, partitions.size()); + } + @Test void getSubPartitionsFromHighNumPartTest() throws IOException { List partitions = SplitPartitions.getSubPartitionsFromFile(1000, "./src/resources/partitions.csv"); diff --git a/src/test/java/com/datastax/cdm/schema/BaseTableTest.java b/src/test/java/com/datastax/cdm/schema/BaseTableTest.java index 3515b3e2..e87df414 100644 --- a/src/test/java/com/datastax/cdm/schema/BaseTableTest.java +++ b/src/test/java/com/datastax/cdm/schema/BaseTableTest.java @@ -40,7 +40,20 @@ public void useOriginWhenTargetAbsent() { BaseTable bt = new BaseTable(propertyHelper, false); assertAll( + () -> assertEquals(false, bt.isOrigin()), () -> assertEquals("origin_ks", bt.getKeyspaceName()), + () -> assertEquals("origin_table", bt.getTableName()), + () -> assertEquals("origin_ks.origin_table", bt.getKeyspaceTable()) + ); + } + + @Test + public void useKSAbsent() { + when(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE)).thenReturn("origin_table"); + BaseTable bt = new BaseTable(propertyHelper, false); + + assertAll( + () -> assertEquals("", bt.getKeyspaceName()), () -> assertEquals("origin_table", bt.getTableName()) ); } diff --git a/src/test/java/com/datastax/cdm/schema/CqlTableTest.java b/src/test/java/com/datastax/cdm/schema/CqlTableTest.java new file mode 100644 index 00000000..95b9a210 --- /dev/null +++ b/src/test/java/com/datastax/cdm/schema/CqlTableTest.java @@ -0,0 +1,27 @@ +package com.datastax.cdm.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import com.datastax.cdm.cql.CommonMocks; +import com.datastax.oss.driver.api.core.ConsistencyLevel; + +class CqlTableTest extends CommonMocks { + + @Test + void testCL() { + assertEquals(CqlTable.mapToConsistencyLevel("LOCAL_QUORUM"), ConsistencyLevel.LOCAL_QUORUM); + assertEquals(CqlTable.mapToConsistencyLevel("any"), ConsistencyLevel.ANY); + assertEquals(CqlTable.mapToConsistencyLevel("one"), ConsistencyLevel.ONE); + assertEquals(CqlTable.mapToConsistencyLevel("two"), ConsistencyLevel.TWO); + assertEquals(CqlTable.mapToConsistencyLevel("three"), ConsistencyLevel.THREE); + assertEquals(CqlTable.mapToConsistencyLevel("QUORUM"), ConsistencyLevel.QUORUM); + assertEquals(CqlTable.mapToConsistencyLevel("Local_one"), ConsistencyLevel.LOCAL_ONE); + assertEquals(CqlTable.mapToConsistencyLevel("EACH_quorum"), ConsistencyLevel.EACH_QUORUM); + assertEquals(CqlTable.mapToConsistencyLevel("serial"), ConsistencyLevel.SERIAL); + assertEquals(CqlTable.mapToConsistencyLevel("local_serial"), ConsistencyLevel.LOCAL_SERIAL); + assertEquals(CqlTable.mapToConsistencyLevel("all"), ConsistencyLevel.ALL); + } + +}