Skip to content

Commit

Permalink
set metricsCallback via ParquetReadOptions
Browse files · Browse the repository at this point in the history
  • Loading branch information
parthchandra committed Dec 28, 2023
1 parent 7f25ceb commit c6ee55c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetMetricsCallback;

public class HadoopReadOptions extends ParquetReadOptions {
private final Configuration conf;
Expand All @@ -49,7 +50,8 @@ private HadoopReadOptions(
int maxAllocationSize,
Map<String, String> properties,
Configuration conf,
FileDecryptionProperties fileDecryptionProperties) {
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback) {
super(
useSignedStringMinMax,
useStatsFilter,
Expand All @@ -66,6 +68,7 @@ private HadoopReadOptions(
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
new HadoopParquetConfiguration(conf));
this.conf = conf;
}
Expand Down Expand Up @@ -127,7 +130,8 @@ public ParquetReadOptions build() {
maxAllocationSize,
properties,
conf,
fileDecryptionProperties);
fileDecryptionProperties,
metricsCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetMetricsCallback;
import org.apache.parquet.hadoop.util.HadoopCodecs;

// Internal use only
Expand Down Expand Up @@ -75,6 +76,7 @@ public class ParquetReadOptions {
private final Map<String, String> properties;
private final FileDecryptionProperties fileDecryptionProperties;
private final ParquetConfiguration conf;
private final ParquetMetricsCallback metricsCallback;

ParquetReadOptions(
boolean useSignedStringMinMax,
Expand All @@ -91,7 +93,8 @@ public class ParquetReadOptions {
ByteBufferAllocator allocator,
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties) {
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback) {
this(
useSignedStringMinMax,
useStatsFilter,
Expand All @@ -108,6 +111,7 @@ public class ParquetReadOptions {
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
new HadoopParquetConfiguration());
}

Expand All @@ -127,6 +131,7 @@ public class ParquetReadOptions {
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback,
ParquetConfiguration conf) {
this.useSignedStringMinMax = useSignedStringMinMax;
this.useStatsFilter = useStatsFilter;
Expand All @@ -143,6 +148,7 @@ public class ParquetReadOptions {
this.maxAllocationSize = maxAllocationSize;
this.properties = Collections.unmodifiableMap(properties);
this.fileDecryptionProperties = fileDecryptionProperties;
this.metricsCallback = metricsCallback;
this.conf = conf;
}

Expand Down Expand Up @@ -210,6 +216,10 @@ public FileDecryptionProperties getDecryptionProperties() {
return fileDecryptionProperties;
}

/**
 * Returns the callback used to report file-read metrics, or {@code null} if none was configured.
 *
 * @return the configured {@link ParquetMetricsCallback}, possibly {@code null}
 */
public ParquetMetricsCallback getMetricsCallback() {
  return this.metricsCallback;
}

public boolean isEnabled(String property, boolean defaultValue) {
Optional<String> propValue = Optional.ofNullable(properties.get(property));
return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
Expand Down Expand Up @@ -245,6 +255,7 @@ public static class Builder {
protected Map<String, String> properties = new HashMap<>();
protected FileDecryptionProperties fileDecryptionProperties = null;
protected ParquetConfiguration conf;
protected ParquetMetricsCallback metricsCallback;

public Builder() {
this(new HadoopParquetConfiguration());
Expand Down Expand Up @@ -391,6 +402,11 @@ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties)
return this;
}

/**
 * Sets the {@link ParquetMetricsCallback} that the reader will use to report file-read metrics.
 *
 * @param callback the metrics callback to install (may be {@code null} to disable reporting)
 * @return this builder for method chaining
 */
public Builder withMetricsCallback(ParquetMetricsCallback callback) {
  this.metricsCallback = callback;
  return this;
}

public Builder set(String key, String value) {
properties.put(key, value);
return this;
Expand All @@ -407,6 +423,7 @@ public Builder copy(ParquetReadOptions options) {
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
withDecryption(options.fileDecryptionProperties);
withMetricsCallback(options.metricsCallback);
conf = options.conf;
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
Expand Down Expand Up @@ -439,6 +456,7 @@ public ParquetReadOptions build() {
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
conf);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static final class ColumnChunkPageReader implements PageReader {
private final BlockCipher.Decryptor blockDecryptor;
private final byte[] dataPageAAD;
private final byte[] dictionaryPageAAD;
ParquetMetricsCallback metricsCallback;
private final ParquetMetricsCallback metricsCallback;

ColumnChunkPageReader(
BytesInputDecompressor decompressor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,10 @@ public class ParquetFileReader implements Closeable {

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

private ParquetMetricsCallback metricsCallback;

private final ParquetMetadataConverter converter;

private final CRC32 crc;

/**
 * Sets a callback used to report file-read metrics back to the caller.
 * NOTE(review): elsewhere in this change the reader obtains the callback via
 * {@code options.getMetricsCallback()}; confirm whether this setter is still needed.
 *
 * @param callback the {@link ParquetMetricsCallback} to invoke with metrics info
 */
public void initMetrics(ParquetMetricsCallback callback) {
this.metricsCallback = callback;
}

/**
* for files provided, check if there's a summary file.
* If a summary file is found it is used otherwise the file footer is used.
Expand Down Expand Up @@ -832,6 +823,38 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}

/**
 * Opens a reader for a Parquet file whose footer has already been read,
 * applying the supplied read options (row-group filtering, decryption,
 * page checksum verification).
 *
 * @param conf the Hadoop Configuration
 * @param file Path to a parquet file
 * @param footer a {@link ParquetMetadata} footer already read from the file
 * @param options the {@link ParquetReadOptions} controlling how the file is read
 * @throws IOException if the file can not be opened
 * @deprecated will be removed in 2.0.0.
 */
@Deprecated
public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, ParquetReadOptions options)
    throws IOException {
  this.converter = new ParquetMetadataConverter(conf);
  this.file = HadoopInputFile.fromPath(file, conf);
  this.f = this.file.newStream();
  this.fileMetaData = footer.getFileMetaData();
  this.fileDecryptor = fileMetaData.getFileDecryptor();
  this.options = options;
  this.footer = footer;
  try {
    this.blocks = filterRowGroups(footer.getBlocks());
  } catch (Exception e) {
    // In case that filterRowGroups throws an exception in the constructor, the new stream
    // should be closed. Otherwise, there's no way to close this outside.
    f.close();
    throw e;
  }
  this.blockIndexStores = listWithNulls(this.blocks.size());
  this.blockRowRanges = listWithNulls(this.blocks.size());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  // CRC32 is only allocated when page checksum verification is requested
  this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}

public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = file;
Expand Down Expand Up @@ -1787,7 +1810,7 @@ public ColumnChunkPageReader readAllPages(
rowGroupOrdinal,
columnOrdinal,
options,
metricsCallback);
options.getMetricsCallback());
}

private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
Expand Down Expand Up @@ -1987,6 +2010,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
}

private void setReadMetrics(long startNs) {
ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
if (metricsCallback != null) {
long totalFileReadTimeNs = System.nanoTime() - startNs;
double sizeInMb = ((double) length) / (1024 * 1024);
Expand Down

0 comments on commit c6ee55c

Please sign in to comment.