Skip to content

Commit

Permalink
Address more review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
parthchandra committed Jan 3, 2024
1 parent c6ee55c commit 3a9050b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,15 @@ public DataPage visit(DataPageV2 dataPageV2) {

private void setDecompressMetrics(BytesInput bytes, long start) {
if (metricsCallback != null) {
long time = System.nanoTime() - start;
long time = Math.max(System.nanoTime() - start, 0);
long len = bytes.size();
double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024);
LOG.debug(
"Decompress block: Length: {} MB, Time: {} msecs, throughput: {} MB/s",
len / (1024 * 1024),
time / 1000_000L,
throughput);
metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressTime.name(), time);
metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time);
metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len);
metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
private void setReadMetrics(long startNs) {
ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
if (metricsCallback != null) {
long totalFileReadTimeNs = System.nanoTime() - startNs;
long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
double sizeInMb = ((double) length) / (1024 * 1024);
double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
double throughput = sizeInMb / timeInSec;
Expand All @@ -2021,7 +2021,7 @@ private void setReadMetrics(long startNs) {
sizeInMb,
timeInSec,
throughput);
metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@
*/
package org.apache.parquet.hadoop;

import org.apache.hadoop.classification.InterfaceStability;

/**
* a simple interface to pass basic metric values by name to any implementation. Typically an
* A simple interface to pass basic metric values by name to any implementation. Typically, an
* implementation of this interface will serve as a bridge to pass metric values on
* to the metrics system of a distributed engine (hadoop, spark, etc).
* <br/>
* Development Note: This interface should provide a default implementation for any new metric tracker
* added to allow for backward compatibility
* <br/>
* e.g.
* <br/>
* <code>default addMaximum(key, value) { } ; </code>
*/
@InterfaceStability.Unstable
public interface ParquetMetricsCallback {
void setValueInt(String name, int value);

Expand All @@ -31,4 +41,6 @@ public interface ParquetMetricsCallback {
void setValueFloat(String name, float value);

void setValueDouble(String name, double value);

void setDuration(String name, long value);
}

0 comments on commit 3a9050b

Please sign in to comment.