From 3a9050bf1a827f2f09d520876621e1047c75c398 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Tue, 2 Jan 2024 16:23:56 -0800 Subject: [PATCH] Address more review comments --- .../parquet/hadoop/ColumnChunkPageReadStore.java | 4 ++-- .../apache/parquet/hadoop/ParquetFileReader.java | 4 ++-- .../parquet/hadoop/ParquetMetricsCallback.java | 14 +++++++++++++- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index 501a1090dc..929b1519b1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -306,7 +306,7 @@ public DataPage visit(DataPageV2 dataPageV2) { private void setDecompressMetrics(BytesInput bytes, long start) { if (metricsCallback != null) { - long time = System.nanoTime() - start; + long time = Math.max(System.nanoTime() - start, 0); long len = bytes.size(); double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024); LOG.debug( @@ -314,7 +314,7 @@ private void setDecompressMetrics(BytesInput bytes, long start) { len / (1024 * 1024), time / 1000_000L, throughput); - metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressTime.name(), time); + metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time); metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len); metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index c0109e5977..e1420da35f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -2012,7 +2012,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx private void setReadMetrics(long startNs) { ParquetMetricsCallback metricsCallback = options.getMetricsCallback(); if (metricsCallback != null) { - long totalFileReadTimeNs = System.nanoTime() - startNs; + long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0); double sizeInMb = ((double) length) / (1024 * 1024); double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L; double throughput = sizeInMb / timeInSec; @@ -2021,7 +2021,7 @@ private void setReadMetrics(long startNs) { sizeInMb, timeInSec, throughput); - metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs); + metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs); metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length); metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java index 7ef0dd1265..a0dccdc3b8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java @@ -18,11 +18,21 @@ */ package org.apache.parquet.hadoop; +import org.apache.hadoop.classification.InterfaceStability; + /** - * a simple interface to pass basic metric values by name to any implementation. Typically an + * A simple interface to pass basic metric values by name to any implementation. Typically, an * implementation of this interface will serve as a bridge to pass metric values on * to the metrics system of a distributed engine (hadoop, spark, etc). + *
+ * Development Note: This interface should provide a default implementation for any new metric tracker + * that is added, so that existing implementations remain backward compatible.
+ * e.g. + *
+ * default addMaximum(key, value) { } ; */ +@InterfaceStability.Unstable public interface ParquetMetricsCallback { void setValueInt(String name, int value); @@ -31,4 +41,6 @@ public interface ParquetMetricsCallback { void setValueFloat(String name, float value); void setValueDouble(String name, double value); + + void setDuration(String name, long value); }