-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Don't fail when reading non-identity partitioning field (#6477)
- Loading branch information
1 parent
fbad5e6
commit d8882a4
Showing
14 changed files
with
211 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
108 changes: 108 additions & 0 deletions
108
extensions/iceberg/src/test/java/io/deephaven/iceberg/Pyiceberg2Test.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// | ||
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending | ||
// | ||
package io.deephaven.iceberg; | ||
|
||
import io.deephaven.engine.table.ColumnDefinition; | ||
import io.deephaven.engine.table.Table; | ||
import io.deephaven.engine.table.TableDefinition; | ||
import io.deephaven.engine.testutil.TstUtils; | ||
import io.deephaven.engine.util.TableTools; | ||
import io.deephaven.iceberg.sqlite.DbResource; | ||
import io.deephaven.iceberg.util.IcebergCatalogAdapter; | ||
import io.deephaven.iceberg.util.IcebergTableAdapter; | ||
import org.apache.iceberg.PartitionField; | ||
import org.apache.iceberg.PartitionSpec; | ||
import org.apache.iceberg.Snapshot; | ||
import org.apache.iceberg.catalog.Namespace; | ||
import org.apache.iceberg.catalog.TableIdentifier; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Tag; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.time.LocalDateTime; | ||
import java.net.URISyntaxException; | ||
import java.util.List; | ||
import static io.deephaven.util.QueryConstants.NULL_DOUBLE; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
/** | ||
 * This test shows that we can integrate with data written by <a href="https://py.iceberg.apache.org/">pyiceberg</a>. | ||
 * See TESTING.md and generate-pyiceberg-2.py for more details. | ||
 * | ||
 * <p> | ||
 * The fixture is a single table, {@code trading.data}, whose Iceberg partition spec has two fields: a {@code day} | ||
 * transform named {@code datetime_day} and an {@code identity} transform named {@code symbol}. The tests check that | ||
 * such a table can still be loaded, and that only {@code symbol} is reported as a partitioning column. | ||
 */ | ||
@Tag("security-manager-allow") | ||
class Pyiceberg2Test { | ||
    private static final Namespace NAMESPACE = Namespace.of("trading"); | ||
    private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data"); | ||
|
||
    // This will need to be updated if the data is regenerated | ||
    private static final long SNAPSHOT_1_ID = 2806418501596315192L; | ||
|
||
    // Expected definition: only "symbol" is marked as partitioning; "datetime" is expected to be a regular | ||
    // column even though the partition spec also has a day-transform field (see testDefinition). | ||
    private static final TableDefinition TABLE_DEFINITION = TableDefinition.of( | ||
            ColumnDefinition.fromGenericType("datetime", LocalDateTime.class), | ||
            ColumnDefinition.ofString("symbol").withPartitioning(), | ||
            ColumnDefinition.ofDouble("bid"), | ||
            ColumnDefinition.ofDouble("ask")); | ||
|
||
    private IcebergCatalogAdapter catalogAdapter; | ||
|
||
    /** | ||
     * Opens the {@code pyiceberg-2} catalog through {@link DbResource} before each test. | ||
     * | ||
     * @throws URISyntaxException propagated from {@link DbResource#openCatalog} | ||
     */ | ||
    @BeforeEach | ||
    void setUp() throws URISyntaxException { | ||
        catalogAdapter = DbResource.openCatalog("pyiceberg-2"); | ||
    } | ||
|
||
    /** | ||
     * Checks that the catalog lists exactly one namespace and one table, and that the table has a single snapshot | ||
     * with no parent, schema id 0, sequence number 1 and the id {@link #SNAPSHOT_1_ID}. | ||
     */ | ||
    @Test | ||
    void catalogInfo() { | ||
        assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE); | ||
        assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA); | ||
|
||
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); | ||
        final List<Snapshot> snapshots = tableAdapter.listSnapshots(); | ||
        assertThat(snapshots).hasSize(1); | ||
        { | ||
            final Snapshot snapshot = snapshots.get(0); | ||
            assertThat(snapshot.parentId()).isNull(); | ||
            assertThat(snapshot.schemaId()).isEqualTo(0); | ||
            assertThat(snapshot.sequenceNumber()).isEqualTo(1L); | ||
            assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID); | ||
        } | ||
    } | ||
|
||
    /** | ||
     * Checks that the table's definition equals {@link #TABLE_DEFINITION}, and that the underlying Iceberg partition | ||
     * spec has the two expected fields, in order: {@code datetime_day} ({@code day}) and {@code symbol} | ||
     * ({@code identity}). | ||
     */ | ||
    @Test | ||
    void testDefinition() { | ||
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); | ||
        final TableDefinition td = tableAdapter.definition(); | ||
        assertThat(td).isEqualTo(TABLE_DEFINITION); | ||
|
||
        // Check the partition spec | ||
        final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec(); | ||
        final List<PartitionField> partitionFields = partitionSpec.fields(); | ||
        // Assert on the list itself (as catalogInfo does) so a failure message shows the actual fields | ||
        assertThat(partitionFields).hasSize(2); | ||
        final PartitionField firstPartitionField = partitionFields.get(0); | ||
        assertThat(firstPartitionField.name()).isEqualTo("datetime_day"); | ||
        assertThat(firstPartitionField.transform().toString()).isEqualTo("day"); | ||
|
||
        final PartitionField secondPartitionField = partitionFields.get(1); | ||
        assertThat(secondPartitionField.name()).isEqualTo("symbol"); | ||
        assertThat(secondPartitionField.transform().toString()).isEqualTo("identity"); | ||
    } | ||
|
||
    /** | ||
     * Checks that the table has 5 rows and that its contents match the rows written by generate-pyiceberg-2.py. | ||
     */ | ||
    @Test | ||
    void testData() { | ||
        final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA); | ||
        final Table fromIceberg = tableAdapter.table(); | ||
        assertThat(fromIceberg.size()).isEqualTo(5); | ||
        // The last row's bid is NULL_DOUBLE: the generating script writes that row with a null bid | ||
        final Table expectedData = TableTools.newTable(TABLE_DEFINITION, | ||
                TableTools.col("datetime", | ||
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0), | ||
                        LocalDateTime.of(2024, 11, 27, 10, 0, 0), | ||
                        LocalDateTime.of(2024, 11, 26, 10, 1, 0), | ||
                        LocalDateTime.of(2024, 11, 26, 10, 2, 0), | ||
                        LocalDateTime.of(2024, 11, 28, 10, 3, 0)), | ||
                TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"), | ||
                TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE), | ||
                TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0)); | ||
        // Sort both sides the same way so the comparison does not depend on the order rows are read back | ||
        TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"), | ||
                fromIceberg.sort("datetime", "symbol")); | ||
    } | ||
} |
3 changes: 3 additions & 0 deletions
3
.../datetime_day=2024-11-26/symbol=AMZN/00000-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet
Git LFS file not shown
3 changes: 3 additions & 0 deletions
3
.../datetime_day=2024-11-26/symbol=GOOG/00000-1-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet
Git LFS file not shown
3 changes: 3 additions & 0 deletions
3
.../datetime_day=2024-11-27/symbol=AAPL/00000-2-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet
Git LFS file not shown
3 changes: 3 additions & 0 deletions
3
.../datetime_day=2024-11-27/symbol=MSFT/00000-3-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet
Git LFS file not shown
3 changes: 3 additions & 0 deletions
3
.../datetime_day=2024-11-28/symbol=MSFT/00000-4-d9c06748-9892-404f-a744-7bbfd06d0eeb.parquet
Git LFS file not shown
1 change: 1 addition & 0 deletions
1
...eberg-2/trading.db/data/metadata/00000-0956ea6c-b522-447f-a2f4-5c6e7b104783.metadata.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0} |
1 change: 1 addition & 0 deletions
1
...eberg-2/trading.db/data/metadata/00001-4e3fe6dc-5e3e-4da1-9da3-666cbad70ace.metadata.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1} |
Binary file added
BIN
+5.94 KB
...atalogs/pyiceberg-2/trading.db/data/metadata/d9c06748-9892-404f-a744-7bbfd06d0eeb-m0.avro
Binary file not shown.
Binary file added
BIN
+1.76 KB
...ing.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro
Binary file not shown.
Binary file modified
BIN
+0 Bytes
(100%)
...ons/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/dh-iceberg-test.db
Binary file not shown.
4 changes: 4 additions & 0 deletions
4
...ceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
...ceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-2.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
''' | ||
See TESTING.md for how to run this script. | ||
''' | ||
|
||
import pyarrow as pa | ||
from datetime import datetime | ||
from pyiceberg.catalog.sql import SqlCatalog | ||
from pyiceberg.schema import Schema | ||
from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType | ||
from pyiceberg.partitioning import PartitionSpec, PartitionField | ||
from pyiceberg.transforms import DayTransform, IdentityTransform | ||
|
||
catalog = SqlCatalog( | ||
"pyiceberg-2", | ||
**{ | ||
"uri": f"sqlite:///dh-iceberg-test.db", | ||
"warehouse": f"catalogs/pyiceberg-2", | ||
}, | ||
) | ||
|
||
schema = Schema( | ||
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False), | ||
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False), | ||
NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False), | ||
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), | ||
) | ||
|
||
partition_spec = PartitionSpec( | ||
PartitionField( | ||
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day", | ||
), | ||
PartitionField( | ||
source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol", | ||
) | ||
) | ||
|
||
catalog.create_namespace("trading") | ||
|
||
tbl = catalog.create_table( | ||
identifier="trading.data", | ||
schema=schema, | ||
partition_spec=partition_spec, | ||
) | ||
|
||
# Define the data according to your Iceberg schema | ||
data = [ | ||
{"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0}, | ||
{"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0}, | ||
{"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5}, | ||
{"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0}, | ||
{"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0}, | ||
] | ||
|
||
# Create a PyArrow Table | ||
table = pa.Table.from_pylist(data) | ||
|
||
# Append the table to the Iceberg table | ||
tbl.append(table) |