Skip to content

Commit

Permalink
Added compatibility check for table defintions
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 22, 2024
1 parent 95743c3 commit ff31d13
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,10 @@ public class IcebergTableWriter {
private final TableDefinition tableDefinition;

/**
* The column definitions for non-partitioning columns. All tables written by this writer are expected to have these
* columns. The partitioning column values will be provided separately by the user via
* {@link IcebergWriteInstructions}.
* The table definition consisting of non-partitioning columns from {@link #tableDefinition}. All tables written by
* this writer are expected to have a compatible definition with this.
*/
private final List<ColumnDefinition<?>> nonPartitioningColumns;
private final TableDefinition nonPartitioningTableDefinition;

/**
* The schema to use when in conjunction with the {@link #fieldIdToColumnName} to map Deephaven columns from
Expand Down Expand Up @@ -116,7 +115,7 @@ public class IcebergTableWriter {
this.tableSpec = table.spec();

this.tableDefinition = tableWriterOptions.tableDefinition();
this.nonPartitioningColumns = nonPartitioningColumnDefinitions(tableDefinition);
this.nonPartitioningTableDefinition = nonPartitioningTableDefinition(tableDefinition);
verifyRequiredFields(table.schema(), tableDefinition);
verifyPartitioningColumns(tableSpec, tableDefinition);

Expand Down Expand Up @@ -148,15 +147,15 @@ private static TableParquetWriterOptions verifyWriterOptions(
* Return a {@link TableDefinition} which contains only the non-partitioning columns from the provided table
* definition.
*/
private static List<ColumnDefinition<?>> nonPartitioningColumnDefinitions(
private static TableDefinition nonPartitioningTableDefinition(
@NotNull final TableDefinition tableDefinition) {
final List<ColumnDefinition<?>> nonPartitioningColumns = new ArrayList<>();
final Collection<ColumnDefinition<?>> nonPartitioningColumns = new ArrayList<>();
for (final ColumnDefinition<?> columnDefinition : tableDefinition.getColumns()) {
if (!columnDefinition.isPartitioning()) {
nonPartitioningColumns.add(columnDefinition);
}
}
return nonPartitioningColumns;
return TableDefinition.of(nonPartitioningColumns);
}

/**
Expand Down Expand Up @@ -274,7 +273,7 @@ public void append(@NotNull final IcebergWriteInstructions writeInstructions) {
* @param writeInstructions The instructions for customizations while writing.
*/
public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions writeInstructions) {
verifyTableDefinition(writeInstructions.tables(), nonPartitioningColumns);
verifyCompatible(writeInstructions.tables(), nonPartitioningTableDefinition);
final List<String> partitionPaths = writeInstructions.partitionPaths();
verifyPartitionPaths(tableSpec, partitionPaths);
final List<PartitionData> partitionData;
Expand All @@ -292,18 +291,13 @@ public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions wri
}

/**
* Verify that all the tables have the same column definitions definition as expected.
* Verify that all the tables are compatible with the provided table definition.
*/
private static void verifyTableDefinition(
private static void verifyCompatible(
@NotNull final Iterable<Table> tables,
@NotNull final List<ColumnDefinition<?>> expectedColumnDefinitions) {
@NotNull final TableDefinition expectedDefinition) {
for (final Table table : tables) {
final List<ColumnDefinition<?>> actualColumnDefinitions = table.getDefinition().getColumns();
if (!actualColumnDefinitions.equals(expectedColumnDefinitions)) {
throw new IllegalArgumentException("Table definition mismatch, all tables written by this writer " +
"are expected to have columns with definition: " + expectedColumnDefinitions +
", found: " + actualColumnDefinitions);
}
expectedDefinition.checkMutualCompatibility(table.getDefinition());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ void appendMultipleTablesWithDifferentDefinitionTest() {
.addTables(appendTable)
.build());
failBecauseExceptionWasNotThrown(UncheckedDeephavenException.class);
} catch (IllegalArgumentException e) {
} catch (TableDefinition.IncompatibleTableDefinitionException e) {
// Table definition mismatch between table writer and append table
assertThat(e).hasMessageContaining("Table definition");
}
Expand Down

0 comments on commit ff31d13

Please sign in to comment.