Skip to content

Commit

Permalink
Review with Devin and Jianfeng/Chip contd.
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 22, 2024
1 parent 5cb0d5a commit aa49f7f
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 535 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class IcebergWriteInstructionsTest {

    @Test
    void testSetDhTables() {
        // Two distinct empty tables so the membership checks below are meaningful.
        final Table first = TableTools.emptyTable(3);
        final Table second = TableTools.emptyTable(4);
        final IcebergWriteInstructions instructions = IcebergWriteInstructions.builder()
                .addTables(first)
                .addTables(second)
                .build();
        assertThat(instructions.tables().size()).isEqualTo(2);
        assertThat(instructions.tables().contains(first)).isTrue();
        assertThat(instructions.tables().contains(second)).isTrue();
    }

    @Test
    void testSetPartitionPaths() {
        final Table first = TableTools.emptyTable(3);
        final String firstPath = "P1C=1/PC2=2";
        final Table second = TableTools.emptyTable(4);
        final String secondPath = "P1C=2/PC2=3";

        // Partition paths without matching tables must be rejected at build time.
        try {
            IcebergWriteInstructions.builder()
                    .addPartitionPaths(firstPath, secondPath)
                    .build();
            failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
        } catch (final IllegalArgumentException e) {
            assertThat(e).hasMessageContaining("Partition path must be provided for each table");
        }

        // With one table per path, the paths round-trip through the builder.
        final IcebergWriteInstructions instructions = IcebergWriteInstructions.builder()
                .addTables(first, second)
                .addPartitionPaths(firstPath, secondPath)
                .build();
        assertThat(instructions.partitionPaths().size()).isEqualTo(2);
        assertThat(instructions.partitionPaths().contains(firstPath)).isTrue();
        assertThat(instructions.partitionPaths().contains(secondPath)).isTrue();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
package io.deephaven.iceberg.util;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import org.junit.jupiter.api.Test;

Expand All @@ -15,28 +13,36 @@
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class IcebergParquetWriteInstructionsTest {
class TableParquetWriterOptionsTest {

/**
* Create a new IcebergParquetWriteInstructions builder with an empty table.
* Create a new TableParquetWriterOptions builder with an empty table definition.
*/
private static IcebergParquetWriteInstructions.Builder instructions() {
return IcebergParquetWriteInstructions.builder().addTables(TableTools.emptyTable(0));
private static TableParquetWriterOptions.Builder instructions() {
return TableParquetWriterOptions.builder().tableDefinition(TableDefinition.of());
}

@Test
void defaults() {
final IcebergParquetWriteInstructions instructions = instructions().build();
assertThat(instructions.tableDefinition().isEmpty()).isTrue();
final TableParquetWriterOptions instructions = instructions().build();
assertThat(instructions.dataInstructions().isEmpty()).isTrue();
assertThat(instructions.compressionCodecName()).isEqualTo("SNAPPY");
assertThat(instructions.maximumDictionaryKeys()).isEqualTo(1048576);
assertThat(instructions.maximumDictionarySize()).isEqualTo(1048576);
assertThat(instructions.targetPageSize()).isEqualTo(65536);
assertThat(instructions.tables().isEmpty()).isFalse();
assertThat(instructions.partitionPaths().isEmpty()).isTrue();
assertThat(instructions.snapshot()).isEmpty();
assertThat(instructions.snapshotId()).isEmpty();
}

@Test
void testSetTableDefinition() {
    // A definition with two partitioning columns and one regular column.
    final TableDefinition expected = TableDefinition.of(
            ColumnDefinition.ofInt("PC1").withPartitioning(),
            ColumnDefinition.ofInt("PC2").withPartitioning(),
            ColumnDefinition.ofLong("I"));
    final TableParquetWriterOptions options = TableParquetWriterOptions.builder()
            .tableDefinition(expected)
            .build();
    // The builder must hand back exactly the definition it was given.
    assertThat(options.tableDefinition()).isEqualTo(expected);
}

@Test
Expand Down Expand Up @@ -114,7 +120,7 @@ void testMinTargetPageSize() {

@Test
void toParquetInstructionTest() {
final IcebergParquetWriteInstructions writeInstructions = instructions()
final TableParquetWriterOptions writeInstructions = instructions()
.compressionCodecName("GZIP")
.maximumDictionaryKeys(100)
.maximumDictionarySize(200)
Expand All @@ -138,49 +144,4 @@ void toParquetInstructionTest() {
assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
assertThat(parquetInstructions.getTableDefinition()).hasValue(definition);
}

@Test
void testSetSnapshotID() {
    // The snapshot id handed to the builder must round-trip unchanged.
    final long snapshotId = 12345;
    final IcebergParquetWriteInstructions instructions = instructions()
            .snapshotId(snapshotId)
            .build();
    assertThat(instructions.snapshotId().getAsLong()).isEqualTo(snapshotId);
}

@Test
void testSetDhTables() {
    // Tables added one at a time should both be present in the built instructions.
    final Table first = TableTools.emptyTable(3);
    final Table second = TableTools.emptyTable(4);
    final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder()
            .addTables(first)
            .addTables(second)
            .build();
    assertThat(instructions.tables().size()).isEqualTo(2);
    assertThat(instructions.tables().contains(first)).isTrue();
    assertThat(instructions.tables().contains(second)).isTrue();
}

@Test
void testSetPartitionPaths() {
    final Table first = TableTools.emptyTable(3);
    final String firstPath = "P1C=1/PC2=2";
    final Table second = TableTools.emptyTable(4);
    final String secondPath = "P1C=2/PC2=3";

    // Partition paths without matching tables must be rejected at build time.
    try {
        IcebergParquetWriteInstructions.builder()
                .addPartitionPaths(firstPath, secondPath)
                .build();
        failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
    } catch (final IllegalArgumentException e) {
        assertThat(e).hasMessageContaining("Partition path must be provided for each table");
    }

    // With one table per path, the paths round-trip through the builder.
    final IcebergParquetWriteInstructions instructions = IcebergParquetWriteInstructions.builder()
            .addTables(first, second)
            .addPartitionPaths(firstPath, secondPath)
            .build();
    assertThat(instructions.partitionPaths().size()).isEqualTo(2);
    assertThat(instructions.partitionPaths().contains(firstPath)).isTrue();
    assertThat(instructions.partitionPaths().contains(secondPath)).isTrue();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,23 @@ public Catalog catalog() {
return catalog;
}

/**
 * Create a new Iceberg table in the catalog with the given table identifier and definition.
 * <p>
 * All columns of type {@link ColumnDefinition.ColumnType#Partitioning partitioning} will be used to create the
 * partition spec for the table.
 *
 * @param tableIdentifier The identifier string of the new table.
 * @param definition The {@link TableDefinition} of the new table.
 * @return The {@link IcebergTableAdapter table adapter} for the new Iceberg table.
 * @throws AlreadyExistsException if the table already exists
 */
public IcebergTableAdapter createTable(
        @NotNull final String tableIdentifier,
        @NotNull final TableDefinition definition) {
    // Parse the identifier string, then delegate to the TableIdentifier overload.
    final TableIdentifier parsedIdentifier = TableIdentifier.parse(tableIdentifier);
    return createTable(parsedIdentifier, definition);
}

/**
* Create a new Iceberg table in the catalog with the given table identifier and definition.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
package io.deephaven.iceberg.util;

import io.deephaven.annotations.CopyableStyle;
import io.deephaven.engine.table.TableDefinition;
import org.apache.iceberg.Snapshot;
import org.immutables.value.Value;
import org.immutables.value.Value.Immutable;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/**
* This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in
* this class may change in the future. As such, callers may wish to explicitly set the values.
*/
@Immutable
@CopyableStyle
public abstract class IcebergReadInstructions implements IcebergBaseInstructions {
public abstract class IcebergReadInstructions {
/**
* The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use
* system defaults for cloud provider-specific parameters.
Expand All @@ -27,6 +30,17 @@ public static Builder builder() {
return ImmutableIcebergReadInstructions.builder();
}

/**
* The {@link TableDefinition} to use when reading Iceberg data files.
*/
public abstract Optional<TableDefinition> tableDefinition();

/**
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
* provider-specific instructions).
*/
public abstract Optional<Object> dataInstructions();

/**
* A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg
* data files.
Expand All @@ -47,23 +61,54 @@ public IcebergUpdateMode updateMode() {
return IcebergUpdateMode.staticMode();
}

/**
* The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the
* {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is
* provided, the latest snapshot will be loaded.
*/
public abstract OptionalLong snapshotId();

/**
* Return a copy of this instructions object with the snapshot ID replaced by {@code value}.
*/
public abstract IcebergReadInstructions withSnapshotId(long value);

/**
* The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #snapshotId()}. Otherwise, only one of them should be
* provided. If neither is provided, the latest snapshot will be loaded.
*/
public abstract Optional<Snapshot> snapshot();

/**
* Return a copy of this instructions object with the snapshot replaced by {@code value}.
*/
public abstract IcebergReadInstructions withSnapshot(Snapshot value);

public interface Builder extends IcebergBaseInstructions.Builder<Builder> {
public interface Builder {
    /** Sets the {@link TableDefinition} to use when reading the Iceberg data files. */
    Builder tableDefinition(TableDefinition tableDefinition);

    /** Sets cloud provider-specific data instructions (e.g. S3Instructions) for reading data files. */
    Builder dataInstructions(Object s3Instructions);

    /** Adds a single Iceberg-to-Deephaven column rename. */
    Builder putColumnRenames(String key, String value);

    /** Adds all entries of {@code entries} as Iceberg-to-Deephaven column renames. */
    Builder putAllColumnRenames(Map<String, ? extends String> entries);

    /** Sets the {@link IcebergUpdateMode update mode}; defaults to static mode when unset. */
    Builder updateMode(IcebergUpdateMode updateMode);

    /** Sets the id of the snapshot to load; must agree with {@code snapshot} if both are provided. */
    Builder snapshotId(long snapshotId);

    /** Sets the snapshot to load; must agree with {@code snapshotId} if both are provided. */
    Builder snapshot(Snapshot snapshot);

    IcebergReadInstructions build();
}

@Value.Check
final void checkSnapshotId() {
    // Only validate when both the explicit id and the snapshot object were supplied.
    if (!snapshotId().isPresent() || !snapshot().isPresent()) {
        return;
    }
    final long explicitId = snapshotId().getAsLong();
    final long idFromSnapshot = snapshot().get().snapshotId();
    if (explicitId != idFromSnapshot) {
        throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " +
                "must match, found " + explicitId + " and " + idFromSnapshot);
    }
}
}
Loading

0 comments on commit aa49f7f

Please sign in to comment.