From 96c8d84e6ad20819b3ed5c70431705337cd5c4ae Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Fri, 26 Apr 2024 17:40:53 -0700
Subject: [PATCH] PARQUET-2171: (followup) add read metrics and hadoop conf
 integration for vectored io reader

---
 .../main/java/org/apache/parquet/ParquetReadOptions.java  |  2 ++
 .../java/org/apache/parquet/hadoop/ParquetFileReader.java | 10 ++++++----
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index e737c799b0..8c05d0224b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -23,6 +23,7 @@
 import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
@@ -285,6 +286,7 @@ public Builder(ParquetConfiguration conf) {
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
       withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
+      withUseHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, HADOOP_VECTORED_IO_ENABLED_DEFAULT));
       String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
       if (badRecordThresh != null) {
         set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8776e85e66..1d8cce3d8c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -2165,7 +2165,7 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
         f.readFully(buffer);
         buffer.flip();
       }
-      setReadMetrics(readStart);
+      setReadMetrics(readStart, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -2175,11 +2175,11 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
       }
     }
 
-    private void setReadMetrics(long startNs) {
+    private void setReadMetrics(long startNs, long len) {
       ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
       if (metricsCallback != null) {
         long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
-        double sizeInMb = ((double) length) / (1024 * 1024);
-        double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
+        double sizeInMb = ((double) len) / (1024 * 1024);
+        double timeInSec = ((double) totalFileReadTimeNs) / 1_000_000_000L; // nanoseconds per second
         double throughput = sizeInMb / timeInSec;
         LOG.debug(
@@ -2203,12 +2203,14 @@ private void setReadMetrics(long startNs) {
     public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
       ByteBuffer buffer;
       final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
+      long readStart = System.nanoTime();
       try {
         LOG.debug(
seconds", currRange, timeoutSeconds); buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS); + setReadMetrics(readStart, currRange.getLength()); // report in a counter the data we just scanned BenchmarkCounter.incrementBytesRead(currRange.getLength()); } catch (TimeoutException e) {