From b6e88ff82c6d651c1ec45420a8df868e7dcfd0c5 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 3 Feb 2025 17:13:26 +0100 Subject: [PATCH] Spark writers --- .palantir/revapi.yml | 25 ++ .../iceberg/data/BaseFileWriterFactory.java | 210 ++++++++++++---- .../data/GenericFileWriterFactory.java | 22 +- .../data/RegistryBasedFileWriterFactory.java | 220 +++++++++++++++++ .../flink/sink/FlinkAppenderFactory.java | 2 +- .../flink/sink/FlinkFileWriterFactory.java | 93 +++---- gradle.properties | 2 +- .../main/java/org/apache/iceberg/orc/ORC.java | 6 + .../org/apache/iceberg/parquet/Parquet.java | 6 + .../spark/source/SparkFileWriterFactory.java | 229 +++++++++++++----- .../org.apache.iceberg.DataFileWriterService | 18 ++ .../org.apache.iceberg.DataFileWriterService | 18 ++ 12 files changed, 665 insertions(+), 186 deletions(-) create mode 100644 data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java create mode 100644 spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileWriterService create mode 100644 spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileWriterService diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 7a44456823f6..26b0e47b7038 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1180,6 +1180,31 @@ acceptedBreaks: old: "class org.apache.iceberg.data.InternalRecordWrapper" justification: "Moved from iceberg-data to iceberg-core, so this should not\ \ cause issues" + org.apache.iceberg:iceberg-orc: + - code: "java.method.exception.checkedAdded" + old: "method org.apache.iceberg.deletes.EqualityDeleteWriter org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildEqualityWriter()" + new: "method org.apache.iceberg.deletes.EqualityDeleteWriter org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildEqualityWriter()\ + \ throws java.io.IOException" + justification: "Binary compatible and equivalent" + - code: "java.method.exception.checkedAdded" + old: "method org.apache.iceberg.deletes.PositionDeleteWriter org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildPositionWriter()" + new: "method org.apache.iceberg.deletes.PositionDeleteWriter org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildPositionWriter()\ + \ throws java.io.IOException" + justification: "Binary compatible and equivalent" + - code: "java.method.exception.checkedAdded" + old: "method org.apache.iceberg.io.DataWriter org.apache.iceberg.orc.ORC.DataWriteBuilder::build()" + new: "method org.apache.iceberg.io.DataWriter org.apache.iceberg.io.FileFormatDataWriterBuilderBase>>::build() throws\ + \ java.io.IOException @ org.apache.iceberg.orc.ORC.DataWriteBuilder" + justification: "Binary compatible and equivalent" + - code: "java.method.movedToSuperClass" + old: "method org.apache.iceberg.io.DataWriter org.apache.iceberg.orc.ORC.DataWriteBuilder::build()" + new: "method org.apache.iceberg.io.DataWriter org.apache.iceberg.io.FileFormatDataWriterBuilderBase>>::build() throws\ + \ java.io.IOException @ org.apache.iceberg.orc.ORC.DataWriteBuilder" + justification: "Binary compatible and equivalent" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index cebc5c7952a5..0cf7b46034b4 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ 
b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; -import org.apache.iceberg.DataFileWriterServiceRegistry; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; @@ -39,11 +38,15 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; -/** A base writer factory to be extended by query engine integrations. */ +/** + * A base writer factory to be extended by query engine integrations. + * + * @deprecated use {@link RegistryBasedFileWriterFactory} + */ +@Deprecated public abstract class BaseFileWriterFactory implements FileWriterFactory { private final Table table; private final FileFormat dataFileFormat; - private final Class inputType; private final Schema dataSchema; private final SortOrder dataSortOrder; private final FileFormat deleteFileFormat; @@ -55,7 +58,6 @@ public abstract class BaseFileWriterFactory implements FileWriterFactory { protected BaseFileWriterFactory( Table table, FileFormat dataFileFormat, - Class inputType, Schema dataSchema, SortOrder dataSortOrder, FileFormat deleteFileFormat, @@ -65,7 +67,6 @@ protected BaseFileWriterFactory( Schema positionDeleteRowSchema) { this.table = table; this.dataFileFormat = dataFileFormat; - this.inputType = inputType; this.dataSchema = dataSchema; this.dataSortOrder = dataSortOrder; this.deleteFileFormat = deleteFileFormat; @@ -93,12 +94,6 @@ protected BaseFileWriterFactory( protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder); - protected abstract S rowSchemaType(); - - protected abstract S equalityDeleteRowSchemaType(); - - protected abstract S positionDeleteRowSchemaType(); - @Override public DataWriter newDataWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { @@ -107,17 +102,59 @@ public DataWriter newDataWriter( MetricsConfig metricsConfig = MetricsConfig.forTable(table); try { - return DataFileWriterServiceRegistry.dataWriterBuilder( - dataFileFormat, inputType, file, rowSchemaType()) - .schema(dataSchema) - .setAll(properties) - .metricsConfig(metricsConfig) - .withSpec(spec) - .withPartition(partition) - .withKeyMetadata(keyMetadata) - .withSortOrder(dataSortOrder) - .overwrite() - .build(); + switch (dataFileFormat) { + case AVRO: + Avro.DataWriteBuilder avroBuilder = + Avro.writeData(file) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(avroBuilder); + + return avroBuilder.build(); + + case PARQUET: + Parquet.DataWriteBuilder parquetBuilder = + Parquet.writeData(file) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(parquetBuilder); + + return parquetBuilder.build(); + + case ORC: + ORC.DataWriteBuilder orcBuilder = + ORC.writeData(file) + .schema(dataSchema) + .setAll(properties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite(); + + configureDataWrite(orcBuilder); + + return orcBuilder.build(); + + default: + throw new UnsupportedOperationException( + "Unsupported data file format: " + dataFileFormat); + } } 
catch (IOException e) { throw new UncheckedIOException(e); } @@ -131,18 +168,62 @@ public EqualityDeleteWriter newEqualityDeleteWriter( MetricsConfig metricsConfig = MetricsConfig.forTable(table); try { - return DataFileWriterServiceRegistry.equalityDeleteWriterBuilder( - deleteFileFormat, inputType, file, equalityDeleteRowSchemaType()) - .setAll(properties) - .metricsConfig(metricsConfig) - .schema(equalityDeleteRowSchema) - .equalityFieldIds(equalityFieldIds) - .withSpec(spec) - .withPartition(partition) - .withKeyMetadata(keyMetadata) - .withSortOrder(equalityDeleteSortOrder) - .overwrite() - .buildEqualityWriter(); + switch (deleteFileFormat) { + case AVRO: + Avro.DeleteWriteBuilder avroBuilder = + Avro.writeDeletes(file) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite(); + + configureEqualityDelete(avroBuilder); + + return avroBuilder.buildEqualityWriter(); + + case PARQUET: + Parquet.DeleteWriteBuilder parquetBuilder = + Parquet.writeDeletes(file) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite(); + + configureEqualityDelete(parquetBuilder); + + return parquetBuilder.buildEqualityWriter(); + + case ORC: + ORC.DeleteWriteBuilder orcBuilder = + ORC.writeDeletes(file) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite(); + + configureEqualityDelete(orcBuilder); + + return orcBuilder.buildEqualityWriter(); + + default: + throw new UnsupportedOperationException( + "Unsupported format for equality deletes: " + deleteFileFormat); + } } catch (IOException e) { throw new UncheckedIOException("Failed to create new equality delete writer", e); } @@ -156,16 +237,57 @@ public PositionDeleteWriter newPositionDeleteWriter( MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table); try { - return DataFileWriterServiceRegistry.positionDeleteWriterBuilder( - deleteFileFormat, inputType, file, positionDeleteRowSchemaType()) - .setAll(properties) - .metricsConfig(metricsConfig) - .schema(positionDeleteRowSchema) - .withSpec(spec) - .withPartition(partition) - .withKeyMetadata(keyMetadata) - .overwrite() - .buildPositionWriter(); + switch (deleteFileFormat) { + case AVRO: + Avro.DeleteWriteBuilder avroBuilder = + Avro.writeDeletes(file) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .overwrite(); + + configurePositionDelete(avroBuilder); + + return avroBuilder.buildPositionWriter(); + + case PARQUET: + Parquet.DeleteWriteBuilder parquetBuilder = + Parquet.writeDeletes(file) + .setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .overwrite(); + + configurePositionDelete(parquetBuilder); + + return parquetBuilder.buildPositionWriter(); + + case ORC: + ORC.DeleteWriteBuilder orcBuilder = + ORC.writeDeletes(file) + 
.setAll(properties) + .metricsConfig(metricsConfig) + .rowSchema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .overwrite(); + + configurePositionDelete(orcBuilder); + + return orcBuilder.buildPositionWriter(); + + default: + throw new UnsupportedOperationException( + "Unsupported format for position deletes: " + deleteFileFormat); + } + } catch (IOException e) { throw new UncheckedIOException("Failed to create new position delete writer", e); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java index f8a563c25d89..91f059634b13 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java @@ -34,8 +34,9 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -class GenericFileWriterFactory extends BaseFileWriterFactory { +class GenericFileWriterFactory extends RegistryBasedFileWriterFactory { GenericFileWriterFactory( Table table, @@ -57,6 +58,10 @@ class GenericFileWriterFactory extends BaseFileWriterFactory { equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, + positionDeleteRowSchema, + ImmutableMap.of(), + dataSchema, + equalityDeleteRowSchema, positionDeleteRowSchema); } @@ -109,21 +114,6 @@ protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(GenericOrcWriter::buildWriter); } - @Override - protected Schema rowSchemaType() { - return dataSchema(); - } - - @Override - protected Schema equalityDeleteRowSchemaType() { - return equalityDeleteRowSchema(); - } - - @Override - protected Schema positionDeleteRowSchemaType() { - return positionDeleteRowSchema(); - } - static class Builder { private final Table table; private FileFormat dataFileFormat; diff --git a/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java new file mode 100644 index 000000000000..19251b60f0e7 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/RegistryBasedFileWriterFactory.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.iceberg.DataFileWriterServiceRegistry; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileWriterFactory; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +/** A base writer factory to be extended by query engine integrations. */ +public abstract class RegistryBasedFileWriterFactory implements FileWriterFactory { + private final Table table; + private final FileFormat dataFileFormat; + private final Class inputType; + private final Schema dataSchema; + private final SortOrder dataSortOrder; + private final FileFormat deleteFileFormat; + private final int[] equalityFieldIds; + private final Schema equalityDeleteRowSchema; + private final SortOrder equalityDeleteSortOrder; + private final Schema positionDeleteRowSchema; + private final Map writeProperties; + private final S rowSchemaType; + private final S equalityDeleteSchemaType; + private final S positionalDeleteSchemaType; + + protected RegistryBasedFileWriterFactory( + Table table, + FileFormat dataFileFormat, + Class inputType, + Schema dataSchema, + SortOrder dataSortOrder, + FileFormat deleteFileFormat, + int[] equalityFieldIds, + Schema equalityDeleteRowSchema, + SortOrder equalityDeleteSortOrder, + Schema positionDeleteRowSchema, + Map writeProperties, + S rowSchemaType, + S equalityDeleteSchemaType, + S positionalDeleteSchemaType) { + this.table = table; + this.dataFileFormat = dataFileFormat; + this.inputType = inputType; + this.dataSchema = dataSchema; + this.dataSortOrder = dataSortOrder; + this.deleteFileFormat = deleteFileFormat; + this.equalityFieldIds = equalityFieldIds; + this.equalityDeleteRowSchema = equalityDeleteRowSchema; + this.equalityDeleteSortOrder = equalityDeleteSortOrder; + this.positionDeleteRowSchema = positionDeleteRowSchema; + this.writeProperties = writeProperties != null ? 
writeProperties : ImmutableMap.of(); + this.rowSchemaType = rowSchemaType; + this.equalityDeleteSchemaType = equalityDeleteSchemaType; + this.positionalDeleteSchemaType = positionalDeleteSchemaType; + } + + @Deprecated + protected abstract void configureDataWrite(Avro.DataWriteBuilder builder); + + @Deprecated + protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder); + + @Deprecated + protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder); + + @Deprecated + protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder); + + @Deprecated + protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder); + + @Deprecated + protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder); + + @Deprecated + protected abstract void configureDataWrite(ORC.DataWriteBuilder builder); + + @Deprecated + protected abstract void configureEqualityDelete(ORC.DeleteWriteBuilder builder); + + @Deprecated + protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder); + + protected S rowSchemaType() { + return rowSchemaType; + } + + protected S equalityDeleteRowSchemaType() { + return equalityDeleteSchemaType; + } + + protected S positionDeleteRowSchemaType() { + return positionalDeleteSchemaType; + } + + @Override + public DataWriter newDataWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + + try { + return DataFileWriterServiceRegistry.dataWriterBuilder( + dataFileFormat, inputType, file, rowSchemaType()) + .schema(dataSchema) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(dataSortOrder) + .overwrite() + .build(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public EqualityDeleteWriter newEqualityDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + + try { + return DataFileWriterServiceRegistry.equalityDeleteWriterBuilder( + deleteFileFormat, inputType, file, equalityDeleteRowSchemaType()) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .schema(equalityDeleteRowSchema) + .equalityFieldIds(equalityFieldIds) + .withSpec(spec) + .withPartition(partition) + .withKeyMetadata(keyMetadata) + .withSortOrder(equalityDeleteSortOrder) + .overwrite() + .buildEqualityWriter(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new equality delete writer", e); + } + } + + @Override + public PositionDeleteWriter newPositionDeleteWriter( + EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { + EncryptionKeyMetadata keyMetadata = file.keyMetadata(); + Map properties = table.properties(); + MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table); + + try { + return DataFileWriterServiceRegistry.positionDeleteWriterBuilder( + deleteFileFormat, inputType, file, positionDeleteRowSchemaType()) + .setAll(properties) + .setAll(writeProperties) + .metricsConfig(metricsConfig) + .schema(positionDeleteRowSchema) + .withSpec(spec) + .withPartition(partition) + 
.withKeyMetadata(keyMetadata) + .overwrite() + .buildPositionWriter(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to create new position delete writer", e); + } + } + + protected Schema dataSchema() { + return dataSchema; + } + + protected Schema equalityDeleteRowSchema() { + return equalityDeleteRowSchema; + } + + protected Schema positionDeleteRowSchema() { + return positionDeleteRowSchema; + } + + protected Map writeProperties() { + return writeProperties; + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index d753ec50e9b8..077b93b4c687 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -209,7 +209,7 @@ public FileFormatEqualityDeleteWriterBuilder equalityDeleteWriterBuilder( @Override public FileFormatPositionDeleteWriterBuilder positionDeleteWriterBuilder( EncryptedOutputFile outputFile, RowType rowType) { - int rowFieldIndex = rowType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME); + int rowFieldIndex = rowType != null ? rowType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME) : -1; return Avro.writeDeletes(outputFile.encryptingOutputFile()) .createWriterFunc( ignore -> diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java index 577bf13da178..2d07d9b0f5a8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java @@ -33,7 +33,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -42,12 +42,10 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -class FlinkFileWriterFactory extends BaseFileWriterFactory implements Serializable { - private RowType dataFlinkType; - private RowType equalityDeleteFlinkType; - private RowType positionDeleteFlinkType; - +class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory + implements Serializable { FlinkFileWriterFactory( Table table, FileFormat dataFileFormat, @@ -72,11 +70,19 @@ class FlinkFileWriterFactory extends BaseFileWriterFactory implements S equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); - - this.dataFlinkType = dataFlinkType; - this.equalityDeleteFlinkType = equalityDeleteFlinkType; - this.positionDeleteFlinkType = positionDeleteFlinkType; + positionDeleteRowSchema, + ImmutableMap.of(), + dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType, + equalityDeleteFlinkType == null + ? equalityDeleteRowSchema == null + ? null + : FlinkSchemaUtil.convert(equalityDeleteRowSchema) + : equalityDeleteFlinkType, + positionDeleteFlinkType == null + ? positionDeleteRowSchema == null + ? 
null + : FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema)) + : positionDeleteFlinkType); } static Builder builderFor(Table table) { @@ -85,107 +91,62 @@ static Builder builderFor(Table table) { @Override protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType())); + builder.createWriterFunc(ignore -> new FlinkAvroWriter(rowSchemaType())); } @Override protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType())); + builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteRowSchemaType())); } @Override protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { - int rowFieldIndex = positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME); + int rowFieldIndex = positionDeleteRowSchemaType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME); if (rowFieldIndex >= 0) { // FlinkAvroWriter accepts just the Flink type of the row ignoring the path and pos RowType positionDeleteRowFlinkType = - (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex); + (RowType) positionDeleteRowSchemaType().getTypeAt(rowFieldIndex); builder.createWriterFunc(ignored -> new FlinkAvroWriter(positionDeleteRowFlinkType)); } } @Override protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType)); + builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(rowSchemaType(), msgType)); } @Override protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( - msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType)); + msgType -> FlinkParquetWriters.buildWriter(equalityDeleteRowSchemaType(), msgType)); } @Override protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( - msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(), msgType)); + msgType -> FlinkParquetWriters.buildWriter(positionDeleteRowSchemaType(), msgType)); builder.transformPaths(path -> StringData.fromString(path.toString())); } @Override protected void configureDataWrite(ORC.DataWriteBuilder builder) { builder.createWriterFunc( - (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema)); + (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(rowSchemaType(), iSchema)); } @Override protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc( - (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema)); + (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteRowSchemaType(), iSchema)); } @Override protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc( - (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema)); + (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(positionDeleteRowSchemaType(), iSchema)); builder.transformPaths(path -> StringData.fromString(path.toString())); } - @Override - protected RowType rowSchemaType() { - return dataFlinkType(); - } - - @Override - protected RowType equalityDeleteRowSchemaType() { - return equalityDeleteFlinkType(); - } - - @Override - protected RowType positionDeleteRowSchemaType() { - return positionDeleteFlinkType(); - } - - private RowType dataFlinkType() { - if (dataFlinkType == 
null) { - Preconditions.checkNotNull(dataSchema(), "Data schema must not be null"); - this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema()); - } - - return dataFlinkType; - } - - private RowType equalityDeleteFlinkType() { - if (equalityDeleteFlinkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteFlinkType; - } - - private RowType positionDeleteFlinkType() { - if (positionDeleteFlinkType == null) { - // wrap the optional row schema into the position delete schema that contains path and - // position - Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); - this.positionDeleteFlinkType = FlinkSchemaUtil.convert(positionDeleteSchema); - } - - return positionDeleteFlinkType; - } - static class Builder { private final Table table; private FileFormat dataFileFormat; diff --git a/gradle.properties b/gradle.properties index f6371942e83b..5bf92ef9a379 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,7 +16,7 @@ jmhOutputPath=build/reports/jmh/human-readable-output.txt jmhJsonOutputPath=build/reports/jmh/results.json jmhIncludeRegex=.* -systemProp.defaultFlinkVersions=1.20 +systemProp.defaultFlinkVersions=1.19,1.20 systemProp.knownFlinkVersions=1.18,1.19,1.20 systemProp.defaultSparkVersions=3.5 systemProp.knownSparkVersions=3.3,3.4,3.5 diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 620799cff5f1..2899b436e6ad 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -140,6 +140,12 @@ private WriteBuilder(OutputFile file) { } } + @Deprecated + public WriteBuilder metadata(String property, String value) { + meta(property, value); + return this; + } + public WriteBuilder createWriterFunc( BiFunction> writerFunction) { this.createWriterFunc = writerFunction; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index c170deaf986a..8aa756a4bcbc 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -184,6 +184,12 @@ public WriteBuilder createWriterFunc( return this; } + @Override + public WriteBuilder overwrite(boolean enabled) { + this.writeMode = enabled ? 
ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; + return this; + } + public WriteBuilder writerVersion(WriterVersion version) { this.writerVersion = version; return this; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java index cb2617920c59..1e7e0e93f10c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java @@ -24,17 +24,22 @@ import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT; import java.util.Map; +import org.apache.iceberg.DataFileWriterService; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.data.BaseFileWriterFactory; +import org.apache.iceberg.data.RegistryBasedFileWriterFactory; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileFormatAppenderBuilder; +import org.apache.iceberg.io.FileFormatDataWriterBuilder; +import org.apache.iceberg.io.FileFormatEqualityDeleteWriterBuilder; +import org.apache.iceberg.io.FileFormatPositionDeleteWriterBuilder; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroWriter; import org.apache.iceberg.spark.data.SparkOrcWriter; @@ -44,11 +49,7 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; -class SparkFileWriterFactory extends BaseFileWriterFactory { - private StructType dataSparkType; - private StructType equalityDeleteSparkType; - private StructType positionDeleteSparkType; - private final Map writeProperties; +class SparkFileWriterFactory extends RegistryBasedFileWriterFactory { SparkFileWriterFactory( Table table, @@ -75,12 +76,21 @@ class SparkFileWriterFactory extends BaseFileWriterFactory { equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, - positionDeleteRowSchema); - - this.dataSparkType = dataSparkType; - this.equalityDeleteSparkType = equalityDeleteSparkType; - this.positionDeleteSparkType = positionDeleteSparkType; - this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of(); + positionDeleteRowSchema, + writeProperties, + dataSparkType == null + ? dataSchema == null ? null : SparkSchemaUtil.convert(dataSchema) + : dataSparkType, + equalityDeleteSparkType == null + ? equalityDeleteRowSchema == null + ? null + : SparkSchemaUtil.convert(equalityDeleteRowSchema) + : equalityDeleteSparkType, + positionDeleteSparkType == null + ? positionDeleteRowSchema == null + ? 
null + : SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema)) + : positionDeleteSparkType); } static Builder builderFor(Table table) { @@ -89,97 +99,68 @@ static Builder builderFor(Table table) { @Override protected void configureDataWrite(Avro.DataWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType())); - builder.setAll(writeProperties); + builder.createWriterFunc(ignored -> new SparkAvroWriter(rowSchemaType())); + builder.setAll(writeProperties()); } @Override protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) { - builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType())); - builder.setAll(writeProperties); + builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteRowSchemaType())); + builder.setAll(writeProperties()); } @Override protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { boolean withRow = - positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); + positionDeleteRowSchemaType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); if (withRow) { // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos - StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME); + StructField rowField = positionDeleteRowSchemaType().apply(DELETE_FILE_ROW_FIELD_NAME); StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); } - builder.setAll(writeProperties); + builder.setAll(writeProperties()); } @Override protected void configureDataWrite(Parquet.DataWriteBuilder builder) { - builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType)); - builder.setAll(writeProperties); + builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(rowSchemaType(), msgType)); + builder.setAll(writeProperties()); } @Override protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType)); - builder.setAll(writeProperties); + msgType -> SparkParquetWriters.buildWriter(equalityDeleteRowSchemaType(), msgType)); + builder.setAll(writeProperties()); } @Override protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) { builder.createWriterFunc( - msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType)); + msgType -> SparkParquetWriters.buildWriter(positionDeleteRowSchemaType(), msgType)); builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); + builder.setAll(writeProperties()); } @Override protected void configureDataWrite(ORC.DataWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); + builder.setAll(writeProperties()); } @Override protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); - builder.setAll(writeProperties); + builder.setAll(writeProperties()); } @Override protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) { builder.createWriterFunc(SparkOrcWriter::new); builder.transformPaths(path -> UTF8String.fromString(path.toString())); - builder.setAll(writeProperties); - } - - private StructType dataSparkType() { - if (dataSparkType == null) { - Preconditions.checkNotNull(dataSchema(), "Data 
schema must not be null"); - this.dataSparkType = SparkSchemaUtil.convert(dataSchema()); - } - - return dataSparkType; - } - - private StructType equalityDeleteSparkType() { - if (equalityDeleteSparkType == null) { - Preconditions.checkNotNull( - equalityDeleteRowSchema(), "Equality delete schema must not be null"); - this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema()); - } - - return equalityDeleteSparkType; - } - - private StructType positionDeleteSparkType() { - if (positionDeleteSparkType == null) { - // wrap the optional row schema into the position delete schema containing path and position - Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema()); - this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema); - } - - return positionDeleteSparkType; + builder.setAll(writeProperties()); } static class Builder { @@ -294,4 +275,136 @@ SparkFileWriterFactory build() { writeProperties); } } + + public static class AvroWriterService implements DataFileWriterService { + @Override + public FileFormat format() { + return FileFormat.AVRO; + } + + @Override + public Class returnType() { + return InternalRow.class; + } + + @Override + public FileFormatAppenderBuilder appenderBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Avro.write(outputFile).createWriterFunc(ignore -> new SparkAvroWriter(rowType)); + } + + @Override + public FileFormatDataWriterBuilder dataWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Avro.writeData(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new SparkAvroWriter(rowType)); + } + + @Override + public FileFormatEqualityDeleteWriterBuilder equalityDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Avro.writeDeletes(outputFile.encryptingOutputFile()) + .createWriterFunc(ignore -> new SparkAvroWriter(rowType)); + } + + @Override + public FileFormatPositionDeleteWriterBuilder positionDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + Avro.DeleteWriteBuilder builder = Avro.writeDeletes(outputFile.encryptingOutputFile()); + boolean withRow = + rowType != null && rowType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined(); + if (withRow) { + // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos + StructField rowField = rowType.apply(DELETE_FILE_ROW_FIELD_NAME); + StructType positionDeleteRowSparkType = (StructType) rowField.dataType(); + builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType)); + } else { + builder.createWriterFunc(ignore -> new SparkAvroWriter(rowType)); + } + + return builder; + } + } + + public static class ORCWriterService implements DataFileWriterService { + @Override + public FileFormat format() { + return FileFormat.ORC; + } + + @Override + public Class returnType() { + return InternalRow.class; + } + + @Override + public FileFormatAppenderBuilder appenderBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return ORC.write(outputFile).createWriterFunc(SparkOrcWriter::new); + } + + @Override + public FileFormatDataWriterBuilder dataWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return ORC.writeData(outputFile.encryptingOutputFile()).createWriterFunc(SparkOrcWriter::new); + } + + @Override + public FileFormatEqualityDeleteWriterBuilder equalityDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return 
ORC.writeDeletes(outputFile.encryptingOutputFile()) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .createWriterFunc(SparkOrcWriter::new); + } + + @Override + public FileFormatPositionDeleteWriterBuilder positionDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return ORC.writeDeletes(outputFile.encryptingOutputFile()) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .createWriterFunc(SparkOrcWriter::new); + } + } + + public static class ParquetWriterService implements DataFileWriterService { + @Override + public FileFormat format() { + return FileFormat.PARQUET; + } + + @Override + public Class returnType() { + return InternalRow.class; + } + + @Override + public FileFormatAppenderBuilder appenderBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Parquet.write(outputFile) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(rowType, msgType)); + } + + @Override + public FileFormatDataWriterBuilder dataWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Parquet.writeData(outputFile.encryptingOutputFile()) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(rowType, msgType)); + } + + @Override + public FileFormatEqualityDeleteWriterBuilder equalityDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(rowType, msgType)); + } + + @Override + public FileFormatPositionDeleteWriterBuilder positionDeleteWriterBuilder( + EncryptedOutputFile outputFile, StructType rowType) { + return Parquet.writeDeletes(outputFile.encryptingOutputFile()) + .transformPaths(path -> UTF8String.fromString(path.toString())) + .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(rowType, msgType)); + } + } } diff --git a/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileWriterService b/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileWriterService new file mode 100644 index 000000000000..94a11d5eb12a --- /dev/null +++ b/spark/v3.5/spark/src/main/resources/META-INF/services/org.apache.iceberg.DataFileWriterService @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+org.apache.iceberg.spark.source.SparkFileWriterFactory$ParquetWriterService
+org.apache.iceberg.spark.source.SparkFileWriterFactory$AvroWriterService
+org.apache.iceberg.spark.source.SparkFileWriterFactory$ORCWriterService
\ No newline at end of file
diff --git a/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileWriterService b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileWriterService
new file mode 100644
index 000000000000..94a11d5eb12a
--- /dev/null
+++ b/spark/v3.5/spark/src/test/resources/META-INF/services/org.apache.iceberg.DataFileWriterService
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.iceberg.spark.source.SparkFileWriterFactory$ParquetWriterService
+org.apache.iceberg.spark.source.SparkFileWriterFactory$AvroWriterService
+org.apache.iceberg.spark.source.SparkFileWriterFactory$ORCWriterService
\ No newline at end of file
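
Usage note: the two service files above are what wire these builders into DataFileWriterServiceRegistry. The registry discovers every DataFileWriterService on the classpath through java.util.ServiceLoader and, as the lookups in RegistryBasedFileWriterFactory suggest, keys each service by its format() and returnType(). A minimal sketch of the consumer side, using only the registry calls introduced in this patch (the outputFile, sparkType, dataSchema, spec, and partition variables are illustrative placeholders, not part of the change):

    // Obtain the Parquet data-writer builder registered for Spark's InternalRow,
    // then configure it the same way RegistryBasedFileWriterFactory#newDataWriter does.
    try {
      DataWriter<InternalRow> writer =
          DataFileWriterServiceRegistry.dataWriterBuilder(
                  FileFormat.PARQUET, // matches ParquetWriterService#format()
                  InternalRow.class,  // matches ParquetWriterService#returnType()
                  outputFile,         // the EncryptedOutputFile for the new data file
                  sparkType)          // the schema type S, here a Spark StructType
              .schema(dataSchema)
              .withSpec(spec)
              .withPartition(partition)
              .overwrite()
              .build(); // build() now declares IOException, per the revapi entries above
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

A third-party engine plugs in the same way: implement DataFileWriterService for its own row class and schema type, and list the implementation in META-INF/services/org.apache.iceberg.DataFileWriterService, exactly as the Spark main and test resources do here.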