Spark writers
pvary committed Feb 4, 2025
1 parent e17765f commit af9c30a
Showing 12 changed files with 665 additions and 186 deletions.
25 changes: 25 additions & 0 deletions .palantir/revapi.yml
@@ -1180,6 +1180,31 @@ acceptedBreaks:
old: "class org.apache.iceberg.data.InternalRecordWrapper"
justification: "Moved from iceberg-data to iceberg-core, so this should not\
\ cause issues"
org.apache.iceberg:iceberg-orc:
- code: "java.method.exception.checkedAdded"
old: "method <T> org.apache.iceberg.deletes.EqualityDeleteWriter<T> org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildEqualityWriter()"
new: "method <T> org.apache.iceberg.deletes.EqualityDeleteWriter<T> org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildEqualityWriter()\
\ throws java.io.IOException"
justification: "Binary compatible and equivalent"
- code: "java.method.exception.checkedAdded"
old: "method <T> org.apache.iceberg.deletes.PositionDeleteWriter<T> org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildPositionWriter()"
new: "method <T> org.apache.iceberg.deletes.PositionDeleteWriter<T> org.apache.iceberg.orc.ORC.DeleteWriteBuilder::buildPositionWriter()\
\ throws java.io.IOException"
justification: "Binary compatible and equivalent"
- code: "java.method.exception.checkedAdded"
old: "method <T> org.apache.iceberg.io.DataWriter<T> org.apache.iceberg.orc.ORC.DataWriteBuilder::build()"
new: "method <T> org.apache.iceberg.io.DataWriter<T> org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T\
\ extends org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T extends\
\ org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T>>>::build() throws\
\ java.io.IOException @ org.apache.iceberg.orc.ORC.DataWriteBuilder"
justification: "Binary compatible and equivalent"
- code: "java.method.movedToSuperClass"
old: "method <T> org.apache.iceberg.io.DataWriter<T> org.apache.iceberg.orc.ORC.DataWriteBuilder::build()"
new: "method <T> org.apache.iceberg.io.DataWriter<T> org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T\
\ extends org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T extends\
\ org.apache.iceberg.io.FileFormatDataWriterBuilderBase<T>>>::build() throws\
\ java.io.IOException @ org.apache.iceberg.orc.ORC.DataWriteBuilder"
justification: "Binary compatible and equivalent"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
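The accepted breaks above record that the ORC data and delete write builders now declare throws java.io.IOException on their build methods. As the justifications note, this is binary compatible, but source-level callers must now catch or propagate the checked exception. A minimal caller-side sketch, using only builder calls that appear in this diff (the helper class and method names are illustrative, not part of the commit, and the format-specific writer function that configureDataWrite would normally install is elided):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.orc.ORC;

class OrcBuildExample {
  // build() now declares IOException; wrap it the same way
  // BaseFileWriterFactory does in the diff below.
  static <T> DataWriter<T> newOrcDataWriter(EncryptedOutputFile file, Schema schema) {
    try {
      return ORC.writeData(file)
          .schema(schema)
          .overwrite()
          .build();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}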
210 changes: 166 additions & 44 deletions data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.DataFileWriterServiceRegistry;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
@@ -39,11 +38,15 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

/** A base writer factory to be extended by query engine integrations. */
/**
* A base writer factory to be extended by query engine integrations.
*
* @deprecated use {@link RegistryBasedFileWriterFactory}
*/
@Deprecated
public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
private final Table table;
private final FileFormat dataFileFormat;
private final Class<?> inputType;
private final Schema dataSchema;
private final SortOrder dataSortOrder;
private final FileFormat deleteFileFormat;
@@ -55,7 +58,6 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
protected BaseFileWriterFactory(
Table table,
FileFormat dataFileFormat,
Class<?> inputType,
Schema dataSchema,
SortOrder dataSortOrder,
FileFormat deleteFileFormat,
@@ -65,7 +67,6 @@ protected BaseFileWriterFactory(
Schema positionDeleteRowSchema) {
this.table = table;
this.dataFileFormat = dataFileFormat;
this.inputType = inputType;
this.dataSchema = dataSchema;
this.dataSortOrder = dataSortOrder;
this.deleteFileFormat = deleteFileFormat;
@@ -93,12 +94,6 @@ protected BaseFileWriterFactory(

protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder);

protected abstract <S> S rowSchemaType();

protected abstract <S> S equalityDeleteRowSchemaType();

protected abstract <S> S positionDeleteRowSchemaType();

@Override
public DataWriter<T> newDataWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
@@ -107,17 +102,59 @@ public DataWriter<T> newDataWriter(
MetricsConfig metricsConfig = MetricsConfig.forTable(table);

try {
return DataFileWriterServiceRegistry.dataWriterBuilder(
dataFileFormat, inputType, file, rowSchemaType())
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(dataSortOrder)
.overwrite()
.build();
switch (dataFileFormat) {
case AVRO:
Avro.DataWriteBuilder avroBuilder =
Avro.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(dataSortOrder)
.overwrite();

configureDataWrite(avroBuilder);

return avroBuilder.build();

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
Parquet.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(dataSortOrder)
.overwrite();

configureDataWrite(parquetBuilder);

return parquetBuilder.build();

case ORC:
ORC.DataWriteBuilder orcBuilder =
ORC.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(dataSortOrder)
.overwrite();

configureDataWrite(orcBuilder);

return orcBuilder.build();

default:
throw new UnsupportedOperationException(
"Unsupported data file format: " + dataFileFormat);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -131,18 +168,62 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.forTable(table);

try {
return DataFileWriterServiceRegistry.equalityDeleteWriterBuilder(
deleteFileFormat, inputType, file, equalityDeleteRowSchemaType())
.setAll(properties)
.metricsConfig(metricsConfig)
.schema(equalityDeleteRowSchema)
.equalityFieldIds(equalityFieldIds)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(equalityDeleteSortOrder)
.overwrite()
.buildEqualityWriter();
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
.equalityFieldIds(equalityFieldIds)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(equalityDeleteSortOrder)
.overwrite();

configureEqualityDelete(avroBuilder);

return avroBuilder.buildEqualityWriter();

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
.equalityFieldIds(equalityFieldIds)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(equalityDeleteSortOrder)
.overwrite();

configureEqualityDelete(parquetBuilder);

return parquetBuilder.buildEqualityWriter();

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
.equalityFieldIds(equalityFieldIds)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.withSortOrder(equalityDeleteSortOrder)
.overwrite();

configureEqualityDelete(orcBuilder);

return orcBuilder.buildEqualityWriter();

default:
throw new UnsupportedOperationException(
"Unsupported format for equality deletes: " + deleteFileFormat);
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to create new equality delete writer", e);
}
@@ -156,16 +237,57 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);

try {
return DataFileWriterServiceRegistry.positionDeleteWriterBuilder(
deleteFileFormat, inputType, file, positionDeleteRowSchemaType())
.setAll(properties)
.metricsConfig(metricsConfig)
.schema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.overwrite()
.buildPositionWriter();
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.overwrite();

configurePositionDelete(avroBuilder);

return avroBuilder.buildPositionWriter();

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.overwrite();

configurePositionDelete(parquetBuilder);

return parquetBuilder.buildPositionWriter();

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
.withKeyMetadata(keyMetadata)
.overwrite();

configurePositionDelete(orcBuilder);

return orcBuilder.buildPositionWriter();

default:
throw new UnsupportedOperationException(
"Unsupported format for position deletes: " + deleteFileFormat);
}

} catch (IOException e) {
throw new UncheckedIOException("Failed to create new position delete writer", e);
}
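With the registry lookup removed, the deprecated BaseFileWriterFactory is back to explicit per-format dispatch: each writer method switches on the configured FileFormat, applies the shared builder settings, invokes the matching configure hook for engine-specific writer functions, and builds. Callers see no difference either way, because both this class and RegistryBasedFileWriterFactory sit behind the FileWriterFactory interface. A hedged usage sketch (the helper and its batching loop are illustrative, not from this commit):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;

class WriterUsageExample {
  // Writes a batch of records through whichever format-specific writer
  // the factory selects, then returns the resulting file metadata.
  static DataFile writeAll(
      FileWriterFactory<Record> factory,
      EncryptedOutputFile file,
      PartitionSpec spec,
      StructLike partition,
      List<Record> records)
      throws IOException {
    DataWriter<Record> writer = factory.newDataWriter(file, spec, partition);
    try {
      for (Record record : records) {
        writer.write(record);
      }
    } finally {
      writer.close();
    }
    return writer.toDataFile(); // valid only after close()
  }
}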
data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -34,8 +34,9 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
class GenericFileWriterFactory extends RegistryBasedFileWriterFactory<Record, Schema> {

GenericFileWriterFactory(
Table table,
@@ -57,6 +58,10 @@ class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema,
ImmutableMap.of(),
dataSchema,
equalityDeleteRowSchema,
positionDeleteRowSchema);
}

@@ -109,21 +114,6 @@ protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}

@Override
protected Schema rowSchemaType() {
return dataSchema();
}

@Override
protected Schema equalityDeleteRowSchemaType() {
return equalityDeleteRowSchema();
}

@Override
protected Schema positionDeleteRowSchemaType() {
return positionDeleteRowSchema();
}

static class Builder {
private final Table table;
private FileFormat dataFileFormat;
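GenericFileWriterFactory now extends RegistryBasedFileWriterFactory<Record, Schema>, so the row schema types that the removed rowSchemaType() overrides used to supply are passed straight to the super constructor, along with an empty property map. Underneath, the registry-based factory resolves builders through DataFileWriterServiceRegistry, the same call this commit removes from BaseFileWriterFactory. A hedged sketch of that lookup, reconstructed from the removed call sites (Parquet and the generic Record input type are assumed; the helper is illustrative and the generics are inferred from the removed code):

import java.io.IOException;
import org.apache.iceberg.DataFileWriterServiceRegistry;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;

class RegistryLookupExample {
  // The (file format, input type) pair selects a service-loaded builder;
  // the row schema type (here org.apache.iceberg.Schema) is passed through.
  static DataWriter<Record> newWriter(EncryptedOutputFile file, Schema dataSchema)
      throws IOException {
    return DataFileWriterServiceRegistry.dataWriterBuilder(
            FileFormat.PARQUET, Record.class, file, dataSchema)
        .schema(dataSchema)
        .overwrite()
        .build();
  }
}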