Skip to content

Commit

Permalink
Handle NULL column values. (#1)
Browse files Browse the repository at this point in the history
also update dependency versions.
  • Loading branch information
nielm authored Jun 5, 2023
1 parent 1ec213e commit b8157d3
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 157 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<packaging>jar</packaging>

<properties>
<beam.version>2.36.0</beam.version>
<beam.version>2.46.0</beam.version>

<guava.version>31.0.1-jre</guava.version>
<hamcrest.version>2.1</hamcrest.version>
Expand All @@ -36,12 +36,12 @@
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<slf4j.version>1.7.36</slf4j.version>
<slf4j.version>2.0.5</slf4j.version>
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
<truth.version>1.1.3</truth.version>
<maven-checkstyle.version>3.1.2</maven-checkstyle.version>
<checkstyle.version>9.2.1</checkstyle.version>
<auto-value.version>1.8.2</auto-value.version>
<auto-value.version>1.10.1</auto-value.version>
</properties>

<build>
Expand Down
221 changes: 117 additions & 104 deletions src/main/java/com/google/cloud/solutions/StructToMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,117 +75,130 @@ private void setMutationColumnValues(Struct row, WriteBuilder mutation) {
}
ValueBinder<WriteBuilder> pendingValue = mutation.set(columnName);

Value columnValue = row.getValue(colNum);
switch (columnValue.getType().getCode()) {
case BOOL:
pendingValue.to(columnValue.getBool());
break;
case INT64:
pendingValue.to(columnValue.getInt64());
break;
case NUMERIC:
pendingValue.to(columnValue.getNumeric());
break;
case FLOAT64:
pendingValue.to(columnValue.getFloat64());
break;
case STRING:
pendingValue.to(columnValue.getString());
break;
case JSON:
pendingValue.to(columnValue.getJson());
break;
case BYTES:
pendingValue.to(columnValue.getBytes());
break;
case TIMESTAMP:
pendingValue.to(columnValue.getTimestamp());
break;
case DATE:
pendingValue.to(columnValue.getDate());
break;
case ARRAY:
switch (columnValue.getType().getArrayElementType().getCode()) {
case BOOL:
pendingValue.toBoolArray(columnValue.getBoolArray());
break;
case INT64:
pendingValue.toInt64Array(columnValue.getInt64Array());
break;
case NUMERIC:
pendingValue.toNumericArray(columnValue.getNumericArray());
break;
case FLOAT64:
pendingValue.toFloat64Array(columnValue.getFloat64Array());
break;
case STRING:
pendingValue.toStringArray(columnValue.getStringArray());
break;
case JSON:
pendingValue.toJsonArray(columnValue.getJsonArray());
break;
case BYTES:
pendingValue.toBytesArray(columnValue.getBytesArray());
break;
case TIMESTAMP:
pendingValue.toTimestampArray(columnValue.getTimestampArray());
break;
case DATE:
pendingValue.toDateArray(columnValue.getDateArray());
break;
default:
throw new IllegalArgumentException(
"Unsupported array column value type in position: "
+ colNum
+ ": "
+ columnValue.getType().getArrayElementType().getCode().toString());
}
break;
default:
throw new IllegalArgumentException(
"Unsupported column type in position: "
+ colNum
+ ": "
+ columnValue.getType().getCode().toString());
Value columnValue = row.getValue(columnName);

if (columnValue.isNull()) {
pendingValue.to((String) null);
} else {

switch (columnValue.getType().getCode()) {
case BOOL:
pendingValue.to(columnValue.getBool());
break;
case INT64:
pendingValue.to(columnValue.getInt64());
break;
case NUMERIC:
pendingValue.to(columnValue.getNumeric());
break;
case FLOAT64:
pendingValue.to(columnValue.getFloat64());
break;
case STRING:
pendingValue.to(columnValue.getString());
break;
case JSON:
pendingValue.to(columnValue.getJson());
break;
case BYTES:
pendingValue.to(columnValue.getBytes());
break;
case TIMESTAMP:
pendingValue.to(columnValue.getTimestamp());
break;
case DATE:
pendingValue.to(columnValue.getDate());
break;
case ARRAY:
switch (columnValue.getType().getArrayElementType().getCode()) {
case BOOL:
pendingValue.toBoolArray(columnValue.getBoolArray());
break;
case INT64:
pendingValue.toInt64Array(columnValue.getInt64Array());
break;
case NUMERIC:
pendingValue.toNumericArray(columnValue.getNumericArray());
break;
case FLOAT64:
pendingValue.toFloat64Array(columnValue.getFloat64Array());
break;
case STRING:
pendingValue.toStringArray(columnValue.getStringArray());
break;
case JSON:
pendingValue.toJsonArray(columnValue.getJsonArray());
break;
case BYTES:
pendingValue.toBytesArray(columnValue.getBytesArray());
break;
case TIMESTAMP:
pendingValue.toTimestampArray(columnValue.getTimestampArray());
break;
case DATE:
pendingValue.toDateArray(columnValue.getDateArray());
break;
default:
throw new IllegalArgumentException(
"Unsupported array column value type in position: "
+ colNum
+ ": "
+ columnValue.getType().getArrayElementType().getCode().toString());
}
break;
default:
throw new IllegalArgumentException(
"Unsupported column type in position: "
+ colNum
+ ": "
+ columnValue.getType().getCode().toString());
}
}
}
}

private Key buildKeyFromStruct(Struct row) {
List<StructField> cols = row.getType().getStructFields();
Key.Builder keyBuilder = Key.newBuilder();
for (int colNum = 0; colNum < row.getColumnCount(); colNum++) {
Value value = row.getValue(colNum);
switch (value.getType().getCode()) {
case BOOL:
keyBuilder.append(value.getBool());
break;
case INT64:
keyBuilder.append(value.getInt64());
break;
case NUMERIC:
keyBuilder.append(value.getNumeric());
break;
case FLOAT64:
keyBuilder.append(value.getFloat64());
break;
case STRING:
keyBuilder.append(value.getString());
break;
case BYTES:
keyBuilder.append(value.getBytes());
break;
case TIMESTAMP:
keyBuilder.append(value.getTimestamp());
break;
case DATE:
keyBuilder.append(value.getDate());
break;
default:
throw new IllegalArgumentException(
"Unsupported key column type in position: "
+ colNum
+ ": "
+ value.getType().getCode());
Value value = row.getValue(cols.get(colNum).getName());

if (value.isNull()) {
keyBuilder.append((String) null);
} else {

switch (value.getType().getCode()) {
case BOOL:
keyBuilder.append(value.getBool());
break;
case INT64:
keyBuilder.append(value.getInt64());
break;
case NUMERIC:
keyBuilder.append(value.getNumeric());
break;
case FLOAT64:
keyBuilder.append(value.getFloat64());
break;
case STRING:
keyBuilder.append(value.getString());
break;
case BYTES:
keyBuilder.append(value.getBytes());
break;
case TIMESTAMP:
keyBuilder.append(value.getTimestamp());
break;
case DATE:
keyBuilder.append(value.getDate());
break;
default:
throw new IllegalArgumentException(
"Unsupported key column type in position: "
+ colNum
+ ": "
+ value.getType().getCode());
}
}
}
return keyBuilder.build();
Expand Down
Loading

0 comments on commit b8157d3

Please sign in to comment.