diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java new file mode 100644 index 00000000000..327c27fa493 --- /dev/null +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergWriteInstructionsTest.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.engine.table.Table; +import io.deephaven.engine.util.TableTools; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +class IcebergWriteInstructionsTest { + + @Test + void testSetDhTables() { + final Table table1 = TableTools.emptyTable(3); + final Table table2 = TableTools.emptyTable(4); + final IcebergWriteInstructions instructions = IcebergWriteInstructions.builder() + .addTables(table1) + .addTables(table2) + .build(); + assertThat(instructions.tables().size()).isEqualTo(2); + assertThat(instructions.tables().contains(table1)).isTrue(); + assertThat(instructions.tables().contains(table2)).isTrue(); + } + + @Test + void testSetPartitionPaths() { + final Table table1 = TableTools.emptyTable(3); + final String pp1 = "P1C=1/PC2=2"; + final Table table2 = TableTools.emptyTable(4); + final String pp2 = "P1C=2/PC2=3"; + try { + final IcebergWriteInstructions instructions = IcebergWriteInstructions.builder() + .addPartitionPaths(pp1, pp2) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (final IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Partition path must be provided for each table"); + } + + final IcebergWriteInstructions instructions = IcebergWriteInstructions.builder() + .addTables(table1, table2) + .addPartitionPaths(pp1, pp2) + .build(); + assertThat(instructions.partitionPaths().size()).isEqualTo(2); + assertThat(instructions.partitionPaths().contains(pp1)).isTrue(); + assertThat(instructions.partitionPaths().contains(pp2)).isTrue(); + } +} diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java similarity index 61% rename from extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java rename to extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java index bce9940d2b9..287a883e4c4 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergParquetWriteInstructionsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java @@ -4,9 +4,7 @@ package io.deephaven.iceberg.util; import io.deephaven.engine.table.ColumnDefinition; -import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.util.TableTools; import io.deephaven.parquet.table.ParquetInstructions; import org.junit.jupiter.api.Test; @@ -15,28 +13,36 @@ import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -class IcebergParquetWriteInstructionsTest { +class TableParquetWriterOptionsTest { /** - * Create a new IcebergParquetWriteInstructions builder 
with an empty table. + * Create a new TableParquetWriterOptions builder with an empty table definition. */ - private static IcebergParquetWriteInstructions.Builder instructions() { - return IcebergParquetWriteInstructions.builder().addTables(TableTools.emptyTable(0)); + private static TableParquetWriterOptions.Builder instructions() { + return TableParquetWriterOptions.builder().tableDefinition(TableDefinition.of()); } @Test void defaults() { - final IcebergParquetWriteInstructions instructions = instructions().build(); - assertThat(instructions.tableDefinition().isEmpty()).isTrue(); + final TableParquetWriterOptions instructions = instructions().build(); assertThat(instructions.dataInstructions().isEmpty()).isTrue(); assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY"); assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576); assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576); assertThat(instructions.targetPageSize()).isEqualTo(65536); - assertThat(instructions.tables().isEmpty()).isFalse(); - assertThat(instructions.partitionPaths().isEmpty()).isTrue(); - assertThat(instructions.snapshot()).isEmpty(); - assertThat(instructions.snapshotId()).isEmpty(); + } + + @Test + void testSetTableDefinition() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofLong("I")); + assertThat(TableParquetWriterOptions.builder() + .tableDefinition(definition) + .build() + .tableDefinition()) + .isEqualTo(definition); } @Test @@ -114,7 +120,7 @@ void testMinTargetPageSize() { @Test void toParquetInstructionTest() { - final IcebergParquetWriteInstructions writeInstructions = instructions() + final TableParquetWriterOptions writeInstructions = instructions() .compressionCodecName("GZIP") .maximumDictionaryKeys(100) .maximumDictionarySize(200) @@ -138,49 +144,4 @@ void toParquetInstructionTest() { assertThat(parquetInstructions.onWriteCompleted()).isEmpty(); assertThat(parquetInstructions.getTableDefinition()).hasValue(definition); } - - @Test - void testSetSnapshotID() { - final IcebergParquetWriteInstructions instructions = instructions() - .snapshotId(12345) - .build(); - assertThat(instructions.snapshotId().getAsLong()).isEqualTo(12345); - } - - @Test - void testSetDhTables() { - final Table table1 = TableTools.emptyTable(3); - final Table table2 = TableTools.emptyTable(4); - final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder() - .addTables(table1) - .addTables(table2) - .build(); - assertThat(instructions.tables().size()).isEqualTo(2); - assertThat(instructions.tables().contains(table1)).isTrue(); - assertThat(instructions.tables().contains(table2)).isTrue(); - } - - @Test - void testSetPartitionPaths() { - final Table table1 = TableTools.emptyTable(3); - final String pp1 = "P1C=1/PC2=2"; - final Table table2 = TableTools.emptyTable(4); - final String pp2 = "P1C=2/PC2=3"; - try { - final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder() - .addPartitionPaths(pp1, pp2) - .build(); - failBecauseExceptionWasNotThrown(IllegalArgumentException.class); - } catch (final IllegalArgumentException e) { - assertThat(e).hasMessageContaining("Partition path must be provided for each table"); - } - - final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder() - .addTables(table1, table2) - .addPartitionPaths(pp1, pp2) - .build(); - 
assertThat(instructions.partitionPaths().size()).isEqualTo(2); - assertThat(instructions.partitionPaths().contains(pp1)).isTrue(); - assertThat(instructions.partitionPaths().contains(pp2)).isTrue(); - } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java deleted file mode 100644 index c8ddc7e9ba4..00000000000 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergBaseInstructions.java +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.iceberg.util; - -import io.deephaven.engine.table.TableDefinition; -import org.apache.iceberg.Snapshot; -import org.immutables.value.Value; - -import java.util.Optional; -import java.util.OptionalLong; - -public interface IcebergBaseInstructions { - - /** - * The {@link TableDefinition} to use when reading or writing Iceberg data files, instead of the one implied by the - * table being read/written itself. This definition can be used to skip some columns or add additional columns with - * {@code null} values. - * - *

- * When using an {@link IcebergTableWriter}, this table definition should either: - *

- */ - Optional tableDefinition(); - - /** - * The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud - * provider-specific instructions). - */ - Optional dataInstructions(); - - /** - * The identifier of the snapshot to load for reading. - *

- * If both this and {@link #snapshot()} are provided, the {@link Snapshot#snapshotId()} should match this. - * Otherwise, only one of them should be provided. If neither is provided, the latest snapshot will be loaded. - */ - OptionalLong snapshotId(); - - /** - * The snapshot to load for reading. - *

- * If both this and {@link #snapshotId()} are provided, the {@link Snapshot#snapshotId()} should match the - * {@link #snapshotId()}. Otherwise, only one of them should be provided. If neither is provided, the latest - * snapshot will be loaded. - */ - Optional snapshot(); - - interface Builder> { - INSTRUCTIONS_BUILDER tableDefinition(TableDefinition tableDefinition); - - INSTRUCTIONS_BUILDER dataInstructions(Object s3Instructions); - - INSTRUCTIONS_BUILDER snapshotId(long snapshotId); - - INSTRUCTIONS_BUILDER snapshot(Snapshot snapshot); - } - - @Value.Check - default void checkSnapshotId() { - if (snapshotId().isPresent() && snapshot().isPresent() && - snapshotId().getAsLong() != snapshot().get().snapshotId()) { - throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " + - "must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId()); - } - } -} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java index fc5dc01fda6..2c0206c0c89 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java @@ -260,6 +260,23 @@ public Catalog catalog() { return catalog; } + /** + * Create a new Iceberg table in the catalog with the given table identifier and definition. + *

+ * All columns of type {@link ColumnDefinition.ColumnType#Partitioning partitioning} will be used to create the + * partition spec for the table. + * + * @param tableIdentifier The identifier string of the new table. + * @param definition The {@link TableDefinition} of the new table. + * @return The {@link IcebergTableAdapter table adapter} for the new Iceberg table. + * @throws AlreadyExistsException if the table already exists + */ + public IcebergTableAdapter createTable( + @NotNull final String tableIdentifier, + @NotNull final TableDefinition definition) { + return createTable(TableIdentifier.parse(tableIdentifier), definition); + } + /** * Create a new Iceberg table in the catalog with the given table identifier and definition. *

diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java index 3e27e292075..fa91cc9c2d3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java @@ -4,11 +4,14 @@ package io.deephaven.iceberg.util; import io.deephaven.annotations.CopyableStyle; +import io.deephaven.engine.table.TableDefinition; import org.apache.iceberg.Snapshot; import org.immutables.value.Value; import org.immutables.value.Value.Immutable; import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; /** * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in @@ -16,7 +19,7 @@ */ @Immutable @CopyableStyle -public abstract class IcebergReadInstructions implements IcebergBaseInstructions { +public abstract class IcebergReadInstructions { /** * The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use * system defaults for cloud provider-specific parameters. @@ -27,6 +30,17 @@ public static Builder builder() { return ImmutableIcebergReadInstructions.builder(); } + /** + * The {@link TableDefinition} to use when reading Iceberg data files. + */ + public abstract Optional tableDefinition(); + + /** + * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). + */ + public abstract Optional dataInstructions(); + /** * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg * data files. @@ -47,23 +61,54 @@ public IcebergUpdateMode updateMode() { return IcebergUpdateMode.staticMode(); } + /** + * The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the + * {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is + * provided, the latest snapshot will be loaded. + */ + public abstract OptionalLong snapshotId(); + /** * Return a copy of this instructions object with the snapshot ID replaced by {@code value}. */ public abstract IcebergReadInstructions withSnapshotId(long value); + /** + * The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the + * {@link Snapshot#snapshotId()} should match the {@link #snapshotId()}. Otherwise, only one of them should be + * provided. If neither is provided, the latest snapshot will be loaded. + */ + public abstract Optional snapshot(); + /** * Return a copy of this instructions object with the snapshot replaced by {@code value}. 
*/ public abstract IcebergReadInstructions withSnapshot(Snapshot value); - public interface Builder extends IcebergBaseInstructions.Builder { + public interface Builder { + Builder tableDefinition(TableDefinition tableDefinition); + + Builder dataInstructions(Object s3Instructions); + Builder putColumnRenames(String key, String value); Builder putAllColumnRenames(Map entries); Builder updateMode(IcebergUpdateMode updateMode); + Builder snapshotId(long snapshotId); + + Builder snapshot(Snapshot snapshot); + IcebergReadInstructions build(); } + + @Value.Check + final void checkSnapshotId() { + if (snapshotId().isPresent() && snapshot().isPresent() && + snapshotId().getAsLong() != snapshot().get().snapshotId()) { + throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " + + "must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId()); + } + } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index 4b0bbdc66fe..dff4f620a25 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -27,7 +27,6 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.util.annotations.VisibleForTesting; -import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -251,13 +250,13 @@ public synchronized Optional schema(final int schemaId) { } /** - * Retrieves the appropriate {@link Snapshot} based on the provided {@link IcebergBaseInstructions}, or {@code null} - * if no {@link IcebergBaseInstructions#snapshot() snapshot} or {@link IcebergBaseInstructions#snapshotId() + * Retrieves the appropriate {@link Snapshot} based on the provided {@link IcebergReadInstructions}, or {@code null} + * if no {@link IcebergReadInstructions#snapshot() snapshot} or {@link IcebergReadInstructions#snapshotId() * snapshotId} is provided. */ @InternalUseOnly @Nullable - public Snapshot getSnapshot(@NotNull final IcebergBaseInstructions readInstructions) { + public Snapshot getSnapshot(@NotNull final IcebergReadInstructions readInstructions) { if (readInstructions.snapshot().isPresent()) { return readInstructions.snapshot().get(); } else if (readInstructions.snapshotId().isPresent()) { @@ -574,16 +573,12 @@ private static TableDefinition fromSchema( return tableDef; } - /** * Create a new {@link IcebergTableWriter} for this Iceberg table using the provided {@link TableWriterOptions}. *

* This method will perform schema validation to ensure that the provided * {@link TableWriterOptions#tableDefinition()} is compatible with the Iceberg table schema. All further writes - * performed by the returned writer will not be validated against the table's schema, and thus would be faster. - *

- * Creating an {@link IcebergTableWriter} is the recommended approach if users want to write to the same iceberg - * table multiple times. + * performed by the returned writer will not be validated against the table's schema, and thus will be faster. * * @param tableWriterOptions The options to configure the table writer. * @return A new instance of {@link IcebergTableWriter} configured with the provided options. @@ -591,40 +586,4 @@ private static TableDefinition fromSchema( public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) { return new IcebergTableWriter(tableWriterOptions, this); } - - /** - * Append the provided Deephaven {@link IcebergWriteInstructions#tables()} as new partitions to the existing Iceberg - * table in a single snapshot. This will not change the schema of the existing table. - *

- * This method will create a new {@link IcebergTableWriter} with the provided - * {@link IcebergWriteInstructions#tableDefinition()} and will use that writer to write the data to the table. - * Therefore, this method is not recommended if users want to write to the table multiple times. Instead, users - * should create a single {@link IcebergTableWriter} and use it to write multiple times. - * - * @param writeInstructions The instructions for customizations while writing. - */ - public void append(@NotNull final IcebergWriteInstructions writeInstructions) { - final TableDefinition userDefinition = writeInstructions.tableDefinitionOrFirst(); - final IcebergTableWriter newWriter = - new IcebergTableWriter(TableWriterOptions.builder().tableDefinition(userDefinition).build(), this); - newWriter.append(writeInstructions); - } - - /** - * Writes data from Deephaven tables to an Iceberg table without creating a new snapshot. This method returns a list - * of data files that were written. Users can use this list to create a transaction/snapshot if needed. - *

- * This method will create a new {@link IcebergTableWriter} with the provided - * {@link IcebergWriteInstructions#tableDefinition()} and will use that writer to write the data to the table. - * Therefore, this method is not recommended if users want to write to the table multiple times. Instead, users - * should create a single {@link IcebergTableWriter} and use it to write multiple times. - * - * @param writeInstructions The instructions for customizations while writing. - */ - public List writeDataFiles(@NotNull final IcebergWriteInstructions writeInstructions) { - final TableDefinition userDefinition = writeInstructions.tableDefinitionOrFirst(); - final IcebergTableWriter newWriter = - new IcebergTableWriter(TableWriterOptions.builder().tableDefinition(userDefinition).build(), this); - return newWriter.writeDataFiles(writeInstructions); - } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index b223471f2ed..c798a559ff3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -48,6 +48,11 @@ */ public class IcebergTableWriter { + /** + * The options used to configure the behavior of this writer instance. + */ + private final TableParquetWriterOptions tableWriterOptions; + /** * The Iceberg table which will be written to by this instance. */ @@ -84,8 +89,10 @@ public class IcebergTableWriter { */ private final OutputFileFactory outputFileFactory; - - IcebergTableWriter(final TableWriterOptions tableWriterOptions, final IcebergTableAdapter tableAdapter) { + IcebergTableWriter( + final TableWriterOptions tableWriterOptions, + final IcebergTableAdapter tableAdapter) { + this.tableWriterOptions = verifyWriterOptions(tableWriterOptions); this.table = tableAdapter.icebergTable(); if (table instanceof HasTableOperations) { @@ -111,6 +118,17 @@ public class IcebergTableWriter { .build(); } + private static TableParquetWriterOptions verifyWriterOptions( + @NotNull final TableWriterOptions tableWriterOptions) { + // We ony support writing to Parquet files + if (!(tableWriterOptions instanceof TableParquetWriterOptions)) { + throw new IllegalArgumentException( + "Unsupported options of class " + tableWriterOptions.getClass() + " for" + + " writing Iceberg table, expected: " + TableParquetWriterOptions.class); + } + return (TableParquetWriterOptions) tableWriterOptions; + } + /** * Check that all the field IDs are present in the schema. */ @@ -218,20 +236,9 @@ public void append(@NotNull final IcebergWriteInstructions writeInstructions) { * transaction/snapshot if needed. This method will not perform any compatibility checks between the existing schema * and the provided Deephaven tables. * - * @param instructions The instructions for customizations while writing. + * @param writeInstructions The instructions for customizations while writing. */ - public List writeDataFiles(@NotNull final IcebergWriteInstructions instructions) { - final IcebergParquetWriteInstructions writeInstructions = verifyInstructions(instructions); - // Verify that the table definition matches the Iceberg table writer - if (writeInstructions.tableDefinition().isPresent() && - !writeInstructions.tableDefinition().get().equals(tableDefinition)) { - throw new IllegalArgumentException( - "Failed to write data to Iceberg table. 
The provided table definition does not match the " + - "table definition of the Iceberg table writer. Table definition provided : " + - writeInstructions.tableDefinition().get() + ", table definition of the Iceberg " + - "table writer : " + tableDefinition); - } - + public List writeDataFiles(@NotNull final IcebergWriteInstructions writeInstructions) { final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(table, partitionPaths); final Pair, List> ret = partitionDataFromPaths(table.spec(), partitionPaths); @@ -242,16 +249,6 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions ins return dataFilesFromParquet(parquetFileInfo, partitionData); } - static IcebergParquetWriteInstructions verifyInstructions( - @NotNull final IcebergWriteInstructions instructions) { - // We ony support writing to Parquet files - if (!(instructions instanceof IcebergParquetWriteInstructions)) { - throw new IllegalArgumentException("Unsupported instructions of class " + instructions.getClass() + " for" + - " writing Iceberg table, expected: " + IcebergParquetWriteInstructions.class); - } - return (IcebergParquetWriteInstructions) instructions; - } - private static void verifyPartitionPaths( final org.apache.iceberg.Table icebergTable, final Collection partitionPaths) { @@ -267,7 +264,7 @@ private static void verifyPartitionPaths( private List writeParquet( @NotNull final List partitionDataList, @NotNull final List dhTableUpdateStrings, - @NotNull final IcebergParquetWriteInstructions writeInstructions) { + @NotNull final IcebergWriteInstructions writeInstructions) { final List dhTables = writeInstructions.tables(); final boolean isPartitioned = table.spec().isPartitioned(); if (isPartitioned) { @@ -280,7 +277,7 @@ private List writeParquet( // Build the parquet instructions final List parquetFilesWritten = new ArrayList<>(dhTables.size()); final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; - final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions( + final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions( onWriteCompleted, tableDefinition, fieldIdToColumnName); // Write the data to parquet files diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java index 21b40bfee62..3527c56d858 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergWriteInstructions.java @@ -3,20 +3,26 @@ // package io.deephaven.iceberg.util; +import io.deephaven.annotations.BuildableStyle; import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; import org.immutables.value.Value; +import org.immutables.value.Value.Immutable; import java.util.List; /** - * This class provides instructions intended for writing Iceberg tables. The default values documented in this class may - * change in the future. As such, callers may wish to explicitly set the values. + * This class provides instructions intended for writing deephaven tables as partitions to Iceberg tables. 
*/ -public abstract class IcebergWriteInstructions implements IcebergBaseInstructions { +@Immutable +@BuildableStyle +public abstract class IcebergWriteInstructions { + + public static Builder builder() { + return ImmutableIcebergWriteInstructions.builder(); + } + /** - * The Deephaven tables to be written. All tables should have the same definition, else a {@link #tableDefinition() - * table definition} should be provided. + * The Deephaven tables to be written. */ public abstract List
<Table> tables();
<Table> tables = tables();
*/ - @Default + @Value.Default public int maximumDictionarySize() { return ParquetInstructions.DEFAULT_MAXIMUM_DICTIONARY_SIZE; } @@ -61,13 +60,13 @@ public int maximumDictionarySize() { * {@link ParquetInstructions#DEFAULT_TARGET_PAGE_SIZE}, should be greater than or equal to * {@link ParquetInstructions#MIN_TARGET_PAGE_SIZE}. */ - @Default + @Value.Default public int targetPageSize() { return ParquetInstructions.DEFAULT_TARGET_PAGE_SIZE; } /** - * Convert this {@link IcebergParquetWriteInstructions} to a {@link ParquetInstructions}. + * Convert this to a {@link ParquetInstructions}. * * @param onWriteCompleted The callback to be invoked after writing the parquet file. * @param tableDefinition The table definition to be populated inside the parquet file's schema @@ -95,7 +94,7 @@ ParquetInstructions toParquetInstructions( return builder.build(); } - public interface Builder extends IcebergWriteInstructions.Builder { + public interface Builder extends TableWriterOptions.Builder { Builder compressionCodecName(String compressionCodecName); Builder maximumDictionaryKeys(int maximumDictionaryKeys); @@ -104,24 +103,24 @@ public interface Builder extends IcebergWriteInstructions.Builder { Builder targetPageSize(int targetPageSize); - IcebergParquetWriteInstructions build(); + TableParquetWriterOptions build(); } - @Check + @Value.Check final void boundsCheckMaxDictionaryKeys() { if (maximumDictionaryKeys() < 0) { throw new IllegalArgumentException("maximumDictionaryKeys(=" + maximumDictionaryKeys() + ") must be >= 0"); } } - @Check + @Value.Check final void boundsCheckMaxDictionarySize() { if (maximumDictionarySize() < 0) { throw new IllegalArgumentException("maximumDictionarySize(=" + maximumDictionarySize() + ") must be >= 0"); } } - @Check + @Value.Check final void boundsCheckTargetPageSize() { if (targetPageSize() < MIN_TARGET_PAGE_SIZE) { throw new IllegalArgumentException( diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java index a1214409db6..7fa8cc7a64a 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java @@ -3,7 +3,6 @@ // package io.deephaven.iceberg.util; -import io.deephaven.annotations.BuildableStyle; import io.deephaven.engine.table.TableDefinition; import org.apache.iceberg.Schema; import org.immutables.value.Value; @@ -12,10 +11,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; -@Value.Immutable -@BuildableStyle public abstract class TableWriterOptions { /** @@ -25,6 +23,12 @@ public abstract class TableWriterOptions { */ public abstract TableDefinition tableDefinition(); + /** + * The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). + */ + public abstract Optional dataInstructions(); + /** * Used to extract a {@link Schema} from a table. That schema will be used in conjunction with the * {@link #fieldIdToColumnName()} to map Deephaven columns from {@link #tableDefinition()} to Iceberg columns. 
If @@ -57,20 +61,18 @@ Map dhColumnNameToFieldId() { return reversedMap; } - public static Builder builder() { - return ImmutableTableWriterOptions.builder(); - } - - public interface Builder { - Builder tableDefinition(TableDefinition tableDefinition); + // @formatter:off + interface Builder> { + // @formatter:on + INSTRUCTIONS_BUILDER tableDefinition(TableDefinition tableDefinition); - Builder schemaProvider(SchemaProvider schemaProvider); + INSTRUCTIONS_BUILDER dataInstructions(Object s3Instructions); - Builder putFieldIdToColumnName(int value, String key); + INSTRUCTIONS_BUILDER schemaProvider(SchemaProvider schemaProvider); - Builder putAllFieldIdToColumnName(Map entries); + INSTRUCTIONS_BUILDER putFieldIdToColumnName(int value, String key); - TableWriterOptions build(); + INSTRUCTIONS_BUILDER putAllFieldIdToColumnName(Map entries); } /** diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 81261d69fde..53c2393a677 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -18,11 +18,11 @@ import io.deephaven.iceberg.util.IcebergCatalogAdapter; import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.util.IcebergTableAdapter; -import io.deephaven.iceberg.util.IcebergParquetWriteInstructions; import io.deephaven.iceberg.util.IcebergTableImpl; import io.deephaven.iceberg.util.IcebergTableWriter; import io.deephaven.iceberg.util.IcebergUpdateMode; -import io.deephaven.iceberg.util.TableWriterOptions; +import io.deephaven.iceberg.util.IcebergWriteInstructions; +import io.deephaven.iceberg.util.TableParquetWriterOptions; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.ParquetTools; import io.deephaven.parquet.table.location.ParquetTableLocationKey; @@ -78,10 +78,11 @@ void tearDown() throws Exception { engineCleanup.tearDown(); } - private IcebergParquetWriteInstructions.Builder instructionsBuilder() { - final IcebergParquetWriteInstructions.Builder builder = IcebergParquetWriteInstructions.builder(); - if (dataInstructions() != null) { - return builder.dataInstructions(dataInstructions()); + private TableParquetWriterOptions.Builder writerOptionsBuilder() { + final TableParquetWriterOptions.Builder builder = TableParquetWriterOptions.builder(); + final Object dataInstructions; + if ((dataInstructions = dataInstructions()) != null) { + return builder.dataInstructions(dataInstructions); } return builder; } @@ -126,12 +127,14 @@ void appendTableBasicTest() { "doubleCol = (double) 2.5 * i + 10"); final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() - .tableDefinition(source.getDefinition()) - .build()); - tableWriter.append(instructionsBuilder() - .addTables(source) - .build()); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + } Table fromIceberg = tableAdapter.table(); assertTableEquals(source, fromIceberg); @@ -141,10 +144,14 @@ void 
appendTableBasicTest() { final Table moreData = TableTools.emptyTable(5) .update("intCol = (int) 3 * i + 20", "doubleCol = (double) 3.5 * i + 20"); - tableWriter.append(instructionsBuilder() - .addTables(moreData) + final IcebergTableWriter lz4TableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) .compressionCodecName("LZ4") .build()); + lz4TableWriter.append(IcebergWriteInstructions.builder() + .addTables(moreData) + .build()); + fromIceberg = tableAdapter.table(); final Table expected = TableTools.merge(source, moreData); assertTableEquals(expected, fromIceberg); @@ -154,7 +161,7 @@ void appendTableBasicTest() { final Table emptyTable = TableTools.emptyTable(0) .update("intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); - tableWriter.append(instructionsBuilder() + lz4TableWriter.append(IcebergWriteInstructions.builder() .addTables(emptyTable) .build()); fromIceberg = tableAdapter.table(); @@ -165,10 +172,16 @@ void appendTableBasicTest() { final Table someMoreData = TableTools.emptyTable(3) .update("intCol = (int) 5 * i + 40", "doubleCol = (double) 5.5 * i + 40"); - tableWriter.append(instructionsBuilder() - .addTables(someMoreData, moreData, emptyTable) - .compressionCodecName("GZIP") - .build()); + { + final IcebergTableWriter gzipTableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .compressionCodecName("GZIP") + .build()); + gzipTableWriter.append(IcebergWriteInstructions.builder() + .addTables(someMoreData, moreData, emptyTable) + .build()); + } + fromIceberg = tableAdapter.table(); final Table expected2 = TableTools.merge(expected, someMoreData, moreData); assertTableEquals(expected2, fromIceberg); @@ -188,36 +201,24 @@ void appendWithDifferentDefinition() { final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(source.getDefinition()) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(source) .build()); Table fromIceberg = tableAdapter.table(); assertTableEquals(source, fromIceberg); verifySnapshots(tableIdentifier, List.of("append")); - final Table differentSource = TableTools.emptyTable(10) - .update("shortCol = (short) 2 * i + 10"); - try { - // Try appending using table adapter because it will perform verification each time - tableAdapter.append(instructionsBuilder() - .addTables(differentSource) - .build()); - failBecauseExceptionWasNotThrown(IllegalArgumentException.class); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessageContaining("Column shortCol not found in the schema"); - } - - // Append a table with just the int column, should be compatible with the existing schema - final Table compatibleSource = TableTools.emptyTable(10) + // Append a table with just the int column + final Table singleColumnSource = TableTools.emptyTable(10) .update("intCol = (int) 5 * i + 10"); - tableAdapter.append(instructionsBuilder() - .addTables(compatibleSource) + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(singleColumnSource) .build()); fromIceberg = tableAdapter.table(); - final Table expected = TableTools.merge(source, 
compatibleSource.update("doubleCol = NULL_DOUBLE")); + final Table expected = TableTools.merge(source, singleColumnSource.update("doubleCol = NULL_DOUBLE")); assertTableEquals(expected, fromIceberg); verifySnapshots(tableIdentifier, List.of("append", "append")); @@ -225,7 +226,7 @@ void appendWithDifferentDefinition() { final Table moreData = TableTools.emptyTable(5) .update("intCol = (int) 3 * i + 20", "doubleCol = (double) 3.5 * i + 20"); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(moreData) .build()); fromIceberg = tableAdapter.table(); @@ -234,10 +235,9 @@ void appendWithDifferentDefinition() { verifySnapshots(tableIdentifier, List.of("append", "append", "append")); // Append an empty table - final IcebergParquetWriteInstructions writeInstructions = IcebergParquetWriteInstructions.builder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(TableTools.emptyTable(0)) - .build(); - tableWriter.append(writeInstructions); + .build()); fromIceberg = tableAdapter.table(); assertTableEquals(expected2, fromIceberg); verifySnapshots(tableIdentifier, List.of("append", "append", "append", "append")); @@ -251,10 +251,10 @@ void appendWithDifferentDefinitionWithTabeWriterTest() { final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(source.getDefinition()) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(source) .build()); { @@ -265,7 +265,7 @@ void appendWithDifferentDefinitionWithTabeWriterTest() { // Table writer should not do any schema verification at the time of writing final Table differentSource = TableTools.emptyTable(10) .update("shortCol = (short) 2 * i + 10"); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(differentSource) .build()); { @@ -285,10 +285,10 @@ void appendMultipleTablesWithDefinitionTest() { "doubleCol = (double) 2.5 * i + 10"); final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(source.getDefinition()) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(source) .build()); Table fromIceberg = tableAdapter.table(); @@ -302,23 +302,8 @@ void appendMultipleTablesWithDefinitionTest() { .update("charCol = (char) 65 + i % 26", "intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); - - try { - tableAdapter.append(instructionsBuilder() - .addTables(appendTable1, appendTable2) - .build()); - failBecauseExceptionWasNotThrown(IllegalArgumentException.class); - } catch (IllegalArgumentException e) { - assertThat(e).hasMessageContaining("All Deephaven tables must have the same definition"); - } - - // Set a table definition that is compatible with all tables - final TableDefinition writeDefinition = TableDefinition.of( 
- ColumnDefinition.ofInt("intCol"), - ColumnDefinition.ofDouble("doubleCol")); - tableAdapter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(appendTable1, appendTable2) - .tableDefinition(writeDefinition) .build()); fromIceberg = tableAdapter.table(); final Table expected = TableTools.merge( @@ -360,7 +345,10 @@ void appendToCatalogTableWithAllDataTypesTest() { "localTimeCol = java.time.LocalTime.now()", "binaryCol = new byte[] {(byte) i}"); final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(myTableId); - tableAdapter.append(instructionsBuilder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() .addTables(source) .build()); final Table fromIceberg = tableAdapter.table(); @@ -377,12 +365,12 @@ void testFailureInWrite() { final Namespace myNamespace = Namespace.of("MyNamespace"); final TableIdentifier tableIdentifier = TableIdentifier.of(myNamespace, "MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, badSource.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(badSource.getDefinition()) .build()); try { - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(badSource) .build()); failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class); @@ -395,14 +383,14 @@ void testFailureInWrite() { final Table goodSource = TableTools.emptyTable(5) .update("stringCol = Long.toString(ii)", "intCol = (int) i"); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(goodSource) .build()); Table fromIceberg = tableAdapter.table(); assertTableEquals(goodSource, fromIceberg); try { - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(badSource) .build()); failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class); @@ -426,12 +414,16 @@ void testColumnRenameWhileWriting() throws URISyntaxException { final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final TableDefinition originalDefinition = source.getDefinition(); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, originalDefinition); - tableAdapter.append(instructionsBuilder() - .addTables(source) - .build()); - - verifyDataFiles(tableIdentifier, List.of(source)); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + verifyDataFiles(tableIdentifier, List.of(source)); + } // Get field IDs for the columns for this table final Map nameToFieldIdFromSchema = new HashMap<>(); @@ -452,12 +444,12 @@ void testColumnRenameWhileWriting() throws URISyntaxException { "newDoubleCol = (double) 3.5 * i + 20"); { // Now append more data to it but with different column names and field Id mapping - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(moreData.getDefinition()) 
.putFieldIdToColumnName(nameToFieldIdFromSchema.get("intCol"), "newIntCol") .putFieldIdToColumnName(nameToFieldIdFromSchema.get("doubleCol"), "newDoubleCol") .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(moreData) .build()); @@ -486,12 +478,14 @@ void testColumnRenameWhileWriting() throws URISyntaxException { * Verify that the schema of the parquet file read from the provided path has the provided column and corresponding * field IDs. */ - private static void verifyFieldIdsFromParquetFile( + private void verifyFieldIdsFromParquetFile( final String path, final List columnNames, final Map nameToFieldId) throws URISyntaxException { final ParquetMetadata metadata = - new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.EMPTY) + new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.builder() + .setSpecialInstructions(dataInstructions()) + .build()) .getMetadata(); final List columnsMetadata = metadata.getFileMetaData().getSchema().getColumns(); @@ -552,11 +546,11 @@ void writeDataFilesBasicTest() { final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(source.getDefinition()) .build()); - final List dataFilesWritten = tableWriter.writeDataFiles(instructionsBuilder() + final List dataFilesWritten = tableWriter.writeDataFiles(IcebergWriteInstructions.builder() .addTables(source, anotherSource) .build()); verifySnapshots(tableIdentifier, List.of()); @@ -566,7 +560,7 @@ void writeDataFilesBasicTest() { final Table moreData = TableTools.emptyTable(5) .update("intCol = (int) 3 * i + 20", "doubleCol = (double) 3.5 * i + 20"); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(moreData) .build()); { @@ -606,11 +600,11 @@ void testPartitionedAppendBasic() { { final TableDefinition tableDefinition = part1.getDefinition(); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, tableDefinition); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(tableDefinition) .build()); try { - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part1, part2) .addAllPartitionPaths(partitionPaths) .build()); @@ -627,14 +621,13 @@ void testPartitionedAppendBasic() { ColumnDefinition.ofDouble("doubleCol"), ColumnDefinition.ofString("PC").withPartitioning()); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, partitioningTableDef); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(partitioningTableDef) .build()); try { - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part1, part2) - .tableDefinition(partitioningTableDef) .build()); failBecauseExceptionWasNotThrown(IllegalArgumentException.class); } catch (IllegalArgumentException e) { @@ -642,10 
+635,9 @@ void testPartitionedAppendBasic() { assertThat(e).hasMessageContaining("partition paths"); } - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part1, part2) .addAllPartitionPaths(partitionPaths) - .tableDefinition(partitioningTableDef) .build()); final Table fromIceberg = tableAdapter.table(); assertThat(tableAdapter.definition()).isEqualTo(partitioningTableDef); @@ -660,10 +652,9 @@ void testPartitionedAppendBasic() { .update("intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); final String partitionPath = "PC=boy"; - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part3) .addPartitionPaths(partitionPath) - .tableDefinition(partitioningTableDef) .build()); final Table fromIceberg2 = tableAdapter.table(); final Table expected2 = TableTools.merge( @@ -690,13 +681,12 @@ void testPartitionedAppendBasicIntegerPartitions() { ColumnDefinition.ofDouble("doubleCol"), ColumnDefinition.ofInt("PC").withPartitioning()); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, tableDefinition); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(tableDefinition) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part1, part2) .addAllPartitionPaths(partitionPaths) - .tableDefinition(tableDefinition) .build()); final Table fromIceberg = tableAdapter.table(); assertThat(tableAdapter.definition()).isEqualTo(tableDefinition); @@ -711,10 +701,9 @@ void testPartitionedAppendBasicIntegerPartitions() { .update("intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); final String partitionPath = "PC=2"; - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part3) .addPartitionPaths(partitionPath) - .tableDefinition(tableDefinition) .build()); final Table fromIceberg2 = tableAdapter.table(); final Table expected2 = TableTools.merge( @@ -731,12 +720,14 @@ void testManualRefreshingAppend() { "doubleCol = (double) 2.5 * i + 10"); final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() - .tableDefinition(source.getDefinition()) - .build()); - tableWriter.append(instructionsBuilder() - .addTables(source) - .build()); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + } final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); @@ -752,10 +743,15 @@ void testManualRefreshingAppend() { final Table moreData = TableTools.emptyTable(5) .update("intCol = (int) 3 * i + 20", "doubleCol = (double) 3.5 * i + 20"); - tableWriter.append(instructionsBuilder() - .addTables(moreData) - .compressionCodecName("LZ4") - .build()); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .compressionCodecName("LZ4") + .build()); + 
tableWriter.append(IcebergWriteInstructions.builder() + .addTables(moreData) + .build()); + } fromIcebergRefreshing.update(); updateGraph.runWithinUnitTestCycle(fromIcebergRefreshing::refresh); @@ -774,12 +770,14 @@ void testAutomaticRefreshingAppend() throws InterruptedException { "doubleCol = (double) 2.5 * i + 10"); final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() - .tableDefinition(source.getDefinition()) - .build()); - tableWriter.append(instructionsBuilder() - .addTables(source) - .build()); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + } final IcebergTableImpl fromIcebergRefreshing = (IcebergTableImpl) tableAdapter.table(IcebergReadInstructions.builder() @@ -792,10 +790,15 @@ void testAutomaticRefreshingAppend() throws InterruptedException { final Table moreData = TableTools.emptyTable(5) .update("intCol = (int) 3 * i + 20", "doubleCol = (double) 3.5 * i + 20"); - tableWriter.append(instructionsBuilder() - .addTables(moreData) - .compressionCodecName("LZ4") - .build()); + { + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .compressionCodecName("LZ4") + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(moreData) + .build()); + } // Sleep for 0.5 second Thread.sleep(500); @@ -826,13 +829,12 @@ void testManualRefreshingPartitionedAppend() { ColumnDefinition.ofDouble("doubleCol"), ColumnDefinition.ofString("PC").withPartitioning()); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, tableDefinition); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(tableDefinition) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part1, part2) .addAllPartitionPaths(partitionPaths) - .tableDefinition(tableDefinition) .build()); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); @@ -853,10 +855,9 @@ void testManualRefreshingPartitionedAppend() { .update("intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); final String partitionPath = "PC=cat"; - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part3) .addPartitionPaths(partitionPath) - .tableDefinition(tableDefinition) .build()); fromIcebergRefreshing.update(); @@ -882,13 +883,12 @@ void testAutoRefreshingPartitionedAppend() { ColumnDefinition.ofDouble("doubleCol"), ColumnDefinition.ofString("PC").withPartitioning()); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, tableDefinition); - final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder() + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() .tableDefinition(tableDefinition) .build()); - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() 
.addTables(part1, part2) .addAllPartitionPaths(partitionPaths) - .tableDefinition(tableDefinition) .build()); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); @@ -909,10 +909,9 @@ void testAutoRefreshingPartitionedAppend() { .update("intCol = (int) 4 * i + 30", "doubleCol = (double) 4.5 * i + 30"); final String partitionPath = "PC=cat"; - tableWriter.append(instructionsBuilder() + tableWriter.append(IcebergWriteInstructions.builder() .addTables(part3) .addPartitionPaths(partitionPath) - .tableDefinition(tableDefinition) .build()); // Sleep for 0.5 second diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index a990dba9228..1036bbacf89 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -203,7 +203,8 @@ public abstract ParquetInstructions withTableDefinitionAndLayout(final TableDefi /** * @return A callback to be executed when on completing each parquet data file write (excluding the index and - * metadata files). + * metadata files). This callback gets invoked by the writing thread in a linear fashion. The consumer is + * responsible for thread safety. */ public abstract Optional onWriteCompleted(); diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index c0db6c49753..47e2a35fe31 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -16,9 +16,9 @@ _JIcebergUpdateMode = jpy.get_type("io.deephaven.iceberg.util.IcebergUpdateMode") _JIcebergReadInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergReadInstructions") -_JIcebergParquetWriteInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergParquetWriteInstructions") +_JIcebergWriteInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergWriteInstructions") _JSchemaProvider = jpy.get_type("io.deephaven.iceberg.util.SchemaProvider") -_JTableWriterOptions = jpy.get_type("io.deephaven.iceberg.util.TableWriterOptions") +_JTableParquetWriterOptions = jpy.get_type("io.deephaven.iceberg.util.TableParquetWriterOptions") _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") _JIcebergTableAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergTableAdapter") _JIcebergTableWriter = jpy.get_type("io.deephaven.iceberg.util.IcebergTableWriter") @@ -38,7 +38,7 @@ class IcebergUpdateMode(JObjectWrapper): """ - This class specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes are: + IcebergUpdateMode specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes are: - :py:func:`static() `: The table is loaded once and does not change - :py:func:`manual_refresh() `: The table can be manually refreshed by the user. @@ -85,8 +85,9 @@ def j_object(self) -> jpy.JType: class IcebergReadInstructions(JObjectWrapper): """ - This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename - instructions and table definitions, as well as special data instructions for loading data files from the cloud. + IcebergReadInstructions specifies the instructions for reading an Iceberg table into Deephaven. 
These include column + rename instructions and table definitions, as well as special data instructions for loading data files from the + cloud. """ j_object_type = _JIcebergReadInstructions @@ -144,23 +145,17 @@ def j_object(self) -> jpy.JType: return self._j_object -class IcebergParquetWriteInstructions(JObjectWrapper): +class IcebergWriteInstructions(JObjectWrapper): """ - This class specifies the instructions for writing Iceberg tables as Parquet data files. These include column rename - instructions, table definitions, special data instructions for loading data files from the cloud, etc. + :class:`.IcebergWriteInstructions` provides instructions intended for writing deephaven tables as partitions to Iceberg + tables. """ - j_object_type = _JIcebergParquetWriteInstructions + j_object_type = _JIcebergWriteInstructions def __init__(self, tables: Union[Table, Sequence[Table]], - partition_paths: Optional[Union[str, Sequence[str]]] = None, - compression_codec_name: Optional[str] = None, - maximum_dictionary_keys: Optional[int] = None, - maximum_dictionary_size: Optional[int] = None, - target_page_size: Optional[int] = None, - table_definition: Optional[TableDefinitionLike] = None, - data_instructions: Optional[s3.S3Instructions] = None): + partition_paths: Optional[Union[str, Sequence[str]]] = None): """ Initializes the instructions using the provided parameters. @@ -172,25 +167,8 @@ def __init__(self, If writing to a partitioned iceberg table, users must provide partition path for each table in tables argument in the same order. Else when writing to a non-partitioned table, users should not provide any partition paths. - By default, the deephaven tables will be written to the root data directory of the iceberg table. - compression_codec_name (Optional[str]): The compression codec to use. Allowed values include "UNCOMPRESSED", - "SNAPPY", "GZIP", "LZO", "LZ4", "LZ4_RAW", "ZSTD", etc. If not specified, defaults to "SNAPPY". - maximum_dictionary_keys (Optional[int]): the maximum number of unique keys the writer should add to a - dictionary page before switching to non-dictionary encoding, never evaluated for non-String columns, - defaults to 2^20 (1,048,576) - maximum_dictionary_size (Optional[int]): the maximum number of bytes the writer should add to the dictionary - before switching to non-dictionary encoding, never evaluated for non-String columns, defaults to - 2^20 (1,048,576) - target_page_size (Optional[int]): the target page size in bytes, if not specified, defaults to - 2^20 bytes (1 MiB) - table_definition (Optional[TableDefinitionLike]): the TableDefinition to use when writing Iceberg data - files, instead of the one implied by the table being written itself. This definition can be used to skip - some columns or add additional columns with null values. - When passing this value to an IcebergTableWriter, this table definition should either: - - Not be provided, in which case the definition will be derived from the writer instance, or - - Match the writer's table definition if it is provided. - By default, the table definition is inferred from the deephaven tables being written, and all tables - must have the same definition. + Defaults to `None`, which means the deephaven tables will be written to the root data directory of the + iceberg table. Raises: DHError: If unable to build the instructions object. 
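For reference, a minimal sketch of how the slimmed-down IcebergWriteInstructions constructor above might be used from Python; the tables, formulas, and partition paths are illustrative placeholders:

# Minimal sketch (illustrative only): per-append write instructions now carry just the
# Deephaven tables and, for partitioned Iceberg tables, one partition path per table.
from deephaven import empty_table
from deephaven.experimental import iceberg

part1 = empty_table(3).update(["intCol = (int) i", "doubleCol = (double) 2.5 * i"])
part2 = empty_table(4).update(["intCol = (int) i + 10", "doubleCol = (double) 2.5 * i + 10"])

# Partition paths are provided in the same order as the tables.
partitioned_write = iceberg.IcebergWriteInstructions(
    tables=[part1, part2],
    partition_paths=["PC=apple", "PC=boy"],
)

# For a non-partitioned Iceberg table, omit partition_paths entirely.
plain_write = iceberg.IcebergWriteInstructions(tables=[part1, part2])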
@@ -212,24 +190,6 @@ def __init__(self, for partition_path in partition_paths: builder.addPartitionPaths(partition_path) - if compression_codec_name: - builder.compressionCodecName(compression_codec_name) - - if maximum_dictionary_keys: - builder.maximumDictionaryKeys(maximum_dictionary_keys) - - if maximum_dictionary_size: - builder.maximumDictionarySize(maximum_dictionary_size) - - if target_page_size: - builder.targetPageSize(target_page_size) - - if table_definition: - builder.tableDefinition(TableDefinition(table_definition).j_table_definition) - - if data_instructions: - builder.dataInstructions(data_instructions.j_object) - self._j_object = builder.build() except Exception as e: @@ -242,9 +202,11 @@ def j_object(self) -> jpy.JType: class SchemaProvider(JObjectWrapper): """ - Used for extracting the schema from a table. - Instances of this class are used for configuring IcebergTableWriter. + Used for extracting the schema from an Iceberg table. Users can specify multiple ways to do so, for example, by + schema ID, snapshot ID, current schema, etc. This can be useful for passing a schema when writing to an Iceberg + table. """ + j_object_type = _JSchemaProvider def __init__(self, _j_object: jpy.JType): @@ -284,7 +246,7 @@ def from_schema_id(cls, schema_id: int) -> 'SchemaProvider': return cls(_JSchemaProvider.fromSchemaId(schema_id)) @classmethod - def from_snapshot_d(cls, snapshot_id: int) -> 'SchemaProvider': + def from_snapshot_id(cls, snapshot_id: int) -> 'SchemaProvider': """ Used for extracting the schema from the table using the specified snapshot id. @@ -307,17 +269,23 @@ def from_current_snapshot(cls) -> 'SchemaProvider': return cls(_JSchemaProvider.fromCurrentSnapshot()) -class TableWriterOptions(JObjectWrapper): +class TableParquetWriterOptions(JObjectWrapper): """ - This class provides specialized instructions for configuring IcebergTableWriter objects. + :class:`.TableParquetWriterOptions` provides specialized instructions for configuring :class:`.IcebergTableWriter` + instances. """ - j_object_type = _JTableWriterOptions + j_object_type = _JTableParquetWriterOptions def __init__(self, table_definition: TableDefinitionLike, schema_provider: Optional[SchemaProvider] = None, - field_id_to_column_name: Optional[Dict[int, str]] = None): + field_id_to_column_name: Optional[Dict[int, str]] = None, + compression_codec_name: Optional[str] = None, + maximum_dictionary_keys: Optional[int] = None, + maximum_dictionary_size: Optional[int] = None, + target_page_size: Optional[int] = None, + data_instructions: Optional[s3.S3Instructions] = None): """ Initializes the instructions using the provided parameters. @@ -329,11 +297,21 @@ def __init__(self, be used in conjunction with the field_id_to_column_name to map Deephaven columns from table_definition to Iceberg columns. Users can specify how to extract the schema in multiple ways (by ID, snapshot ID, initial schema, etc.). - By default, we use the current schema from the table. + Defaults to `None`, which means use the current schema from the table. field_id_to_column_name: Optional[Dict[int, str]]: A one-to-one map from Iceberg field IDs from the schema_spec to Deephaven column names from the table_definition. - By default, we assume this dictionary is empty, and we will map Iceberg columns to Deephaven columns - using column names. + Defaults to `None`, which means map Iceberg columns to Deephaven columns using column names. + compression_codec_name (Optional[str]): The compression codec to use for writing the parquet file. 
Allowed + values include "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "LZ4", "LZ4_RAW", "ZSTD", etc. Defaults to + `None`, which means use "SNAPPY". + maximum_dictionary_keys (Optional[int]): the maximum number of unique keys the Parquet writer should add to + a dictionary page before switching to non-dictionary encoding, never used for non-String columns. + Defaults to `None`, which means use 2^20 (1,048,576) + maximum_dictionary_size (Optional[int]): the maximum number of bytes the Parquet writer should add to the + dictionary before switching to non-dictionary encoding, never used for non-String columns. Defaults to + `None`, which means use 2^20 (1,048,576) + target_page_size (Optional[int]): the target Parquet file page size in bytes. Defaults to + `None`, which means use 65,536 bytes (64 KiB) Raises: DHError: If unable to build the object. @@ -351,6 +329,21 @@ def __init__(self, for field_id, column_name in field_id_to_column_name.items(): builder.putFieldIdToColumnName(field_id, column_name) + if compression_codec_name: + builder.compressionCodecName(compression_codec_name) + + if maximum_dictionary_keys: + builder.maximumDictionaryKeys(maximum_dictionary_keys) + + if maximum_dictionary_size: + builder.maximumDictionarySize(maximum_dictionary_size) + + if target_page_size: + builder.targetPageSize(target_page_size) + + if data_instructions: + builder.dataInstructions(data_instructions.j_object) + self._j_object = builder.build() except Exception as e: @@ -402,23 +395,26 @@ def j_object(self) -> jpy.JType: class IcebergTableWriter(JObjectWrapper): """ - This class is responsible for writing Deephaven tables to an Iceberg table. Each instance of this class is - associated with a single IcebergTableAdapter and can be used to write multiple Deephaven tables to this Iceberg - table. + :class:`.IcebergTableWriter` is responsible for writing Deephaven tables to an Iceberg table. Each + :class:`.IcebergTableWriter` instance is associated with a single IcebergTableAdapter and can be used to write multiple + Deephaven tables to this Iceberg table. """ j_object_type = _JIcebergTableWriter or type(None) def __init__(self, j_object: _JIcebergTableWriter): self.j_table_writer = j_object - def append(self, instructions: IcebergParquetWriteInstructions): + def append(self, instructions: IcebergWriteInstructions): """ - Append the provided Deephaven tables from the write instructions as new partitions to the existing Iceberg - table in a single snapshot. This method will not perform any compatibility checks between the existing schema and - the provided Deephaven tables. + Append the provided Deephaven tables as new partitions to the existing Iceberg table in a single snapshot. + Users can provide the tables using the :attr:`.IcebergWriteInstructions.tables` parameter and optionally provide the + partition paths where each table will be written using the :attr:`.IcebergWriteInstructions.partition_paths` + parameter. + This method will not perform any compatibility checks between the existing schema and the provided Deephaven + tables. All such checks happen at the time of creation of the :class:`.IcebergTableWriter` instance. Args: - instructions (IcebergParquetWriteInstructions): the customization instructions for write. + instructions (IcebergWriteInstructions): the customization instructions for write. 
""" self.j_object.append(instructions.j_object) @@ -429,7 +425,7 @@ def j_object(self) -> jpy.JType: class IcebergTableAdapter(JObjectWrapper): """ - This class provides an interface for interacting with Iceberg tables. It allows the user to list snapshots, + IcebergTableAdapter provides an interface for interacting with Iceberg tables. It allows the user to list snapshots, retrieve table definitions and reading Iceberg tables into Deephaven tables. """ j_object_type = _JIcebergTableAdapter or type(None) @@ -488,15 +484,12 @@ def table(self, instructions: Optional[IcebergReadInstructions] = None) -> Icebe return IcebergTable(self.j_object.table(instructions.j_object)) return IcebergTable(self.j_object.table()) - def table_writer(self, writer_options: TableWriterOptions): + def table_writer(self, writer_options: TableParquetWriterOptions) -> IcebergTableWriter: """ - Create a new IcebergTableWriter for this Iceberg table using the provided TableWriterOptions. + Create a new :class:`.IcebergTableWriter` for this Iceberg table using the provided writer options. This method will perform schema validation to ensure that the provided table definition from the writer options is compatible with the Iceberg table schema. All further writes performed by the returned writer will not be - validated against the table's schema, and thus would be faster. - - Creating a table writer is the recommended approach if users want to write to the same iceberg table multiple - times. + validated against the table's schema, and thus will be faster. Args: writer_options: The options to configure the table writer. @@ -506,21 +499,6 @@ def table_writer(self, writer_options: TableWriterOptions): """ return IcebergTableWriter(self.j_object.tableWriter(writer_options.j_object)) - def append(self, instructions: IcebergParquetWriteInstructions): - """ - Append the provided Deephaven tables from the write instructions as new partitions to the existing Iceberg - table in a single snapshot. This will not change the schema of the existing table. - - This method will create a new IcebergTableWriter with the provided table definition from the write instructions, - and use that writer to write the data to the table. Therefore, this method is not recommended if users want to - write to the iceberg table multiple times. Instead, users should create a single IcebergTableWriter and use it - to append multiple times. - - Args: - instructions (IcebergParquetWriteInstructions): the customization instructions for write. - """ - self.j_object.append(instructions.j_object) - @property def j_object(self) -> jpy.JType: return self.j_table_adapter @@ -528,8 +506,8 @@ def j_object(self) -> jpy.JType: class IcebergCatalogAdapter(JObjectWrapper): """ - This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and - snapshots, as well as reading Iceberg tables into Deephaven tables. + IcebergCatalogAdapter provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, + tables and snapshots, as well as reading Iceberg tables into Deephaven tables. 
""" j_object_type = _JIcebergCatalogAdapter or type(None) diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py index 6016b84ecf7..b1dfd4400bf 100644 --- a/py/server/tests/test_iceberg.py +++ b/py/server/tests/test_iceberg.py @@ -13,6 +13,7 @@ _JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") + class IcebergTestCase(BaseTestCase): """ Test cases for the deephaven.iceberg module (performed locally) """ @@ -46,7 +47,7 @@ def test_instruction_create_with_col_renames(self): self.assertTrue(col_rename_dict["old_name_c"] == "new_name_c") def test_instruction_create_with_table_definition_dict(self): - table_def={ + table_def = { "x": dtypes.int32, "y": dtypes.double, "z": dtypes.double, @@ -59,7 +60,7 @@ def test_instruction_create_with_table_definition_dict(self): self.assertTrue(col_names[2] == "z") def test_instruction_create_with_table_definition_list(self): - table_def=[ + table_def = [ col_def("Partition", dtypes.int32, column_type=ColumnType.PARTITIONING), col_def("x", dtypes.int32), col_def("y", dtypes.double), @@ -77,36 +78,33 @@ def test_instruction_create_with_snapshot_id(self): iceberg_read_instructions = iceberg.IcebergReadInstructions(snapshot_id=12345) self.assertTrue(iceberg_read_instructions.j_object.snapshotId().getAsLong() == 12345) - def test_write_instruction_create_default(self): - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)]) - self.assertEqual(iceberg_write_instructions.j_object.compressionCodecName(), "SNAPPY") - self.assertEqual(iceberg_write_instructions.j_object.maximumDictionaryKeys(), 1048576) - self.assertEqual(iceberg_write_instructions.j_object.maximumDictionarySize(), 1048576) - self.assertEqual(iceberg_write_instructions.j_object.targetPageSize(), 65536) + def test_writer_options_create_default(self): + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition) + self.assertEqual(writer_options.j_object.compressionCodecName(), "SNAPPY") + self.assertEqual(writer_options.j_object.maximumDictionaryKeys(), 1048576) + self.assertEqual(writer_options.j_object.maximumDictionarySize(), 1048576) + self.assertEqual(writer_options.j_object.targetPageSize(), 65536) - def test_write_instruction_create_with_s3_instructions(self): + def test_writer_options_create_with_s3_instructions(self): s3_instructions = s3.S3Instructions(region_name="us-east-1", access_key_id="some_access_key_id", secret_access_key="some_secret_access_key" ) - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - data_instructions=s3_instructions) + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition, data_instructions=s3_instructions) - def test_write_instruction_create_with_table_definition_dict(self): + def test_writer_options_create_with_table_definition_dict(self): table_def = { "x": dtypes.int32, "y": dtypes.double, "z": dtypes.double, } - - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - table_definition=table_def) - col_names = j_list_to_list(iceberg_write_instructions.j_object.tableDefinition().get().getColumnNames()) + writer_options = iceberg.TableParquetWriterOptions(table_def) + col_names = j_list_to_list(writer_options.j_object.tableDefinition().getColumnNames()) self.assertTrue(col_names[0] == "x") self.assertTrue(col_names[1] == "y") self.assertTrue(col_names[2] == "z") - def test_write_instruction_create_with_table_definition_list(self): + def 
test_writer_options_create_with_table_definition_list(self): table_def = [ col_def("Partition", dtypes.int32, column_type=ColumnType.PARTITIONING), col_def("x", dtypes.int32), @@ -114,30 +112,29 @@ def test_write_instruction_create_with_table_definition_list(self): col_def("z", dtypes.double), ] - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - table_definition=table_def) - col_names = j_list_to_list(iceberg_write_instructions.j_object.tableDefinition().get().getColumnNames()) + writer_options = iceberg.TableParquetWriterOptions(table_def) + col_names = j_list_to_list(writer_options.j_object.tableDefinition().getColumnNames()) self.assertTrue(col_names[0] == "Partition") self.assertTrue(col_names[1] == "x") self.assertTrue(col_names[2] == "y") self.assertTrue(col_names[3] == "z") - def test_write_instruction_create_with_compression_codec(self): - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - compression_codec_name="GZIP") - self.assertEqual(iceberg_write_instructions.j_object.compressionCodecName(), "GZIP") - - def test_write_instruction_create_with_max_dictionary_keys(self): - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - maximum_dictionary_keys=1024) - self.assertEqual(iceberg_write_instructions.j_object.maximumDictionaryKeys(), 1024) - - def test_write_instruction_create_with_max_dictionary_size(self): - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - maximum_dictionary_size=8192) - self.assertEqual(iceberg_write_instructions.j_object.maximumDictionarySize(), 8192) - - def test_write_instruction_create_with_target_page_size(self): - iceberg_write_instructions = iceberg.IcebergParquetWriteInstructions([empty_table(0)], - target_page_size=4096) - self.assertEqual(iceberg_write_instructions.j_object.targetPageSize(), 4096) + def test_writer_options_create_with_compression_codec(self): + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition, + compression_codec_name="GZIP") + self.assertEqual(writer_options.j_object.compressionCodecName(), "GZIP") + + def test_writer_options_create_with_max_dictionary_keys(self): + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition, + maximum_dictionary_keys=1024) + self.assertEqual(writer_options.j_object.maximumDictionaryKeys(), 1024) + + def test_writer_options_create_with_max_dictionary_size(self): + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition, + maximum_dictionary_size=8192) + self.assertEqual(writer_options.j_object.maximumDictionarySize(), 8192) + + def test_writer_options_create_with_target_page_size(self): + writer_options = iceberg.TableParquetWriterOptions(empty_table(0).definition, + target_page_size=4096) + self.assertEqual(writer_options.j_object.targetPageSize(), 4096)
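Putting the new pieces together: Parquet-specific settings now live on TableParquetWriterOptions (configured once per writer), while each append call carries only IcebergWriteInstructions. A rough end-to-end sketch from Python follows; `catalog_adapter`, the table identifier, and the column formulas are hypothetical stand-ins:

# Rough sketch of the new writer workflow (catalog_adapter and table name are hypothetical).
from deephaven import empty_table
from deephaven.experimental import iceberg

source = empty_table(10).update(["intCol = (int) 2 * i", "doubleCol = (double) 2.5 * i"])

# Assumes `catalog_adapter` is an IcebergCatalogAdapter configured elsewhere and that
# the table "MyNamespace.MyTable" already exists in the catalog.
table_adapter = catalog_adapter.load_table("MyNamespace.MyTable")

# Writer-level Parquet configuration; schema validation happens when the writer is created.
writer_options = iceberg.TableParquetWriterOptions(
    table_definition=source.definition,
    compression_codec_name="LZ4",
)
writer = table_adapter.table_writer(writer_options)

# Each append supplies only the data to write (and partition paths, if partitioned).
writer.append(iceberg.IcebergWriteInstructions(source))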