Skip to content

Commit

Permalink
Continuing review with Devin
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 22, 2024
1 parent aa49f7f commit 2dacdf8
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

class IcebergWriteInstructionsTest {

Expand All @@ -20,9 +20,9 @@ void testSetDhTables() {
.addTables(table1)
.addTables(table2)
.build();
assertThat(instructions.tables().size()).isEqualTo(2);
assertThat(instructions.tables().contains(table1)).isTrue();
assertThat(instructions.tables().contains(table2)).isTrue();
assertThat(instructions.tables()).hasSize(2);
assertThat(instructions.tables()).contains(table1);
assertThat(instructions.tables()).contains(table2);
}

@Test
Expand All @@ -44,8 +44,8 @@ void testSetPartitionPaths() {
.addTables(table1, table2)
.addPartitionPaths(pp1, pp2)
.build();
assertThat(instructions.partitionPaths().size()).isEqualTo(2);
assertThat(instructions.partitionPaths().contains(pp1)).isTrue();
assertThat(instructions.partitionPaths().contains(pp2)).isTrue();
assertThat(instructions.partitionPaths()).hasSize(2);
assertThat(instructions.partitionPaths()).contains(pp1);
assertThat(instructions.partitionPaths()).contains(pp2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static TableParquetWriterOptions.Builder instructions() {
@Test
void defaults() {
final TableParquetWriterOptions instructions = instructions().build();
assertThat(instructions.dataInstructions().isEmpty()).isTrue();
assertThat(instructions.dataInstructions()).isEmpty();
assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576);
assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand All @@ -69,12 +67,6 @@ public final class IcebergUtils {
// TODO (deephaven-core#6327) Add support for more types like ZonedDateTime, Big Decimals, and Lists
}

/**
* Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
*/
private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static final int VARIABLE_NAME_LENGTH = 6;

/**
* Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link Snapshot}.
*
Expand Down Expand Up @@ -325,121 +317,4 @@ public static void verifyPartitioningColumns(
}
}
}

/**
* Creates a list of {@link PartitionData} and corresponding update strings for Deephaven tables from partition
* paths and spec. Also, validates that the partition paths are compatible with the provided partition spec.
*
* @param partitionSpec The partition spec to use for validation.
* @param partitionPaths The list of partition paths to process.
* @return A pair containing a list of PartitionData objects and a list of update strings for Deephaven tables.
* @throws IllegalArgumentException if the partition paths are not compatible with the partition spec.
*/
public static Pair<List<PartitionData>, List<String[]>> partitionDataFromPaths(
final PartitionSpec partitionSpec,
final Collection<String> partitionPaths) {
final List<PartitionData> partitionDataList = new ArrayList<>(partitionPaths.size());
final List<String[]> dhTableUpdateStringList = new ArrayList<>(partitionPaths.size());
final int numPartitioningFields = partitionSpec.fields().size();
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
for (final String partitionPath : partitionPaths) {
final String[] dhTableUpdateString = new String[numPartitioningFields];
try {
final String[] partitions = partitionPath.split("/", -1);
if (partitions.length != numPartitioningFields) {
throw new IllegalArgumentException("Expecting " + numPartitioningFields + " number of fields, " +
"found " + partitions.length);
}
final PartitionData partitionData = new PartitionData(partitionSpec.partitionType());
for (int colIdx = 0; colIdx < partitions.length; colIdx += 1) {
final String[] parts = partitions[colIdx].split("=", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Expecting key=value format, found " + partitions[colIdx]);
}
final PartitionField field = partitionSpec.fields().get(colIdx);
if (!field.name().equals(parts[0])) {
throw new IllegalArgumentException("Expecting field name " + field.name() + " at idx " +
colIdx + ", found " + parts[0]);
}
final Type type = partitionData.getType(colIdx);
dhTableUpdateString[colIdx] = getTableUpdateString(field.name(), type, parts[1], queryScope);
partitionData.set(colIdx, Conversions.fromPartitionString(partitionData.getType(colIdx), parts[1]));
}
} catch (final Exception e) {
throw new IllegalArgumentException("Failed to parse partition path: " + partitionPath + " using" +
" partition spec " + partitionSpec, e);
}
dhTableUpdateStringList.add(dhTableUpdateString);
partitionDataList.add(DataFiles.data(partitionSpec, partitionPath));
}
return new Pair<>(partitionDataList, dhTableUpdateStringList);
}

/**
* This method would convert a partitioning column info to a string which can be used in
* {@link io.deephaven.engine.table.Table#updateView(Collection) Table#updateView} method. For example, if the
* partitioning column of name "partitioningColumnName" if of type {@link Types.TimestampType} and the value is
* "2021-01-01T00:00:00Z", then this method would:
* <ul>
* <li>Add a new parameter to the query scope with a random name and value as {@link Instant} parsed from the string
* "2021-01-01T00:00:00Z"</li>
* <li>Return the string "partitioningColumnName = randomName"</li>
* </ul>
*
* @param colName The name of the partitioning column
* @param colType The type of the partitioning column
* @param value The value of the partitioning column
* @param queryScope The query scope to add the parameter to
*/
private static String getTableUpdateString(
@NotNull final String colName,
@NotNull final Type colType,
@NotNull final String value,
@NotNull final QueryScope queryScope) {
// Randomly generated name to be added to the query scope for each value to avoid repeated casts
// TODO Is this the right approach? Also, how would we clean up these params?
final String paramName = generateRandomAlphabetString(VARIABLE_NAME_LENGTH);
final Type.TypeID typeId = colType.typeId();
if (typeId == Type.TypeID.BOOLEAN) {
queryScope.putParam(paramName, Boolean.parseBoolean(value));
} else if (typeId == Type.TypeID.DOUBLE) {
queryScope.putParam(paramName, Double.parseDouble(value));
} else if (typeId == Type.TypeID.FLOAT) {
queryScope.putParam(paramName, Float.parseFloat(value));
} else if (typeId == Type.TypeID.INTEGER) {
queryScope.putParam(paramName, Integer.parseInt(value));
} else if (typeId == Type.TypeID.LONG) {
queryScope.putParam(paramName, Long.parseLong(value));
} else if (typeId == Type.TypeID.STRING) {
queryScope.putParam(paramName, value);
} else if (typeId == Type.TypeID.TIMESTAMP) {
final Types.TimestampType timestampType = (Types.TimestampType) colType;
if (timestampType == Types.TimestampType.withZone()) {
queryScope.putParam(paramName, Instant.parse(value));
} else {
queryScope.putParam(paramName, LocalDateTime.parse(value));
}
} else if (typeId == Type.TypeID.DATE) {
queryScope.putParam(paramName, LocalDate.parse(value));
} else if (typeId == Type.TypeID.TIME) {
queryScope.putParam(paramName, LocalTime.parse(value));
} else {
// TODO (deephaven-core#6327) Add support for more types like ZonedDateTime, Big Decimals
throw new TableDataException("Unsupported partitioning column type " + typeId.name());
}
return colName + " = " + paramName;
}

/**
* Generate a random string of length {@code length} using just alphabets.
*/
private static String generateRandomAlphabetString(final int length) {
final StringBuilder stringBuilder = new StringBuilder();
final Random random = new Random();
for (int i = 0; i < length; i++) {
final int index = random.nextInt(CHARACTERS.length());
stringBuilder.append(CHARACTERS.charAt(index));
}
return stringBuilder.toString();
}
}
Loading

0 comments on commit 2dacdf8

Please sign in to comment.