diff --git a/pom.xml b/pom.xml index 06c7606..43dc1dc 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ jar - 2.36.0 + 2.46.0 31.0.1-jre 2.1 @@ -36,12 +36,12 @@ 3.7.0 1.6.0 3.0.2 - 1.7.36 + 2.0.5 3.0.0-M5 1.1.3 3.1.2 9.2.1 - 1.8.2 + 1.10.1 diff --git a/src/main/java/com/google/cloud/solutions/StructToMutation.java b/src/main/java/com/google/cloud/solutions/StructToMutation.java index d0d1bec..c864551 100644 --- a/src/main/java/com/google/cloud/solutions/StructToMutation.java +++ b/src/main/java/com/google/cloud/solutions/StructToMutation.java @@ -75,117 +75,130 @@ private void setMutationColumnValues(Struct row, WriteBuilder mutation) { } ValueBinder pendingValue = mutation.set(columnName); - Value columnValue = row.getValue(colNum); - switch (columnValue.getType().getCode()) { - case BOOL: - pendingValue.to(columnValue.getBool()); - break; - case INT64: - pendingValue.to(columnValue.getInt64()); - break; - case NUMERIC: - pendingValue.to(columnValue.getNumeric()); - break; - case FLOAT64: - pendingValue.to(columnValue.getFloat64()); - break; - case STRING: - pendingValue.to(columnValue.getString()); - break; - case JSON: - pendingValue.to(columnValue.getJson()); - break; - case BYTES: - pendingValue.to(columnValue.getBytes()); - break; - case TIMESTAMP: - pendingValue.to(columnValue.getTimestamp()); - break; - case DATE: - pendingValue.to(columnValue.getDate()); - break; - case ARRAY: - switch (columnValue.getType().getArrayElementType().getCode()) { - case BOOL: - pendingValue.toBoolArray(columnValue.getBoolArray()); - break; - case INT64: - pendingValue.toInt64Array(columnValue.getInt64Array()); - break; - case NUMERIC: - pendingValue.toNumericArray(columnValue.getNumericArray()); - break; - case FLOAT64: - pendingValue.toFloat64Array(columnValue.getFloat64Array()); - break; - case STRING: - pendingValue.toStringArray(columnValue.getStringArray()); - break; - case JSON: - pendingValue.toJsonArray(columnValue.getJsonArray()); - break; - case BYTES: - pendingValue.toBytesArray(columnValue.getBytesArray()); - break; - case TIMESTAMP: - pendingValue.toTimestampArray(columnValue.getTimestampArray()); - break; - case DATE: - pendingValue.toDateArray(columnValue.getDateArray()); - break; - default: - throw new IllegalArgumentException( - "Unsupported array column value type in position: " - + colNum - + ": " - + columnValue.getType().getArrayElementType().getCode().toString()); - } - break; - default: - throw new IllegalArgumentException( - "Unsupported column type in position: " - + colNum - + ": " - + columnValue.getType().getCode().toString()); + Value columnValue = row.getValue(columnName); + + if (columnValue.isNull()) { + pendingValue.to((String) null); + } else { + + switch (columnValue.getType().getCode()) { + case BOOL: + pendingValue.to(columnValue.getBool()); + break; + case INT64: + pendingValue.to(columnValue.getInt64()); + break; + case NUMERIC: + pendingValue.to(columnValue.getNumeric()); + break; + case FLOAT64: + pendingValue.to(columnValue.getFloat64()); + break; + case STRING: + pendingValue.to(columnValue.getString()); + break; + case JSON: + pendingValue.to(columnValue.getJson()); + break; + case BYTES: + pendingValue.to(columnValue.getBytes()); + break; + case TIMESTAMP: + pendingValue.to(columnValue.getTimestamp()); + break; + case DATE: + pendingValue.to(columnValue.getDate()); + break; + case ARRAY: + switch (columnValue.getType().getArrayElementType().getCode()) { + case BOOL: + pendingValue.toBoolArray(columnValue.getBoolArray()); + break; + case INT64: + pendingValue.toInt64Array(columnValue.getInt64Array()); + break; + case NUMERIC: + pendingValue.toNumericArray(columnValue.getNumericArray()); + break; + case FLOAT64: + pendingValue.toFloat64Array(columnValue.getFloat64Array()); + break; + case STRING: + pendingValue.toStringArray(columnValue.getStringArray()); + break; + case JSON: + pendingValue.toJsonArray(columnValue.getJsonArray()); + break; + case BYTES: + pendingValue.toBytesArray(columnValue.getBytesArray()); + break; + case TIMESTAMP: + pendingValue.toTimestampArray(columnValue.getTimestampArray()); + break; + case DATE: + pendingValue.toDateArray(columnValue.getDateArray()); + break; + default: + throw new IllegalArgumentException( + "Unsupported array column value type in position: " + + colNum + + ": " + + columnValue.getType().getArrayElementType().getCode().toString()); + } + break; + default: + throw new IllegalArgumentException( + "Unsupported column type in position: " + + colNum + + ": " + + columnValue.getType().getCode().toString()); + } } } } private Key buildKeyFromStruct(Struct row) { + List cols = row.getType().getStructFields(); Key.Builder keyBuilder = Key.newBuilder(); for (int colNum = 0; colNum < row.getColumnCount(); colNum++) { - Value value = row.getValue(colNum); - switch (value.getType().getCode()) { - case BOOL: - keyBuilder.append(value.getBool()); - break; - case INT64: - keyBuilder.append(value.getInt64()); - break; - case NUMERIC: - keyBuilder.append(value.getNumeric()); - break; - case FLOAT64: - keyBuilder.append(value.getFloat64()); - break; - case STRING: - keyBuilder.append(value.getString()); - break; - case BYTES: - keyBuilder.append(value.getBytes()); - break; - case TIMESTAMP: - keyBuilder.append(value.getTimestamp()); - break; - case DATE: - keyBuilder.append(value.getDate()); - break; - default: - throw new IllegalArgumentException( - "Unsupported key column type in position: " - + colNum - + ": " - + value.getType().getCode()); + Value value = row.getValue(cols.get(colNum).getName()); + + if (value.isNull()) { + keyBuilder.append((String) null); + } else { + + switch (value.getType().getCode()) { + case BOOL: + keyBuilder.append(value.getBool()); + break; + case INT64: + keyBuilder.append(value.getInt64()); + break; + case NUMERIC: + keyBuilder.append(value.getNumeric()); + break; + case FLOAT64: + keyBuilder.append(value.getFloat64()); + break; + case STRING: + keyBuilder.append(value.getString()); + break; + case BYTES: + keyBuilder.append(value.getBytes()); + break; + case TIMESTAMP: + keyBuilder.append(value.getTimestamp()); + break; + case DATE: + keyBuilder.append(value.getDate()); + break; + default: + throw new IllegalArgumentException( + "Unsupported key column type in position: " + + colNum + + ": " + + value.getType().getCode()); + } } } return keyBuilder.build(); diff --git a/src/test/java/com/google/cloud/solutions/SpannerTableCopyIntegrationTest.java b/src/test/java/com/google/cloud/solutions/SpannerTableCopyIntegrationTest.java index 5003a5f..6f27fdf 100644 --- a/src/test/java/com/google/cloud/solutions/SpannerTableCopyIntegrationTest.java +++ b/src/test/java/com/google/cloud/solutions/SpannerTableCopyIntegrationTest.java @@ -22,11 +22,11 @@ import com.google.cloud.solutions.testing.SpannerEmulator; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ReadContext; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -37,6 +37,7 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -49,7 +50,8 @@ /** * This test requires the Cloud Spanner Emulator to be installed. * - *

see https://cloud.google.com/spanner/docs/emulator#installing_and_running_the_emulator + *

see emulator documentation */ @RunWith(JUnit4.class) public class SpannerTableCopyIntegrationTest { @@ -63,10 +65,11 @@ public class SpannerTableCopyIntegrationTest { public static final String INSTANCE_ID = "test"; public static final String DATABASE_ID = "test"; - private static final ImmutableMap TEST_DATA = - ImmutableMap.of( - 1L, "Hello World", - 2L, "Goodbye World"); + private static final ImmutableList> TEST_DATA = + ImmutableList.of( + KV.of(1L, "Hello World"), + KV.of(2L, "Goodbye World"), + KV.of(3L, null)); @ClassRule public static SpannerEmulator emulator = @@ -86,7 +89,7 @@ public void setUpTestDb() { emulator .getDatabaseClient() .writeAtLeastOnce( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( (entry) -> Mutation.newInsertBuilder("source") @@ -146,15 +149,15 @@ public void copySourceToDest() throws IOException { .run() .waitUntilFinish(); - assertThat(readAllRowsFromTable("dest")).containsExactlyEntriesIn(TEST_DATA); + assertThat(readAllRowsFromTable("dest")).containsExactlyElementsIn(TEST_DATA); assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( entry -> String.format( - "insert(dest{key=%d,value=%s})", entry.getKey(), entry.getValue())) + "insert(dest{key=%d,value=%s})", entry.getKey(), nullToNULL(entry.getValue()))) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")).isEmpty(); @@ -185,16 +188,23 @@ public void copySourceToDestDryRun() throws IOException { assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( entry -> String.format( - "insert(dest{key=%d,value=%s})", entry.getKey(), entry.getValue())) + "insert(dest{key=%d,value=%s})", entry.getKey(), nullToNULL(entry.getValue()))) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")).isEmpty(); } + /** + * converts null values to NULL string + */ + private static String nullToNULL(String s) { + return s == null ? "NULL" : s; + } + @Test public void pointInTimeRecoverDeletedRowsToDest() throws InterruptedException, IOException { // allow some time to pass to avoid flakyness. Emulator does not seem @@ -231,15 +241,15 @@ public void pointInTimeRecoverDeletedRowsToDest() throws InterruptedException, I .run() .waitUntilFinish(); - assertThat(readAllRowsFromTable("dest")).containsExactlyEntriesIn(TEST_DATA); + assertThat(readAllRowsFromTable("dest")).containsExactlyElementsIn(TEST_DATA); assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( entry -> String.format( - "replace(dest{key=%d,value=%s})", entry.getKey(), entry.getValue())) + "replace(dest{key=%d,value=%s})", entry.getKey(), nullToNULL(entry.getValue()))) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")).isEmpty(); @@ -268,21 +278,20 @@ public void transformRowsToDest() throws IOException { .waitUntilFinish(); assertThat(readAllRowsFromTable("dest")) - .containsExactlyEntriesIn( - TEST_DATA.entrySet().stream() - .collect( - Collectors.toMap( - (e) -> (e.getKey() * 100L), - (e) -> e.getValue().replace("World", "Jupiter")))); + .containsExactlyElementsIn( + TEST_DATA.stream() + .map(e->KV.of((e.getKey() * 100L), + e.getValue()==null ? null : e.getValue().replace("World", "Jupiter"))) + .collect(Collectors.toList())); assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( entry -> String.format( "insert(dest{key=%d,value=%s})", - entry.getKey() * 100, entry.getValue().replace("World", "Jupiter"))) + entry.getKey() * 100, entry.getValue()==null ? "NULL" : entry.getValue().replace("World", "Jupiter"))) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")).isEmpty(); } @@ -311,8 +320,8 @@ public void catchingFailuresBadColumnNames() throws IOException { assertThat(readAllRowsFromTable("dest")).isEmpty(); final List expectedMutations = - TEST_DATA.keySet().stream() - .map(k -> String.format("insert(dest{notkey=%d,notvalue=hello})", k * 15)) + TEST_DATA.stream() + .map(e -> String.format("insert(dest{notkey=%d,notvalue=hello})", e.getKey() * 15)) .collect(Collectors.toList()); assertThat(readLinesFromFiles("mutations")).containsExactlyElementsIn(expectedMutations); assertThat(readLinesFromFiles("failures")).containsExactlyElementsIn(expectedMutations); @@ -378,15 +387,15 @@ public void catchingFailuresExistingRow() throws IOException { .waitUntilFinish(); assertThat(readAllRowsFromTable("dest")) - .containsExactly(1L, "existing row", 2L, "Goodbye World"); + .containsExactly(KV.of(1L, "existing row"), KV.of(2L, "Goodbye World"), KV.of(3L, null)); assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.entrySet().stream() + TEST_DATA.stream() .map( entry -> String.format( - "insert(dest{key=%d,value=%s})", entry.getKey(), entry.getValue())) + "insert(dest{key=%d,value=%s})", entry.getKey(), nullToNULL(entry.getValue()))) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")) @@ -404,13 +413,13 @@ public void deleteFromDest() throws IOException { .set("key") .to(1) .set("value") - .to("existing row") + .to("existing row - will delete") .build(), Mutation.newInsertBuilder("dest") .set("key") - .to(3) + .to(4) .set("value") - .to("existing row3") + .to("existing row - wont delete") .build())); ImmutableList args = @@ -431,29 +440,27 @@ public void deleteFromDest() throws IOException { .run() .waitUntilFinish(); - assertThat(readAllRowsFromTable("dest")).containsExactly(3L, "existing row3"); + assertThat(readAllRowsFromTable("dest")).containsExactly(KV.of(4L, "existing row - wont delete")); assertThat(readLinesFromFiles("mutations")) .containsExactlyElementsIn( - TEST_DATA.keySet().stream() - .map(k -> String.format("delete(dest{[%d]})", k)) + TEST_DATA.stream() + .map(e -> String.format("delete(dest{[%d]})", e.getKey())) .collect(Collectors.toList())); assertThat(readLinesFromFiles("failures")).isEmpty(); } - private ImmutableMap readAllRowsFromTable(String table) { - try (ResultSet resultSet = - emulator - .getDatabaseClient() - .singleUse() - .read(table, KeySet.all(), ImmutableList.of("key", "value"))) { - ImmutableMap.Builder rb = new ImmutableMap.Builder<>(); - while (resultSet.next()) { - Struct row = resultSet.getCurrentRowAsStruct(); - rb.put(row.getLong("key"), row.getString("value")); + private ImmutableList> readAllRowsFromTable(String table) { + try (ReadContext r = emulator.getDatabaseClient().singleUse()) { + try (ResultSet resultSet = r.read(table, KeySet.all(), ImmutableList.of("key", "value"))) { + ImmutableList.Builder> rb = new ImmutableList.Builder<>(); + while (resultSet.next()) { + Struct row = resultSet.getCurrentRowAsStruct(); + rb.add(KV.of(row.getLong("key"), row.isNull("value") ? null : row.getString("value"))); + } + return rb.build(); } - return rb.build(); } } diff --git a/src/test/java/com/google/cloud/solutions/StructToMutationTest.java b/src/test/java/com/google/cloud/solutions/StructToMutationTest.java index 06ff2f0..3561609 100644 --- a/src/test/java/com/google/cloud/solutions/StructToMutationTest.java +++ b/src/test/java/com/google/cloud/solutions/StructToMutationTest.java @@ -24,8 +24,9 @@ import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type.Code; import com.google.cloud.spanner.Value; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -58,7 +59,10 @@ private static Struct.Builder buildStruct(Struct.Builder structBuilder) { .set("timestampCol") .to(Timestamp.parseTimestamp("2022-02-14T16:32:00Z")) .set("dateCol") - .to(Date.parseDate("2022-03-30")); + .to(Date.parseDate("2022-03-30")) + .set("nullCol") + .to((Boolean) null); + return structBuilder; } @@ -132,6 +136,11 @@ private static void validateMutationCols(Mutation m, Struct s) { assertThat(colValueMap.get(colName).getType().getCode()).isEqualTo(Code.DATE); assertThat(colValueMap.get(colName).getDate()).isEqualTo(s.getDate(colName)); + colName = "nullCol"; + // null columns use String type as that is the most compatible... + assertThat(colValueMap.get(colName).getType().getCode()).isEqualTo(Code.STRING); + assertThat(colValueMap.get(colName).isNull()); + colName = "booleanArrayCol"; assertThat(colValueMap.get(colName).getType().getArrayElementType().getCode()) .isEqualTo(Code.BOOL); @@ -269,10 +278,16 @@ public void structToDeleteMutation() { assertThat(m.getOperation()).isEqualTo(Op.DELETE); assertThat(m.getTable()).isEqualTo("testTable5"); - List keyParts = - ImmutableList.copyOf(m.getKeySet().getKeys().iterator().next().getParts()); - assertThat(keyParts).hasSize(9); + // should have only one key. + assertThat(Iterables.size(m.getKeySet().getKeys())).isEqualTo(1); + + List keyParts = new ArrayList<>(); + Iterables.addAll(keyParts, m.getKeySet().getKeys().iterator().next().getParts()); + + // check num elements. + assertThat(keyParts.size()).isEqualTo(10); + // booleanCol assertThat(keyParts.get(0)).isEqualTo(Boolean.TRUE); // int64Col @@ -291,6 +306,8 @@ public void structToDeleteMutation() { assertThat(keyParts.get(7)).isEqualTo(Timestamp.parseTimestamp("2022-02-14T16:32:00Z")); // dateCol assertThat(keyParts.get(8)).isEqualTo(Date.parseDate("2022-03-30")); + // null value + assertThat(keyParts.get(9)).isNull(); } @Test