Skip to content

Commit

Permalink
Support Iceberg's timestamp with nano-precision data type
Browse files Browse the repository at this point in the history
NOTE On hold, because Iceberg code as of 1.7.1 does not yet fully support that new data type, causing test failures.

Fixes projectnessie#10236
  • Loading branch information
snazy committed Jan 21, 2025
1 parent e129315 commit ab821ba
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timeType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.uuidType;

import java.util.Iterator;
Expand Down Expand Up @@ -98,7 +100,9 @@ public static Stream<IcebergType> icebergTypes(IntSupplier idSupplier) {
decimalType(10, 3),
fixedType(42),
timestampType(),
timestamptzType());
timestampTzType(),
timestampNanosType(),
timestampNanosTzType());
}

public static IcebergSchema icebergSchemaAllTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.TYPE_MAP;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.TYPE_STRUCT;
import static org.projectnessie.catalog.model.id.NessieId.transientNessieId;
import static org.projectnessie.catalog.model.schema.types.NessieType.DEFAULT_TIME_PRECISION;
import static org.projectnessie.catalog.model.schema.types.NessieType.MICROS_TIME_PRECISION;
import static org.projectnessie.catalog.model.schema.types.NessieType.NANOS_TIME_PRECISION;
import static org.projectnessie.catalog.model.snapshot.NessieViewRepresentation.NessieViewSQLRepresentation.nessieViewSQLRepresentation;
import static org.projectnessie.catalog.model.snapshot.TableFormat.ICEBERG;
import static org.projectnessie.catalog.model.statistics.NessiePartitionStatisticsFile.partitionStatisticsFile;
Expand Down Expand Up @@ -295,18 +296,23 @@ public static IcebergType nessieTypeToIcebergType(NessieTypeSpec type) {
return IcebergType.dateType();
case TIME:
NessieTimeTypeSpec time = (NessieTimeTypeSpec) type;
if (time.precision() != DEFAULT_TIME_PRECISION || time.withTimeZone()) {
if (time.precision() != MICROS_TIME_PRECISION || time.withTimeZone()) {
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
}
return IcebergType.timeType();
case TIMESTAMP:
NessieTimestampTypeSpec timestamp = (NessieTimestampTypeSpec) type;
if (timestamp.precision() != DEFAULT_TIME_PRECISION) {
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
switch (timestamp.precision()) {
case MICROS_TIME_PRECISION:
return timestamp.withTimeZone()
? IcebergType.timestampTzType()
: IcebergType.timestampType();
case NANOS_TIME_PRECISION:
return timestamp.withTimeZone()
? IcebergType.timestampNanosTzType()
: IcebergType.timestampNanosType();
}
return timestamp.withTimeZone()
? IcebergType.timestamptzType()
: IcebergType.timestampType();
throw new IllegalArgumentException("Data type not supported in Iceberg: " + type);
case BINARY:
return IcebergType.binaryType();
case DECIMAL:
Expand Down Expand Up @@ -357,9 +363,13 @@ static NessieTypeSpec icebergTypeToNessieType(
IcebergType type, Map<Integer, NessieField> icebergFields) {
switch (type.type()) {
case IcebergType.TYPE_TIMESTAMP_TZ:
return NessieType.timestampType(true);
return NessieType.timestampType(MICROS_TIME_PRECISION, true);
case IcebergType.TYPE_TIMESTAMP:
return NessieType.timestampType(false);
return NessieType.timestampType(MICROS_TIME_PRECISION, false);
case IcebergType.TYPE_TIMESTAMP_NS_TZ:
return NessieType.timestampType(NANOS_TIME_PRECISION, true);
case IcebergType.TYPE_TIMESTAMP_NS:
return NessieType.timestampType(NANOS_TIME_PRECISION, false);
case IcebergType.TYPE_BOOLEAN:
return NessieType.booleanType();
case IcebergType.TYPE_UUID:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2023 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.formats.iceberg.types;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

/** Iceberg timestamp, nanosecond precision. */
public final class IcebergTimestampNanosType extends IcebergPrimitiveType {
private static final Schema TIMESTAMP_NS_SCHEMA =
LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema TIMESTAMPTZ_NS_SCHEMA =
LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";

static {
TIMESTAMP_NS_SCHEMA.addProp(ADJUST_TO_UTC_PROP, false);
TIMESTAMPTZ_NS_SCHEMA.addProp(ADJUST_TO_UTC_PROP, true);
}

private final boolean adjustToUTC;

IcebergTimestampNanosType(boolean adjustToUTC) {
this.adjustToUTC = adjustToUTC;
}

public boolean adjustToUTC() {
return adjustToUTC;
}

@Override
public String type() {
return adjustToUTC() ? TYPE_TIMESTAMP_NS_TZ : TYPE_TIMESTAMP_NS;
}

@Override
public Schema avroSchema(int fieldId) {
return adjustToUTC() ? TIMESTAMPTZ_NS_SCHEMA : TIMESTAMP_NS_SCHEMA;
}

@Override
public byte[] serializeSingleValue(Object value) {
return IcebergLongType.serializeLong((Long) value);
}

@Override
public Object deserializeSingleValue(byte[] value) {
return IcebergLongType.deserializeLong(value);
}

@Override
public int hashCode() {
return type().hashCode() ^ (adjustToUTC ? 1 : 0);
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof IcebergTimestampNanosType)) {
return false;
}
if (obj == this) {
return true;
}
IcebergTimestampNanosType o = (IcebergTimestampNanosType) obj;
return o.adjustToUTC == adjustToUTC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

/** Iceberg timestamp, microsecond precision. */
public final class IcebergTimestampType extends IcebergPrimitiveType {
private static final Schema TIMESTAMP_SCHEMA =
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface IcebergType {
String TYPE_BINARY = "binary";
String TYPE_TIMESTAMP = "timestamp";
String TYPE_TIMESTAMP_TZ = "timestamptz";
String TYPE_TIMESTAMP_NS = "timestamp_ns";
String TYPE_TIMESTAMP_NS_TZ = "timestamptz_ns";
String TYPE_FIXED = "fixed";
String TYPE_DECIMAL = "decimal";
String TYPE_STRUCT = "struct";
Expand Down Expand Up @@ -89,14 +91,22 @@ static IcebergBinaryType binaryType() {
return IcebergTypes.BINARY;
}

static IcebergTimestampType timestamptzType() {
static IcebergTimestampType timestampTzType() {
return IcebergTypes.TIMESTAMPTZ;
}

static IcebergTimestampType timestampType() {
return IcebergTypes.TIMESTAMP;
}

static IcebergTimestampNanosType timestampNanosTzType() {
return IcebergTypes.TIMESTAMPTZ_NS;
}

static IcebergTimestampNanosType timestampNanosType() {
return IcebergTypes.TIMESTAMP_NS;
}

static IcebergFixedType fixedType(int length) {
return ImmutableIcebergFixedType.of(length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,41 @@ final class IcebergTypes {
static final IcebergBinaryType BINARY = new IcebergBinaryType();
static final IcebergTimestampType TIMESTAMPTZ = new IcebergTimestampType(true);
static final IcebergTimestampType TIMESTAMP = new IcebergTimestampType(false);
static final IcebergTimestampNanosType TIMESTAMPTZ_NS = new IcebergTimestampNanosType(true);
static final IcebergTimestampNanosType TIMESTAMP_NS = new IcebergTimestampNanosType(false);

private IcebergTypes() {}

static IcebergPrimitiveType primitiveFromString(String primitiveType) {
switch (primitiveType) {
case IcebergBooleanType.TYPE_BOOLEAN:
case IcebergType.TYPE_BOOLEAN:
return IcebergType.booleanType();
case IcebergUuidType.TYPE_UUID:
case IcebergType.TYPE_UUID:
return IcebergType.uuidType();
case IcebergIntegerType.TYPE_INT:
case IcebergType.TYPE_INT:
return IcebergType.integerType();
case IcebergLongType.TYPE_LONG:
case IcebergType.TYPE_LONG:
return IcebergType.longType();
case IcebergFloatType.TYPE_FLOAT:
case IcebergType.TYPE_FLOAT:
return IcebergType.floatType();
case IcebergDoubleType.TYPE_DOUBLE:
case IcebergType.TYPE_DOUBLE:
return IcebergType.doubleType();
case IcebergDateType.TYPE_DATE:
case IcebergType.TYPE_DATE:
return IcebergType.dateType();
case IcebergTimeType.TYPE_TIME:
case IcebergType.TYPE_TIME:
return IcebergType.timeType();
case IcebergStringType.TYPE_STRING:
case IcebergType.TYPE_STRING:
return IcebergType.stringType();
case IcebergBinaryType.TYPE_BINARY:
case IcebergType.TYPE_BINARY:
return IcebergType.binaryType();
case IcebergTimestampType.TYPE_TIMESTAMP_TZ:
return IcebergType.timestamptzType();
case IcebergTimestampType.TYPE_TIMESTAMP:
case IcebergType.TYPE_TIMESTAMP_TZ:
return IcebergType.timestampTzType();
case IcebergType.TYPE_TIMESTAMP:
return IcebergType.timestampType();
case IcebergType.TYPE_TIMESTAMP_NS_TZ:
return IcebergType.timestampNanosTzType();
case IcebergType.TYPE_TIMESTAMP_NS:
return IcebergType.timestampNanosType();
default:
Matcher m = DECIMAL_PATTERN.matcher(primitiveType);
if (m.matches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.mapType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.model.id.NessieIdHasher.nessieIdHasher;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -367,7 +367,7 @@ static Stream<Arguments> icebergNested() {
nestedField(102, "topic", false, stringType(), null),
nestedField(103, "partition", false, integerType(), null),
nestedField(104, "offset", false, longType(), null),
nestedField(105, "timestamp", false, timestamptzType(), null),
nestedField(105, "timestamp", false, timestampTzType(), null),
nestedField(106, "timestampType", false, integerType(), null),
nestedField(
107,
Expand Down Expand Up @@ -433,7 +433,7 @@ static Stream<Arguments> icebergNested() {
nestedField(3, "topic", false, stringType(), null),
nestedField(4, "partition", false, integerType(), null),
nestedField(5, "offset", false, longType(), null),
nestedField(6, "timestamp", false, timestamptzType(), null),
nestedField(6, "timestamp", false, timestampTzType(), null),
nestedField(7, "timestampType", false, integerType(), null),
nestedField(
8,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.stringType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.structType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timeType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampNanosTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestamptzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.timestampTzType;
import static org.projectnessie.catalog.formats.iceberg.types.IcebergType.uuidType;

import java.util.stream.Stream;
Expand Down Expand Up @@ -75,7 +77,9 @@ static Stream<Arguments> types() {
arguments(dateType(), "\"date\""),
arguments(timeType(), "\"time\""),
arguments(timestampType(), "\"timestamp\""),
arguments(timestamptzType(), "\"timestamptz\""),
arguments(timestampTzType(), "\"timestamptz\""),
arguments(timestampNanosType(), "\"timestamp_ns\""),
arguments(timestampNanosTzType(), "\"timestamptz_ns\""),
arguments(uuidType(), "\"uuid\""),
arguments(fixedType(42), "\"fixed[42]\""),
arguments(decimalType(33, 11), "\"decimal(33, 11)\""),
Expand Down Expand Up @@ -153,6 +157,8 @@ static Stream<Arguments> icebergTypes() {
arguments(Types.DecimalType.of(10, 3), decimalType(10, 3)),
arguments(Types.FixedType.ofLength(42), fixedType(42)),
arguments(Types.TimestampType.withoutZone(), timestampType()),
arguments(Types.TimestampType.withZone(), timestamptzType()));
arguments(Types.TimestampType.withZone(), timestampTzType()),
arguments(Types.TimestampNanoType.withoutZone(), timestampNanosType()),
arguments(Types.TimestampNanoType.withZone(), timestampNanosTzType()));
}
}
Loading

0 comments on commit ab821ba

Please sign in to comment.