From 2dacdf8bc6c91d8e17969b091207944d347d14e2 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 22 Nov 2024 21:47:26 +0530 Subject: [PATCH] Continuing review with Devin --- .../util/IcebergWriteInstructionsTest.java | 14 +- .../util/TableParquetWriterOptionsTest.java | 2 +- .../deephaven/iceberg/base/IcebergUtils.java | 125 ----------- .../iceberg/util/IcebergTableWriter.java | 198 +++++++++++++++--- .../iceberg/util/TableWriterOptions.java | 4 +- .../iceberg/junit5/SqliteCatalogBase.java | 10 + .../parquet/table/ParquetInstructions.java | 5 +- 7 files changed, 186 insertions(+), 172 deletions(-) diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java index 327c27fa493..631fea540f1 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java @@ -8,7 +8,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; class IcebergWriteInstructionsTest { @@ -20,9 +20,9 @@ void testSetDhTables() { .addTables(table1) .addTables(table2) .build(); - assertThat(instructions.tables().size()).isEqualTo(2); - assertThat(instructions.tables().contains(table1)).isTrue(); - assertThat(instructions.tables().contains(table2)).isTrue(); + assertThat(instructions.tables()).hasSize(2); + assertThat(instructions.tables()).contains(table1); + assertThat(instructions.tables()).contains(table2); } @Test @@ -44,8 +44,8 @@ void testSetPartitionPaths() { .addTables(table1, table2) .addPartitionPaths(pp1, pp2) .build(); - assertThat(instructions.partitionPaths().size()).isEqualTo(2); - assertThat(instructions.partitionPaths().contains(pp1)).isTrue(); - assertThat(instructions.partitionPaths().contains(pp2)).isTrue(); + assertThat(instructions.partitionPaths()).hasSize(2); + assertThat(instructions.partitionPaths()).contains(pp1); + assertThat(instructions.partitionPaths()).contains(pp2); } } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java index 287a883e4c4..548dba09121 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java @@ -25,7 +25,7 @@ private static TableParquetWriterOptions.Builder instructions() { @Test void defaults() { final TableParquetWriterOptions instructions = instructions().build(); - assertThat(instructions.dataInstructions().isEmpty()).isTrue(); + assertThat(instructions.dataInstructions()).isEmpty(); assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY"); assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576); assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index 3908bbe959b..925c7a3a923 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ 
b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -41,11 +41,9 @@ import java.time.LocalTime; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -69,12 +67,6 @@ public final class IcebergUtils { // TODO (deephaven-core#6327) Add support for more types like ZonedDateTime, Big Decimals, and Lists } - /** - * Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}. - */ - private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - private static final int VARIABLE_NAME_LENGTH = 6; - /** * Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link Snapshot}. * @@ -325,121 +317,4 @@ public static void verifyPartitioningColumns( } } } - - /** - * Creates a list of {@link PartitionData} and corresponding update strings for Deephaven tables from partition - * paths and spec. Also, validates that the partition paths are compatible with the provided partition spec. - * - * @param partitionSpec The partition spec to use for validation. - * @param partitionPaths The list of partition paths to process. - * @return A pair containing a list of PartitionData objects and a list of update strings for Deephaven tables. - * @throws IllegalArgumentException if the partition paths are not compatible with the partition spec. - */ - public static Pair, List> partitionDataFromPaths( - final PartitionSpec partitionSpec, - final Collection partitionPaths) { - final List partitionDataList = new ArrayList<>(partitionPaths.size()); - final List dhTableUpdateStringList = new ArrayList<>(partitionPaths.size()); - final int numPartitioningFields = partitionSpec.fields().size(); - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); - for (final String partitionPath : partitionPaths) { - final String[] dhTableUpdateString = new String[numPartitioningFields]; - try { - final String[] partitions = partitionPath.split("/", -1); - if (partitions.length != numPartitioningFields) { - throw new IllegalArgumentException("Expecting " + numPartitioningFields + " number of fields, " + - "found " + partitions.length); - } - final PartitionData partitionData = new PartitionData(partitionSpec.partitionType()); - for (int colIdx = 0; colIdx < partitions.length; colIdx += 1) { - final String[] parts = partitions[colIdx].split("=", 2); - if (parts.length != 2) { - throw new IllegalArgumentException("Expecting key=value format, found " + partitions[colIdx]); - } - final PartitionField field = partitionSpec.fields().get(colIdx); - if (!field.name().equals(parts[0])) { - throw new IllegalArgumentException("Expecting field name " + field.name() + " at idx " + - colIdx + ", found " + parts[0]); - } - final Type type = partitionData.getType(colIdx); - dhTableUpdateString[colIdx] = getTableUpdateString(field.name(), type, parts[1], queryScope); - partitionData.set(colIdx, Conversions.fromPartitionString(partitionData.getType(colIdx), parts[1])); - } - } catch (final Exception e) { - throw new IllegalArgumentException("Failed to parse partition path: " + partitionPath + " using" + - " partition spec " + partitionSpec, e); - } - dhTableUpdateStringList.add(dhTableUpdateString); - partitionDataList.add(DataFiles.data(partitionSpec, partitionPath)); - 
} - return new Pair<>(partitionDataList, dhTableUpdateStringList); - } - - /** - * This method would convert a partitioning column info to a string which can be used in - * {@link io.deephaven.engine.table.Table#updateView(Collection) Table#updateView} method. For example, if the - * partitioning column of name "partitioningColumnName" if of type {@link Types.TimestampType} and the value is - * "2021-01-01T00:00:00Z", then this method would: - *
- * <ul>
- * <li>Add a new parameter to the query scope with a random name and value as {@link Instant} parsed from the string
- * "2021-01-01T00:00:00Z"</li>
- * <li>Return the string "partitioningColumnName = randomName"</li>
- * </ul>
- * - * @param colName The name of the partitioning column - * @param colType The type of the partitioning column - * @param value The value of the partitioning column - * @param queryScope The query scope to add the parameter to - */ - private static String getTableUpdateString( - @NotNull final String colName, - @NotNull final Type colType, - @NotNull final String value, - @NotNull final QueryScope queryScope) { - // Randomly generated name to be added to the query scope for each value to avoid repeated casts - // TODO Is this the right approach? Also, how would we clean up these params? - final String paramName = generateRandomAlphabetString(VARIABLE_NAME_LENGTH); - final Type.TypeID typeId = colType.typeId(); - if (typeId == Type.TypeID.BOOLEAN) { - queryScope.putParam(paramName, Boolean.parseBoolean(value)); - } else if (typeId == Type.TypeID.DOUBLE) { - queryScope.putParam(paramName, Double.parseDouble(value)); - } else if (typeId == Type.TypeID.FLOAT) { - queryScope.putParam(paramName, Float.parseFloat(value)); - } else if (typeId == Type.TypeID.INTEGER) { - queryScope.putParam(paramName, Integer.parseInt(value)); - } else if (typeId == Type.TypeID.LONG) { - queryScope.putParam(paramName, Long.parseLong(value)); - } else if (typeId == Type.TypeID.STRING) { - queryScope.putParam(paramName, value); - } else if (typeId == Type.TypeID.TIMESTAMP) { - final Types.TimestampType timestampType = (Types.TimestampType) colType; - if (timestampType == Types.TimestampType.withZone()) { - queryScope.putParam(paramName, Instant.parse(value)); - } else { - queryScope.putParam(paramName, LocalDateTime.parse(value)); - } - } else if (typeId == Type.TypeID.DATE) { - queryScope.putParam(paramName, LocalDate.parse(value)); - } else if (typeId == Type.TypeID.TIME) { - queryScope.putParam(paramName, LocalTime.parse(value)); - } else { - // TODO (deephaven-core#6327) Add support for more types like ZonedDateTime, Big Decimals - throw new TableDataException("Unsupported partitioning column type " + typeId.name()); - } - return colName + " = " + paramName; - } - - /** - * Generate a random string of length {@code length} using just alphabets. 
- */ - private static String generateRandomAlphabetString(final int length) { - final StringBuilder stringBuilder = new StringBuilder(); - final Random random = new Random(); - for (int i = 0; i < length; i++) { - final int index = random.nextInt(CHARACTERS.length()); - stringBuilder.append(CHARACTERS.charAt(index)); - } - return stringBuilder.toString(); - } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index c798a559ff3..97aec357c9f 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -5,19 +5,25 @@ import io.deephaven.base.Pair; import io.deephaven.base.verify.Require; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryScope; +import io.deephaven.engine.context.StandaloneQueryScope; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.parquet.table.CompletedParquetWrite; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.ParquetTools; import io.deephaven.iceberg.util.SchemaProviderInternal.SchemaProviderImpl; +import io.deephaven.util.SafeCloseable; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; @@ -28,17 +34,22 @@ import org.apache.iceberg.mapping.MappedField; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; -import static io.deephaven.iceberg.base.IcebergUtils.partitionDataFromPaths; import static io.deephaven.iceberg.base.IcebergUtils.verifyPartitioningColumns; import static io.deephaven.iceberg.base.IcebergUtils.verifyRequiredFields; @@ -58,9 +69,6 @@ public class IcebergTableWriter { */ private final org.apache.iceberg.Table table; - @Nullable - private final TableMetadata tableMetadata; - /** * The table definition used for all writes by this writer instance. */ @@ -78,16 +86,15 @@ public class IcebergTableWriter { private final Map fieldIdToColumnName; /** - * Initialized lazily from the {@value TableProperties#DEFAULT_NAME_MAPPING} property in the table metadata, if - * needed. + * The factory to create new output file locations for writing data files. */ - @Nullable - private Map nameMappingDefault; + private final OutputFileFactory outputFileFactory; /** - * The factory to create new output file locations for writing data files. 
+ * Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}. */ - private final OutputFileFactory outputFileFactory; + private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + private static final int VARIABLE_NAME_LENGTH = 6; IcebergTableWriter( final TableWriterOptions tableWriterOptions, @@ -95,12 +102,6 @@ public class IcebergTableWriter { this.tableWriterOptions = verifyWriterOptions(tableWriterOptions); this.table = tableAdapter.icebergTable(); - if (table instanceof HasTableOperations) { - tableMetadata = ((HasTableOperations) table).operations().current(); - } else { - tableMetadata = null; - } - this.tableDefinition = tableWriterOptions.tableDefinition(); verifyRequiredFields(table.schema(), tableDefinition); verifyPartitioningColumns(table.spec(), tableDefinition); @@ -111,7 +112,7 @@ public class IcebergTableWriter { // Create a copy of the fieldIdToColumnName map since we might need to add new entries for columns which are not // provided by the user. this.fieldIdToColumnName = new HashMap<>(tableWriterOptions.fieldIdToColumnName()); - addFieldIdsForAllColumns(tableWriterOptions); + addFieldIdsForAllColumns(); outputFileFactory = OutputFileFactory.builderFor(table, 0, 0) .format(FileFormat.PARQUET) @@ -147,8 +148,9 @@ private static void verifyFieldIdsInSchema(final Collection fieldIds, f * Populate the {@link #fieldIdToColumnName} map for all the columns in the {@link #tableDefinition} and do * additional checks to ensure that the table definition is compatible with schema provided by user. */ - private void addFieldIdsForAllColumns(final TableWriterOptions tableWriterOptions) { + private void addFieldIdsForAllColumns() { final Map dhColumnNameToFieldId = tableWriterOptions.dhColumnNameToFieldId(); + Map nameMappingDefault = null; // Lazily initialized for (final ColumnDefinition columnDefinition : tableDefinition.getColumns()) { final String columnName = columnDefinition.getName(); @@ -162,7 +164,10 @@ private void addFieldIdsForAllColumns(final TableWriterOptions tableWriterOption Types.NestedField nestedField; // Check in the schema.name_mapping.default map - fieldId = lazyNameMappingDefault().get(columnName); + if (nameMappingDefault == null) { + nameMappingDefault = readNameMappingDefault(); + } + fieldId = nameMappingDefault.get(columnName); if (fieldId != null) { nestedField = userSchema.findField(fieldId); if (nestedField == null) { @@ -195,19 +200,20 @@ private void addFieldIdsForAllColumns(final TableWriterOptions tableWriterOption *

* Return an empty map if the table metadata is null or the mapping is not present in the table metadata. */ - private Map lazyNameMappingDefault() { - if (nameMappingDefault != null) { - return nameMappingDefault; - } - if (tableMetadata == null) { - return nameMappingDefault = Map.of(); + private Map readNameMappingDefault() { + final TableMetadata tableMetadata; + if (table instanceof HasTableOperations) { + tableMetadata = ((HasTableOperations) table).operations().current(); + } else { + // TableMetadata is not available, so nothing to add to the map + return Map.of(); } final String nameMappingJson = tableMetadata.property(TableProperties.DEFAULT_NAME_MAPPING, null); if (nameMappingJson == null) { - return nameMappingDefault = Map.of(); + return Map.of(); } // Iterate over all mapped fields and build a reverse map from column name to field ID - nameMappingDefault = new HashMap<>(); + final Map nameMappingDefault = new HashMap<>(); final NameMapping nameMapping = NameMappingParser.fromJson(nameMappingJson); for (final MappedField field : nameMapping.asMappedFields().fields()) { final Integer fieldId = field.id(); @@ -241,11 +247,17 @@ public void append(@NotNull final IcebergWriteInstructions writeInstructions) { public List writeDataFiles(@NotNull final IcebergWriteInstructions writeInstructions) { final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(table, partitionPaths); - final Pair, List> ret = partitionDataFromPaths(table.spec(), partitionPaths); - final List partitionData = ret.getFirst(); - final List dhTableUpdateStrings = ret.getSecond(); - final List parquetFileInfo = - writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); + final List partitionData; + final List parquetFileInfo; + // Start a new query scope to avoid polluting the existing query scope with new parameters added for + // partitioning columns + try (final SafeCloseable _ignore = + ExecutionContext.getContext().withQueryScope(new StandaloneQueryScope()).open()) { + final Pair, List> ret = partitionDataFromPaths(table.spec(), partitionPaths); + partitionData = ret.getFirst(); + final List dhTableUpdateStrings = ret.getSecond(); + parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); + } return dataFilesFromParquet(parquetFileInfo, partitionData); } @@ -260,6 +272,123 @@ private static void verifyPartitionPaths( } } + /** + * Creates a list of {@link PartitionData} and corresponding update strings for Deephaven tables from partition + * paths and spec. Also, validates that the partition paths are compatible with the provided partition spec. + * + * @param partitionSpec The partition spec to use for validation. + * @param partitionPaths The list of partition paths to process. + * @return A pair containing a list of PartitionData objects and a list of update strings for Deephaven tables. + * @throws IllegalArgumentException if the partition paths are not compatible with the partition spec. 
+ */ + private static Pair, List> partitionDataFromPaths( + final PartitionSpec partitionSpec, + final Collection partitionPaths) { + final List partitionDataList = new ArrayList<>(partitionPaths.size()); + final List dhTableUpdateStringList = new ArrayList<>(partitionPaths.size()); + final int numPartitioningFields = partitionSpec.fields().size(); + final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); + for (final String partitionPath : partitionPaths) { + final String[] dhTableUpdateString = new String[numPartitioningFields]; + try { + final String[] partitions = partitionPath.split("/", -1); + if (partitions.length != numPartitioningFields) { + throw new IllegalArgumentException("Expecting " + numPartitioningFields + " number of fields, " + + "found " + partitions.length); + } + final PartitionData partitionData = new PartitionData(partitionSpec.partitionType()); + for (int colIdx = 0; colIdx < partitions.length; colIdx += 1) { + final String[] parts = partitions[colIdx].split("=", 2); + if (parts.length != 2) { + throw new IllegalArgumentException("Expecting key=value format, found " + partitions[colIdx]); + } + final PartitionField field = partitionSpec.fields().get(colIdx); + if (!field.name().equals(parts[0])) { + throw new IllegalArgumentException("Expecting field name " + field.name() + " at idx " + + colIdx + ", found " + parts[0]); + } + final Type type = partitionData.getType(colIdx); + dhTableUpdateString[colIdx] = getTableUpdateString(field.name(), type, parts[1], queryScope); + partitionData.set(colIdx, Conversions.fromPartitionString(partitionData.getType(colIdx), parts[1])); + } + } catch (final Exception e) { + throw new IllegalArgumentException("Failed to parse partition path: " + partitionPath + " using" + + " partition spec " + partitionSpec, e); + } + dhTableUpdateStringList.add(dhTableUpdateString); + partitionDataList.add(DataFiles.data(partitionSpec, partitionPath)); + } + return new Pair<>(partitionDataList, dhTableUpdateStringList); + } + + /** + * This method would convert a partitioning column info to a string which can be used in + * {@link io.deephaven.engine.table.Table#updateView(Collection) Table#updateView} method. For example, if the + * partitioning column of name "partitioningColumnName" if of type {@link Types.TimestampType} and the value is + * "2021-01-01T00:00:00Z", then this method would: + *

+ * <ul>
+ * <li>Add a new parameter to the query scope with a random name and value as {@link Instant} parsed from the string
+ * "2021-01-01T00:00:00Z"</li>
+ * <li>Return the string "partitioningColumnName = randomName"</li>
+ * </ul>
+ * + * @param colName The name of the partitioning column + * @param colType The type of the partitioning column + * @param value The value of the partitioning column + * @param queryScope The query scope to add the parameter to + */ + private static String getTableUpdateString( + @NotNull final String colName, + @NotNull final Type colType, + @NotNull final String value, + @NotNull final QueryScope queryScope) { + // Randomly generated name to be added to the query scope for each value to avoid repeated casts + // TODO(deephaven-core#6418): Find a better way to handle these table updates instead of using query scope + final String paramName = generateRandomAlphabetString(VARIABLE_NAME_LENGTH); + final Type.TypeID typeId = colType.typeId(); + if (typeId == Type.TypeID.BOOLEAN) { + queryScope.putParam(paramName, Boolean.parseBoolean(value)); + } else if (typeId == Type.TypeID.DOUBLE) { + queryScope.putParam(paramName, Double.parseDouble(value)); + } else if (typeId == Type.TypeID.FLOAT) { + queryScope.putParam(paramName, Float.parseFloat(value)); + } else if (typeId == Type.TypeID.INTEGER) { + queryScope.putParam(paramName, Integer.parseInt(value)); + } else if (typeId == Type.TypeID.LONG) { + queryScope.putParam(paramName, Long.parseLong(value)); + } else if (typeId == Type.TypeID.STRING) { + queryScope.putParam(paramName, value); + } else if (typeId == Type.TypeID.TIMESTAMP) { + final Types.TimestampType timestampType = (Types.TimestampType) colType; + if (timestampType == Types.TimestampType.withZone()) { + queryScope.putParam(paramName, Instant.parse(value)); + } else { + queryScope.putParam(paramName, LocalDateTime.parse(value)); + } + } else if (typeId == Type.TypeID.DATE) { + queryScope.putParam(paramName, LocalDate.parse(value)); + } else if (typeId == Type.TypeID.TIME) { + queryScope.putParam(paramName, LocalTime.parse(value)); + } else { + // TODO (deephaven-core#6327) Add support for more types like ZonedDateTime, Big Decimals + throw new TableDataException("Unsupported partitioning column type " + typeId.name()); + } + return colName + " = " + paramName; + } + + /** + * Generate a random string of length {@code length} using just alphabets. 
+ */ + private static String generateRandomAlphabetString(final int length) { + final StringBuilder stringBuilder = new StringBuilder(); + final Random random = new Random(); + for (int i = 0; i < length; i++) { + final int index = random.nextInt(CHARACTERS.length()); + stringBuilder.append(CHARACTERS.charAt(index)); + } + return stringBuilder.toString(); + } + @NotNull private List writeParquet( @NotNull final List partitionDataList, @@ -272,6 +401,9 @@ private List writeParquet( partitionDataList.size(), "partitionDataList.size()"); Require.eq(dhTables.size(), "dhTables.size()", dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); + } else { + Require.eqZero(partitionDataList.size(), "partitionDataList.size()"); + Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } // Build the parquet instructions diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java index 7fa8cc7a64a..cdcea9cd70a 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java @@ -34,9 +34,9 @@ public abstract class TableWriterOptions { * {@link #fieldIdToColumnName()} to map Deephaven columns from {@link #tableDefinition()} to Iceberg columns. If * {@link #fieldIdToColumnName()} is not provided, the mapping is done by column name. *

- * Users can specify how to extract the schema in multiple ways (by ID, snapshot ID, initial schema, etc.). + * Users can specify how to extract the schema in multiple ways (by schema ID, snapshot ID, etc.). *

- * If not provided, we use the current schema from the table. + * Defaults to {@link SchemaProvider#current()}, which means use the current schema from the table. */ @Value.Default public SchemaProvider schemaProvider() { diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 53c2393a677..7b8ce0768e1 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -399,6 +399,16 @@ void testFailureInWrite() { assertThat(e).cause().isInstanceOf(FormulaEvaluationException.class); } + try { + final IcebergTableWriter badWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(TableDefinition.of(ColumnDefinition.ofDouble("doubleCol"))) + .build()); + failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class); + } catch (IllegalArgumentException e) { + // Exception expected because "doubleCol" is not present in the table + assertThat(e).hasMessageContaining("Column doubleCol not found in the schema"); + } + // Make sure existing good data is not deleted assertThat(catalogAdapter.listNamespaces()).contains(myNamespace); assertThat(catalogAdapter.listTables(myNamespace)).containsExactly(tableIdentifier); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index 1036bbacf89..d8fa3ff9aa7 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -3,7 +3,6 @@ // package io.deephaven.parquet.table; -import gnu.trove.map.hash.TIntObjectHashMap; import io.deephaven.api.util.NameValidator; import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; @@ -203,8 +202,7 @@ public abstract ParquetInstructions withTableDefinitionAndLayout(final TableDefi /** * @return A callback to be executed when on completing each parquet data file write (excluding the index and - * metadata files). This callback gets invoked by the writing thread in a linear fashion. The consumer is - * responsible for thread safety. + * metadata files). This callback gets invoked by the writing thread in a linear fashion. */ public abstract Optional onWriteCompleted(); @@ -711,7 +709,6 @@ public static class Builder { private TableDefinition tableDefinition; private Collection> indexColumns; private OnWriteCompleted onWriteCompleted; - private TIntObjectHashMap usedFieldIdToColumn; /** * For each additional field added, make sure to update the copy constructor builder
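A rough end-to-end sketch of how the writer API exercised by this patch might be driven, for orientation only. The IcebergTableAdapter parameter, the column names, and the identity partitioning on a "region" column are assumptions for illustration; only tableWriter, TableParquetWriterOptions, IcebergWriteInstructions, and append are taken from the patch itself.

// Sketch only, not part of the patch. Assumes an Iceberg table partitioned by identity("region").
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.util.TableTools;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.util.IcebergTableWriter;
import io.deephaven.iceberg.util.IcebergWriteInstructions;
import io.deephaven.iceberg.util.TableParquetWriterOptions;

public final class PartitionedAppendExample {
    public static void appendByRegion(final IcebergTableAdapter tableAdapter) {
        // Definition of the data columns plus the (assumed) partitioning column.
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofInt("intCol"),
                ColumnDefinition.ofDouble("doubleCol"),
                ColumnDefinition.ofString("region").withPartitioning());
        final IcebergTableWriter writer = tableAdapter.tableWriter(
                TableParquetWriterOptions.builder()
                        .tableDefinition(definition)
                        .build());
        // Two Deephaven tables, one per partition.
        final Table usRows = TableTools.emptyTable(5).update("intCol = (int) ii", "doubleCol = ii * 0.5");
        final Table euRows = TableTools.emptyTable(5).update("intCol = (int) ii", "doubleCol = ii * 1.5");
        // One partition path per table, in the field=value form validated by partitionDataFromPaths.
        writer.append(IcebergWriteInstructions.builder()
                .addTables(usRows, euRows)
                .addPartitionPaths("region=us", "region=eu")
                .build());
    }
}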
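The partition paths passed to addPartitionPaths must match the table's partition spec segment by segment. A minimal plain-Java sketch of that contract, mirroring the checks in partitionDataFromPaths above; the field names and path values are hypothetical:

// Illustration only: the shape of the per-path validation, without Iceberg or Deephaven types.
import java.util.List;

public final class PartitionPathFormat {
    // For an expected field order of [year, month], "year=2021/month=01" yields ["2021", "01"].
    public static String[] parse(final String partitionPath, final List<String> expectedFields) {
        final String[] segments = partitionPath.split("/", -1);
        if (segments.length != expectedFields.size()) {
            throw new IllegalArgumentException(
                    "Expecting " + expectedFields.size() + " fields, found " + segments.length);
        }
        final String[] values = new String[segments.length];
        for (int i = 0; i < segments.length; i++) {
            final String[] parts = segments[i].split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expecting key=value format, found " + segments[i]);
            }
            if (!expectedFields.get(i).equals(parts[0])) {
                throw new IllegalArgumentException("Expecting field name " + expectedFields.get(i)
                        + " at idx " + i + ", found " + parts[0]);
            }
            values[i] = parts[1];
        }
        return values;
    }
}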