Commit: Drop support for overwriting

malhotrashivam committed Nov 21, 2024
1 parent d26bc86, commit 5cb0d5a

Showing 5 changed files with 10 additions and 226 deletions.
@@ -33,23 +33,15 @@ public interface IcebergBaseInstructions {
     Optional<Object> dataInstructions();

     /**
-     * <ul>
-     * <li>When reading an iceberg table, the snapshot with this {@link Snapshot#snapshotId()} will be loaded.</li>
-     * <li>When {@link IcebergTableWriter#overwrite overwriting} an iceberg table, the snapshot with this
-     * {@link Snapshot#snapshotId()} will be used for extracting all the data files to be deleted.</li>
-     * </ul>
+     * The identifier of the snapshot to load for reading.
      * <p>
      * If both this and {@link #snapshot()} are provided, the {@link Snapshot#snapshotId()} should match this.
      * Otherwise, only one of them should be provided. If neither is provided, the latest snapshot will be loaded.
      */
     OptionalLong snapshotId();

     /**
-     * <ul>
-     * <li>When reading an iceberg table, this snapshot will be loaded.</li>
-     * <li>When {@link IcebergTableWriter#overwrite overwriting} an iceberg table, this snapshot will be used for
-     * extracting all the data files to be deleted.</li>
-     * </ul>
+     * The snapshot to load for reading.
      * <p>
      * If both this and {@link #snapshotId()} are provided, the {@link Snapshot#snapshotId()} should match the
      * {@link #snapshotId()}. Otherwise, only one of them should be provided. If neither is provided, the latest
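Note: the snapshot-selection rule in the javadoc above reduces to a small resolution step. The sketch below is illustrative only; the SnapshotResolution class and resolveSnapshotId helper are hypothetical, while Snapshot#snapshotId() and Table#currentSnapshot() are the Iceberg APIs the javadoc references.

import java.util.Optional;
import java.util.OptionalLong;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

final class SnapshotResolution {
    // Resolve the effective snapshot id per the javadoc: if both snapshot()
    // and snapshotId() are set they must agree; otherwise whichever is set
    // wins; if neither is set, fall back to the latest snapshot.
    static long resolveSnapshotId(
            final OptionalLong snapshotId,
            final Optional<Snapshot> snapshot,
            final Table table) {
        if (snapshotId.isPresent() && snapshot.isPresent()
                && snapshot.get().snapshotId() != snapshotId.getAsLong()) {
            throw new IllegalArgumentException("snapshot and snapshotId do not agree");
        }
        if (snapshot.isPresent()) {
            return snapshot.get().snapshotId();
        }
        return snapshotId.orElseGet(() -> table.currentSnapshot().snapshotId());
    }
}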
@@ -610,26 +610,6 @@ public void append(@NotNull final IcebergWriteInstructions writeInstructions) {
         newWriter.append(writeInstructions);
     }

-    /**
-     * Overwrite the existing Iceberg table with the provided Deephaven {@link IcebergWriteInstructions#tables()} in a
-     * single snapshot. This will delete all existing data but will not change the schema of the existing table.
-     * Overwriting a table while racing with other writers can lead to failure/undefined results.
-     * <p>
-     * This method will create a new {@link IcebergTableWriter} with the provided
-     * {@link IcebergWriteInstructions#tableDefinition()} and will use that writer to write the data to the table.
-     * Therefore, this method is not recommended if users want to write to the table multiple times. Instead, users
-     * should create a single {@link IcebergTableWriter} and use it to write multiple times.
-     *
-     * @param writeInstructions The instructions for customizations while writing.
-     */
-    public void overwrite(@NotNull final IcebergWriteInstructions writeInstructions) {
-        final TableDefinition userDefinition = writeInstructions.tableDefinitionOrFirst();
-        final IcebergTableWriter newWriter =
-                new IcebergTableWriter(TableWriterOptions.builder().tableDefinition(userDefinition).build(), this);
-        newWriter.overwrite(writeInstructions);
-    }
-
     /**
      * Writes data from Deephaven tables to an Iceberg table without creating a new snapshot. This method returns a list
      * of data files that were written. Users can use this list to create a transaction/snapshot if needed.
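Note: the removed adapter-level overwrite built a throwaway IcebergTableWriter on every call, which its own javadoc discouraged. The sketch below shows the surviving pattern: one long-lived writer reused across writes, plus the snapshot-free path documented above for batching several writes into a single commit. It is a sketch under assumptions: writeDataFiles is a stand-in name for the method whose signature sits below the fold, and the options/instructions parameters are illustrative.

import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Transaction;

final class WriterReuseSketch {
    static void writeTwice(
            final IcebergTableAdapter tableAdapter,
            final TableWriterOptions options,
            final IcebergWriteInstructions first,
            final IcebergWriteInstructions second) {
        // Recommended by the deleted javadoc: one writer, reused for every write.
        final IcebergTableWriter writer = tableAdapter.tableWriter(options);
        writer.append(first); // commits one snapshot

        // Snapshot-free path documented above: collect the written data files,
        // then commit them in one Iceberg transaction. `writeDataFiles` is a
        // stand-in name, not confirmed by the visible diff.
        final List<DataFile> written = tableAdapter.writeDataFiles(second);
        final Transaction txn = tableAdapter.icebergTable().newTransaction();
        final AppendFiles append = txn.newAppend();
        written.forEach(append::appendFile);
        append.commit();
        txn.commitTransaction(); // snapshot created here
    }
}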
@@ -17,11 +17,9 @@
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
@@ -39,9 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Stream;

-import static io.deephaven.iceberg.base.IcebergUtils.allDataFiles;
 import static io.deephaven.iceberg.base.IcebergUtils.partitionDataFromPaths;
 import static io.deephaven.iceberg.base.IcebergUtils.verifyPartitioningColumns;
 import static io.deephaven.iceberg.base.IcebergUtils.verifyRequiredFields;
@@ -53,9 +49,8 @@
 public class IcebergTableWriter {

     /**
-     * The Iceberg table adapter and table which will be written to by this instance.
+     * The Iceberg table which will be written to by this instance.
      */
-    private final IcebergTableAdapter tableAdapter;
     private final org.apache.iceberg.Table table;

     @Nullable
@@ -91,7 +86,6 @@ public class IcebergTableWriter {


     IcebergTableWriter(final TableWriterOptions tableWriterOptions, final IcebergTableAdapter tableAdapter) {
-        this.tableAdapter = tableAdapter;
         this.table = tableAdapter.icebergTable();

         if (table instanceof HasTableOperations) {
@@ -215,21 +209,7 @@ private Map<String, Integer> lazyNameMappingDefault() {
      */
     public void append(@NotNull final IcebergWriteInstructions writeInstructions) {
         final List<DataFile> dataFilesWritten = writeDataFiles(writeInstructions);
-        commit(dataFilesWritten, false, writeInstructions);
-    }
-
-    /**
-     * Overwrite the existing Iceberg table with the provided Deephaven {@link IcebergWriteInstructions#tables()} in a
-     * single snapshot. This will delete all existing data and will not change the schema of the existing table. This
-     * method will not perform any compatibility checks between the existing schema and the provided Deephaven tables.
-     * <p>
-     * Overwriting a table while racing with other writers can lead to failure/undefined results.
-     *
-     * @param writeInstructions The instructions for customizations while writing.
-     */
-    public void overwrite(@NotNull final IcebergWriteInstructions writeInstructions) {
-        final List<DataFile> dataFilesWritten = writeDataFiles(writeInstructions);
-        commit(dataFilesWritten, true, writeInstructions);
+        commit(dataFilesWritten);
     }

     /**
@@ -343,42 +323,15 @@ private String getDataLocation() {
      * Commit the changes to the Iceberg table by creating a snapshot.
      */
     private void commit(
-            @NotNull final Iterable<DataFile> dataFiles,
-            final boolean overwrite,
-            @NotNull final IcebergBaseInstructions writeInstructions) {
+            @NotNull final Iterable<DataFile> dataFiles) {
         final Transaction icebergTransaction = table.newTransaction();

+        // Append the new data files to the table
+        final AppendFiles append = icebergTransaction.newAppend();
+        dataFiles.forEach(append::appendFile);
+        append.commit();

-        if (overwrite) {
-            // Fail if the table gets changed concurrently
-            final OverwriteFiles overwriteFiles = icebergTransaction.newOverwrite();
-
-            // Find the snapshot from where we will extract the data files in case of overwriting
-            final Snapshot referenceSnapshot;
-            {
-                final Snapshot snapshotFromInstructions = tableAdapter.getSnapshot(writeInstructions);
-                if (snapshotFromInstructions != null) {
-                    referenceSnapshot = snapshotFromInstructions;
-                } else {
-                    referenceSnapshot = table.currentSnapshot();
-                }
-            }
-
-            // Delete all the existing data files in the table
-            try (final Stream<DataFile> existingDataFiles = allDataFiles(table, referenceSnapshot)) {
-                existingDataFiles.forEach(overwriteFiles::deleteFile);
-            }
-            dataFiles.forEach(overwriteFiles::addFile);
-            overwriteFiles.commit();
-        } else {
-            // Append the new data files to the table
-            final AppendFiles append = icebergTransaction.newAppend();
-            dataFiles.forEach(append::appendFile);
-            append.commit();
-        }

-        // Commit the transaction, creating new snapshot for append/overwrite.
-        // Note that no new snapshot will be created for the schema change.
+        // Commit the transaction, creating new snapshot
         icebergTransaction.commitTransaction();
     }

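Note: with the overwrite branch removed, equivalent behavior stays reachable through Iceberg's own OverwriteFiles API. The sketch below is modeled on the deleted branch, with org.apache.iceberg.FindFiles standing in for the internal allDataFiles helper; treat it as an illustration under those assumptions, not the removed implementation.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;

final class ManualOverwriteSketch {
    // Replace the table's contents with newFiles in a single snapshot.
    static void overwriteAll(final Table table, final Iterable<DataFile> newFiles)
            throws IOException {
        final Transaction txn = table.newTransaction();
        final OverwriteFiles overwrite = txn.newOverwrite();
        // Delete every data file reachable from the current snapshot.
        try (CloseableIterable<DataFile> existing = FindFiles.in(table).collect()) {
            existing.forEach(overwrite::deleteFile);
        }
        newFiles.forEach(overwrite::addFile);
        overwrite.commit();
        txn.commitTransaction(); // one "overwrite" snapshot
    }
}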
@@ -180,113 +180,6 @@ private void verifySnapshots(final TableIdentifier tableIdentifier, final List<String> expectedOperations) {
         assertThat(snapshots).map(Snapshot::operation).isEqualTo(expectedOperations);
     }

-    @Test
-    void overwriteTablesBasicTest() {
-        final Table source = TableTools.emptyTable(10)
-                .update("intCol = (int) 2 * i + 10",
-                        "doubleCol = (double) 2.5 * i + 10");
-        final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
-        // Add some data to the table
-        final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition());
-        tableAdapter.append(instructionsBuilder()
-                .addTables(source)
-                .build());
-
-        // Overwrite with more data
-        final Table moreData = TableTools.emptyTable(5)
-                .update("intCol = (int) 3 * i + 20",
-                        "doubleCol = (double) 3.5 * i + 20");
-        tableAdapter.overwrite(instructionsBuilder()
-                .addTables(moreData)
-                .build());
-        Table fromIceberg = tableAdapter.table();
-        assertTableEquals(moreData, fromIceberg);
-        verifySnapshots(tableIdentifier, List.of("append", "overwrite"));
-
-        final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder()
-                .tableDefinition(source.getDefinition())
-                .build());
-
-        // Overwrite with an empty table
-        final Table emptyTable = TableTools.emptyTable(0)
-                .update("intCol = (int) 4 * i + 30",
-                        "doubleCol = (double) 4.5 * i + 30");
-        tableWriter.overwrite(instructionsBuilder()
-                .addTables(emptyTable)
-                .build());
-        fromIceberg = tableAdapter.table();
-        assertTableEquals(emptyTable, fromIceberg);
-        verifySnapshots(tableIdentifier, List.of("append", "overwrite", "overwrite"));
-
-        // Overwrite with multiple tables in a single call
-        final Table someMoreData = TableTools.emptyTable(3)
-                .update("intCol = (int) 5 * i + 40",
-                        "doubleCol = (double) 5.5 * i + 40");
-        tableWriter.overwrite(instructionsBuilder()
-                .addTables(someMoreData, moreData, emptyTable)
-                .build());
-        fromIceberg = tableAdapter.table();
-        final Table expected2 = TableTools.merge(someMoreData, moreData);
-        assertTableEquals(expected2, fromIceberg);
-        verifySnapshots(tableIdentifier, List.of("append", "overwrite", "overwrite", "overwrite"));
-    }
-
-    @Test
-    void overwriteWithDifferentDefinition() {
-        final Table source = TableTools.emptyTable(10)
-                .update("intCol = (int) 2 * i + 10",
-                        "doubleCol = (double) 2.5 * i + 10");
-        final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
-        final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition());
-        tableAdapter.append(instructionsBuilder()
-                .addTables(source)
-                .build());
-        {
-            final Table fromIceberg = tableAdapter.table();
-            assertTableEquals(source, fromIceberg);
-            verifySnapshots(tableIdentifier, List.of("append"));
-        }
-
-        final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableWriterOptions.builder()
-                .tableDefinition(source.getDefinition())
-                .build());
-
-        final Table differentSource = TableTools.emptyTable(10)
-                .update("intCol = (int) 2 * i + 10");
-        {
-            tableWriter.overwrite(instructionsBuilder()
-                    .addTables(differentSource)
-                    .build());
-            final Table fromIceberg = tableAdapter.table();
-            final Table expected = differentSource.update("doubleCol = NULL_DOUBLE");
-            assertTableEquals(expected, fromIceberg);
-            verifySnapshots(tableIdentifier, List.of("append", "overwrite"));
-        }
-
-        {
-            final Table moreData = TableTools.emptyTable(5)
-                    .update("intCol = (int) 3 * i + 20");
-            tableWriter.append(instructionsBuilder()
-                    .addTables(moreData)
-                    .build());
-            final Table fromIceberg = tableAdapter.table();
-            final Table expected = TableTools.merge(differentSource, moreData).update("doubleCol = NULL_DOUBLE");
-            assertTableEquals(expected, fromIceberg);
-            verifySnapshots(tableIdentifier, List.of("append", "overwrite", "append"));
-        }
-
-        // Overwrite with an empty table
-        {
-            tableWriter.overwrite(IcebergParquetWriteInstructions.builder()
-                    .addTables(TableTools.emptyTable(0))
-                    .build());
-            final Table fromIceberg = tableAdapter.table();
-            assertThat(fromIceberg.size()).isEqualTo(0);
-            assertThat(tableAdapter.definition()).isEqualTo(source.getDefinition());
-            verifySnapshots(tableIdentifier, List.of("append", "overwrite", "append", "delete"));
-        }
-    }
-
     @Test
     void appendWithDifferentDefinition() {
         final Table source = TableTools.emptyTable(10)
py/server/deephaven/experimental/iceberg.py (34 changes: 0 additions & 34 deletions)

@@ -159,7 +159,6 @@ def __init__(self,
                  maximum_dictionary_keys: Optional[int] = None,
                  maximum_dictionary_size: Optional[int] = None,
                  target_page_size: Optional[int] = None,
-                 snapshot_id: Optional[int] = None,
                  table_definition: Optional[TableDefinitionLike] = None,
                  data_instructions: Optional[s3.S3Instructions] = None):
         """
@@ -184,8 +183,6 @@
                 2^20 (1,048,576)
             target_page_size (Optional[int]): the target page size in bytes, if not specified, defaults to
                 2^20 bytes (1 MiB)
-            snapshot_id (Optional[int]): When overwriting an iceberg table, the snapshot with this id will be used for
-                extracting all the data files to be deleted. By default, the most recent snapshot's ID will be used.
             table_definition (Optional[TableDefinitionLike]): the TableDefinition to use when writing Iceberg data
                 files, instead of the one implied by the table being written itself. This definition can be used to skip
                 some columns or add additional columns with null values.
@@ -227,9 +224,6 @@ def __init__(self,
         if target_page_size:
             builder.targetPageSize(target_page_size)

-        if snapshot_id:
-            builder.snapshotId(snapshot_id)
-
         if table_definition:
             builder.tableDefinition(TableDefinition(table_definition).j_table_definition)
@@ -428,18 +422,6 @@ def append(self, instructions: IcebergParquetWriteInstructions):
"""
self.j_object.append(instructions.j_object)

def overwrite(self, instructions: IcebergParquetWriteInstructions):
"""
Overwrite the existing Iceberg table with the provided Deephaven tables from the write instructions in a
single snapshot. This will delete all existing data but will not change the schema of the existing table.
Overwriting a table while racing with other writers can lead to failure/undefined results. This
method will not perform any compatibility checks between the existing schema and the provided Deephaven tables.
Args:
instructions (IcebergParquetWriteInstructions): the customization instructions for write.
"""
self.j_object.overwrite(instructions.j_object)

@property
def j_object(self) -> jpy.JType:
return self.j_table_writer
@@ -539,22 +521,6 @@ def append(self, instructions: IcebergParquetWriteInstructions):
"""
self.j_object.append(instructions.j_object)

def overwrite(self, instructions: IcebergParquetWriteInstructions):
"""
Overwrite the existing Iceberg table with the provided Deephaven tables from the write instructions in a
single snapshot. This will delete all existing data but will not change the schema of the existing table.
Overwriting a table while racing with other writers can lead to failure/undefined results.
This method will create a new IcebergTableWriter with the provided table definition from the write instructions,
and use that writer to write the data to the table. Therefore, this method is not recommended if users want to
write to the iceberg table multiple times. Instead, users should create a single IcebergTableWriter and use it
to write multiple times.
Args:
instructions (IcebergParquetWriteInstructions): the customization instructions for write.
"""
self.j_object.overwrite(instructions.j_object)

@property
def j_object(self) -> jpy.JType:
return self.j_table_adapter