Skip to content

Commit

Permalink
Core, Spark: Calling rewrite_position_delete_files fails on tables wi…
Browse files Browse the repository at this point in the history
…th more than 1k columns (apache#10020)
  • Loading branch information
szehon-ho authored Jun 12, 2024
1 parent bdd6225 commit b6c949c
Show file tree
Hide file tree
Showing 17 changed files with 541 additions and 34 deletions.
6 changes: 6 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,12 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
"1.5.0":
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.types.Types.NestedField"
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "new Constructor added"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
26 changes: 26 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class PartitionSpec implements Serializable {
private transient volatile ListMultimap<Integer, PartitionField> fieldsBySourceId = null;
private transient volatile Class<?>[] lazyJavaClasses = null;
private transient volatile StructType lazyPartitionType = null;
private transient volatile StructType lazyRawPartitionType = null;
private transient volatile List<PartitionField> fieldList = null;
private final int lastAssignedFieldId;

Expand Down Expand Up @@ -140,6 +142,30 @@ public StructType partitionType() {
return lazyPartitionType;
}

/**
* Returns a struct matching partition information as written into manifest files. See {@link
* #partitionType()} for a struct with field ID's potentially re-assigned to avoid conflict.
*/
public StructType rawPartitionType() {
if (schema.idsToOriginal().isEmpty()) {
// not re-assigned.
return partitionType();
}
if (lazyRawPartitionType == null) {
synchronized (this) {
if (lazyRawPartitionType == null) {
this.lazyRawPartitionType =
StructType.of(
partitionType().fields().stream()
.map(f -> f.withFieldId(schema.idsToOriginal().get(f.fieldId())))
.collect(Collectors.toList()));
}
}
}

return lazyRawPartitionType;
}

public Class<?>[] javaClasses() {
if (lazyJavaClasses == null) {
synchronized (this) {
Expand Down
70 changes: 68 additions & 2 deletions api/src/main/java/org/apache/iceberg/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
Expand All @@ -34,6 +35,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -65,6 +67,8 @@ public class Schema implements Serializable {
private transient Map<Integer, Accessor<StructLike>> idToAccessor = null;
private transient Map<Integer, String> idToName = null;
private transient Set<Integer> identifierFieldIdSet = null;
private transient Map<Integer, Integer> idsToReassigned;
private transient Map<Integer, Integer> idsToOriginal;

public Schema(List<NestedField> columns, Map<String, Integer> aliases) {
this(columns, aliases, ImmutableSet.of());
Expand All @@ -83,21 +87,47 @@ public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds) {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds);
}

public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds, TypeUtil.GetID getId) {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds, getId);
}

public Schema(int schemaId, List<NestedField> columns) {
this(schemaId, columns, ImmutableSet.of());
}

public Schema(int schemaId, List<NestedField> columns, Set<Integer> identifierFieldIds) {
this(schemaId, columns, null, identifierFieldIds);
this(schemaId, columns, null, identifierFieldIds, null);
}

public Schema(
int schemaId,
List<NestedField> columns,
Set<Integer> identifierFieldIds,
TypeUtil.GetID getId) {
this(schemaId, columns, null, identifierFieldIds, getId);
}

public Schema(
int schemaId,
List<NestedField> columns,
Map<String, Integer> aliases,
Set<Integer> identifierFieldIds) {
this(schemaId, columns, aliases, identifierFieldIds, null);
}

public Schema(
int schemaId,
List<NestedField> columns,
Map<String, Integer> aliases,
Set<Integer> identifierFieldIds,
TypeUtil.GetID getID) {
this.schemaId = schemaId;
this.struct = StructType.of(columns);

this.idsToOriginal = Maps.newHashMap();
this.idsToReassigned = Maps.newHashMap();
List<NestedField> finalColumns = reassignIds(columns, getID);

this.struct = StructType.of(finalColumns);
this.aliasToId = aliases != null ? ImmutableBiMap.copyOf(aliases) : null;

// validate IdentifierField
Expand Down Expand Up @@ -507,4 +537,40 @@ public String toString() {
.map(this::identifierFieldToString)
.collect(Collectors.toList())));
}

/**
* The ID's of some fields will be re-assigned if GetID is specified for the Schema.
*
* @return map of original to reassigned field ids
*/
public Map<Integer, Integer> idsToReassigned() {
return idsToReassigned != null ? idsToReassigned : Collections.emptyMap();
}

/**
* The ID's of some fields will be re-assigned if GetID is specified for the Schema.
*
* @return map of reassigned to original field ids
*/
public Map<Integer, Integer> idsToOriginal() {
return idsToOriginal != null ? idsToOriginal : Collections.emptyMap();
}

private List<NestedField> reassignIds(List<NestedField> columns, TypeUtil.GetID getID) {
if (getID == null) {
return columns;
}
Type res =
TypeUtil.assignIds(
StructType.of(columns),
oldId -> {
int newId = getID.get(oldId);
if (newId != oldId) {
idsToReassigned.put(oldId, newId);
idsToOriginal.put(newId, oldId);
}
return newId;
});
return res.asStructType().fields();
}
}
99 changes: 99 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/AssignIds.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.types;

import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class AssignIds extends TypeUtil.CustomOrderSchemaVisitor<Type> {
private final TypeUtil.GetID getID;

AssignIds(TypeUtil.GetID getID) {
this.getID = getID;
}

private int idFor(int id) {
return getID.get(id);
}

@Override
public Type schema(Schema schema, Supplier<Type> future) {
return future.get();
}

@Override
public Type struct(Types.StructType struct, Iterable<Type> futures) {
List<Types.NestedField> fields = struct.fields();
int length = struct.fields().size();

// assign IDs for this struct's fields first
List<Integer> newIds = Lists.newArrayListWithExpectedSize(length);
for (Types.NestedField field : fields) {
newIds.add(idFor(field.fieldId()));
}

List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(length);
Iterator<Type> types = futures.iterator();
for (int i = 0; i < length; i += 1) {
Types.NestedField field = fields.get(i);
Type type = types.next();
if (field.isOptional()) {
newFields.add(Types.NestedField.optional(newIds.get(i), field.name(), type, field.doc()));
} else {
newFields.add(Types.NestedField.required(newIds.get(i), field.name(), type, field.doc()));
}
}

return Types.StructType.of(newFields);
}

@Override
public Type field(Types.NestedField field, Supplier<Type> future) {
return future.get();
}

@Override
public Type list(Types.ListType list, Supplier<Type> future) {
int newId = idFor(list.elementId());
if (list.isElementOptional()) {
return Types.ListType.ofOptional(newId, future.get());
} else {
return Types.ListType.ofRequired(newId, future.get());
}
}

@Override
public Type map(Types.MapType map, Supplier<Type> keyFuture, Supplier<Type> valueFuture) {
int newKeyId = idFor(map.keyId());
int newValueId = idFor(map.valueId());
if (map.isValueOptional()) {
return Types.MapType.ofOptional(newKeyId, newValueId, keyFuture.get(), valueFuture.get());
} else {
return Types.MapType.ofRequired(newKeyId, newValueId, keyFuture.get(), valueFuture.get());
}
}

@Override
public Type primitive(Type.PrimitiveType primitive) {
return primitive;
}
}
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,17 @@ public static Schema reassignOrRefreshIds(
return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
}

/**
* Assigns fresh ids from the {@link GetID getId function} for all fields in a type.
*
* @param type a type
* @param getId an id assignment function
* @return an structurally identical type with new ids assigned by the getId function
*/
public static Type assignIds(Type type, GetID getId) {
return TypeUtil.visit(type, new AssignIds(getId));
}

public static Type find(Schema schema, Predicate<Type> predicate) {
return visit(schema, new FindTypeVisitor(predicate));
}
Expand Down Expand Up @@ -521,6 +532,11 @@ public interface NextID {
int get();
}

/** Interface for passing a function that assigns column IDs from the previous Id. */
public interface GetID {
int get(int oldId);
}

public static class SchemaVisitor<T> {
public void beforeField(Types.NestedField field) {}

Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ public NestedField asRequired() {
return new NestedField(false, id, name, type, doc);
}

public NestedField withFieldId(int newId) {
return new NestedField(isOptional, newId, name, type, doc);
}

public int fieldId() {
return id;
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spe
.withSpecId(spec.specId())
.checkConflicts(false);

Map<Integer, Integer> reassignedFields = metadataTableSchema.idsToReassigned();

for (PartitionField field : spec.fields()) {
builder.add(field.fieldId(), field.fieldId(), field.name(), Transforms.identity());
int newFieldId = reassignedFields.getOrDefault(field.fieldId(), field.fieldId());
builder.add(newFieldId, newFieldId, field.name(), Transforms.identity());
}
return builder.build();
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected ManifestReader(
this.spec = readPartitionSpec(file);
}

this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
this.fileSchema = new Schema(DataFile.getType(spec.rawPartitionType()).fields());
}

private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inputFile) {
Expand Down
Loading

0 comments on commit b6c949c

Please sign in to comment.