From e6647b7524bf22821609e52a74d1298e49f44de7 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 9 Dec 2021 13:08:56 -0800 Subject: [PATCH] PARQUET-2149: Async IO implementation for ParquetFileReader --- .../parquet/column/page/PageReader.java | 10 +- .../bytes/AsyncMultiBufferInputStream.java | 166 ++++++ .../parquet/bytes/ByteBufferInputStream.java | 7 + .../parquet/bytes/MultiBufferInputStream.java | 9 +- .../bytes/SequenceByteBufferInputStream.java | 268 ++++++++++ .../TestAsyncMultiBufferInputStream.java | 253 +++++++++ .../org/apache/parquet/HadoopReadOptions.java | 13 + .../apache/parquet/ParquetReadOptions.java | 79 ++- .../parquet/crypto/InternalFileDecryptor.java | 5 +- .../hadoop/ColumnChunkPageReadStore.java | 46 +- .../apache/parquet/hadoop/FilePageReader.java | 392 ++++++++++++++ .../parquet/hadoop/ParquetFileReader.java | 498 ++++++++++-------- .../parquet/hadoop/ParquetInputFormat.java | 16 +- .../apache/parquet/hadoop/ParquetReader.java | 27 +- .../TestPropertiesDrivenEncryption.java | 52 +- .../hadoop/TestColumnChunkPageWriteStore.java | 55 +- .../parquet/hadoop/TestMultipleWriteRead.java | 42 +- 17 files changed, 1661 insertions(+), 277 deletions(-) create mode 100644 parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestAsyncMultiBufferInputStream.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java index 0b4321ca78..214385ce31 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java @@ -18,10 +18,13 @@ */ package org.apache.parquet.column.page; +import java.io.Closeable; +import java.io.IOException; + /** * Reader for a sequence a page from a given column chunk */ -public interface PageReader { +public interface PageReader extends Closeable { /** * @return the dictionary page in that chunk or null if none @@ -37,4 +40,9 @@ public interface PageReader { * @return the next page in that chunk or null if after the last page */ DataPage readPage(); + + /** + * Close the page reader. By default it is no-op. + */ + default void close() throws IOException {} } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java new file mode 100644 index 0000000000..c50aa58c7e --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/AsyncMultiBufferInputStream.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.io.SeekableInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class AsyncMultiBufferInputStream extends MultiBufferInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncMultiBufferInputStream.class); + + private int fetchIndex = 0; + private final SeekableInputStream fileInputStream; + private int readIndex = 0; + private ExecutorService threadPool; + private LinkedBlockingQueue> readFutures; + private boolean closed = false; + + private LongAdder totalTimeBlocked = new LongAdder(); + private LongAdder totalCountBlocked = new LongAdder(); + private LongAccumulator maxTimeBlocked = new LongAccumulator(Long::max, 0L); + + AsyncMultiBufferInputStream( + ExecutorService threadPool, SeekableInputStream fileInputStream, List buffers) { + super(buffers); + this.fileInputStream = fileInputStream; + this.threadPool = threadPool; + readFutures = new LinkedBlockingQueue<>(buffers.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC: Begin read into buffers "); + for (ByteBuffer buf : buffers) { + LOG.debug("ASYNC: buffer {} ", buf); + } + } + fetchAll(); + } + + private void checkState() { + if (closed) { + throw new RuntimeException("Stream is closed"); + } + } + + private void fetchAll() { + checkState(); + submitReadTask(0); + } + + private void submitReadTask(int bufferNo) { + ByteBuffer buffer = buffers.get(bufferNo); + try { + readFutures.put(threadPool.submit(() -> { + readOneBuffer(buffer); + if (bufferNo < buffers.size() - 1) { + submitReadTask(bufferNo + 1); + } + return null; + })); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private void readOneBuffer(ByteBuffer buffer) { + long startTime = System.nanoTime(); + try { + fileInputStream.readFully(buffer); + buffer.flip(); + long readCompleted = System.nanoTime(); + long timeSpent = readCompleted - startTime; + LOG.debug("ASYNC Stream: READ - {}", timeSpent / 1000.0); + fetchIndex++; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean nextBuffer() { + checkState(); + // hack: parent constructor can call this method before this class is fully initialized. + // Just return without doing anything. 
+ if (readFutures == null) { + return false; + } + if (readIndex < buffers.size()) { + long start = System.nanoTime(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC (next): Getting next buffer"); + } + Future future = readFutures.take(); + future.get(); + long timeSpent = System.nanoTime() - start; + totalCountBlocked.add(1); + totalTimeBlocked.add(timeSpent); + maxTimeBlocked.accumulate(timeSpent); + if (LOG.isDebugEnabled()) { + LOG.debug("ASYNC (next): {}: Time blocked for read {} ns", this, timeSpent); + } + } catch (InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + LOG.error("Async (next): exception while getting next buffer: ", e); + throw new RuntimeException(e); + } + readIndex++; + } + return super.nextBuffer(); + } + + public void close() { + LOG.debug( + "ASYNC Stream: Blocked: {} {} {}", + totalTimeBlocked.longValue() / 1000.0, + totalCountBlocked.longValue(), + maxTimeBlocked.longValue() / 1000.0); + Future readResult; + while (!readFutures.isEmpty()) { + try { + readResult = readFutures.poll(); + readResult.get(); + if (!readResult.isDone() && !readResult.isCancelled()) { + readResult.cancel(true); + } else { + readResult.get(1, TimeUnit.MILLISECONDS); + } + } catch (ExecutionException | InterruptedException | TimeoutException e) { + // Do nothing + } + } + closed = true; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java index 786d5059ff..69bbc369cf 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java @@ -25,7 +25,9 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.parquet.ShouldNeverHappenException; +import org.apache.parquet.io.SeekableInputStream; public class ByteBufferInputStream extends InputStream { @@ -48,6 +50,11 @@ public static ByteBufferInputStream wrap(List buffers) { } } + public static ByteBufferInputStream wrapAsync( + ExecutorService threadPool, SeekableInputStream fileInputStream, List buffers) { + return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers); + } + ByteBufferInputStream() { delegate = null; } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java index 711d160c78..7c56685a8f 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -31,7 +31,7 @@ class MultiBufferInputStream extends ByteBufferInputStream { private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); - private final List buffers; + protected final List buffers; private final long length; private Iterator iterator; @@ -42,6 +42,11 @@ class MultiBufferInputStream extends ByteBufferInputStream { private long markLimit = 0; private List markBuffers = new ArrayList<>(); + @Override + public String toString() { + return "MultiBufferInputStream{" + "length=" + length + ", current=" + current + ", position=" + position + '}'; + } + MultiBufferInputStream(List buffers) { this.buffers = buffers; @@ -303,7 +308,7 @@ public boolean markSupported() { return true; } - private 
boolean nextBuffer() { + protected boolean nextBuffer() { if (!iterator.hasNext()) { this.current = null; return false; diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java new file mode 100644 index 0000000000..4deb3b9d0f --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SequenceByteBufferInputStream.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * A bare minimum implementation of a {@link java.io.SequenceInputStream} that wraps an + * ordered collection of ByteBufferInputStreams. + *
+ * This class, as implemented, is intended only for a specific use in the ParquetFileReader and + * throws {@link UnsupportedOperationException} in unimplemented methods to catch any unintended + * use in other cases. + *
+ * Even though this class is derived from ByteBufferInputStream, it explicitly does not support + * byte-buffer-related methods such as slice. It does, however, support sliceBuffers, which blocks + * until the underlying streams have been read and then returns the requested slices. + *
+ * Even though this class changes the state of the underlying streams (by reading from them) + * it does not own them and so the close method does not close the streams. To avoid resource + * leaks the calling code should close the underlying streams + */ +public class SequenceByteBufferInputStream extends ByteBufferInputStream { + + Collection collection; + Iterator iterator; + ByteBufferInputStream current; + long position = 0; + + @Override + public String toString() { + return "SequenceByteBufferInputStream{" + "collection=" + + collection + ", current=" + + current + ", position=" + + position + '}'; + } + + public SequenceByteBufferInputStream(Collection collection) { + this.collection = collection; + iterator = collection.iterator(); + current = iterator.hasNext() ? iterator.next() : null; + if (current == null) { + throw new UnsupportedOperationException( + "Initializing SequenceByteBufferInputStream with an empty collection is not supported"); + } + } + + @Override + public long position() { + return position; + } + + @Override + public int read(ByteBuffer out) { + int len = out.remaining(); + if (len <= 0) { + return 0; + } + if (current == null) { + return -1; + } + int totalBytesRead = 0; + while (totalBytesRead < len) { + int bytesRead = current.read(out); + if (bytesRead == -1) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + break; + } + } else { + totalBytesRead += bytesRead; + } + } + position += totalBytesRead; + return totalBytesRead; + } + + @Override + public ByteBuffer slice(int length) throws EOFException { + throw new UnsupportedOperationException("slice is not supported"); + } + + @Override + /** + * This is a blocking call. Use with care when using in asynchronous mode. + */ + public List sliceBuffers(long len) throws EOFException { + if (len <= 0) { + return Collections.emptyList(); + } + + if (current == null) { + throw new EOFException(); + } + + List buffers = new ArrayList<>(); + long bytesAccumulated = 0; + while (bytesAccumulated < len) { + // This is not strictly according to the input stream contract, but once again the + // underlying implementations of ByteBufferInputStream return the available bytes + // based on the size of the underlying buffers rather than the bytes currently read + // into the buffers. 
This works for us because the underlying implementations will + // actually fill the buffers with the data before returning the slices we ask for + // (which is why this is a blocking call) + if (current.available() > 0) { + int bufLen = (int) Math.min(len - bytesAccumulated, current.available()); + List currentSlices = current.sliceBuffers(bufLen); + buffers.addAll(currentSlices); + bytesAccumulated += bufLen; + + // update state; the bytes are considered read + this.position += bufLen; + } else { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + // there are no more streams + throw new EOFException(); + } + } + } + position += bytesAccumulated; + return buffers; + } + + @Override + public ByteBufferInputStream sliceStream(long length) throws EOFException { + throw new UnsupportedOperationException("sliceStream is not supported"); + } + + @Override + public List remainingBuffers() { + throw new UnsupportedOperationException("remainingBuffers is not supported"); + } + + @Override + public ByteBufferInputStream remainingStream() { + throw new UnsupportedOperationException("remainingStream is not supported"); + } + + @Override + public int read() throws IOException { + int val; + while (true) { + try { + val = current.read() & 0xFF; // as unsigned + position += 1; + break; + } catch (EOFException e) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + return -1; + } + } + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + return 0; + } + if (current == null) { + return -1; + } + int totalBytes = 0; + while (totalBytes < len) { + int bytesRead = current.read(b, off + totalBytes, len - totalBytes); + if (bytesRead == -1) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + break; + } + } else { + totalBytes += bytesRead; + } + } + position += totalBytes; + return totalBytes; + } + + @Override + public long skip(long n) { + if (n <= 0) { + if (n < 0) { + throw new IndexOutOfBoundsException("Skip length must be greater than 0: " + n); + } + return 0; + } + if (current == null) { + return -1; + } + long totalBytesSkipped = 0; + while (totalBytesSkipped < n) { + long bytesSkipped = current.skip(n - totalBytesSkipped); + // the contract for skip specifies that skip may return 0 if EOF is reached in which case + // we have no good way of knowing the end of stream has been reached. 
We depend on the + // fact that all implementations of ByteBufferInputStream return -1 on EOF + if (bytesSkipped == -1) { + if (iterator.hasNext()) { + current = iterator.next(); + } else { + break; + } + } else { + totalBytesSkipped += bytesSkipped; + } + } + position += totalBytesSkipped; + return totalBytesSkipped; + } + + @Override + public int available() { + return current.available(); + } + + @Override + public void close() throws IOException { + super.close(); + } + + @Override + public void mark(int readlimit) { + throw new UnsupportedOperationException("mark is not supported"); + } + + @Override + public void reset() throws IOException { + throw new UnsupportedOperationException("reset is not supported"); + } + + @Override + public boolean markSupported() { + return false; + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestAsyncMultiBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestAsyncMultiBufferInputStream.java new file mode 100644 index 0000000000..289fa576d3 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestAsyncMultiBufferInputStream.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.bytes; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.SeekableInputStream; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestAsyncMultiBufferInputStream extends TestMultiBufferInputStream { + private static final List DATA = Arrays.asList( + ByteBuffer.allocate(9), + ByteBuffer.allocate(4), + ByteBuffer.allocate(0), + ByteBuffer.allocate(12), + ByteBuffer.allocate(1), + ByteBuffer.allocate(7), + ByteBuffer.allocate(2)); + + private static final List WRITEDATA = Arrays.asList( + ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8}), + ByteBuffer.wrap(new byte[] {9, 10, 11, 12}), + ByteBuffer.wrap(new byte[] {}), + ByteBuffer.wrap(new byte[] {13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24}), + ByteBuffer.wrap(new byte[] {25}), + ByteBuffer.wrap(new byte[] {26, 27, 28, 29, 30, 31, 32}), + ByteBuffer.wrap(new byte[] {33, 34})); + + private static Path tempPath; + private static String filename; + private static SeekableInputStream seekableInputStream; + private static ExecutorService threadPool = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() * 2, r -> new Thread(r, "test-parquet-async-io")); + + @BeforeClass + public static void init() throws IOException { + tempPath = Files.createTempDirectory("asyncstream"); + filename = tempPath.toAbsolutePath() + "/async_mbinput"; + OutputStream outputStream = new FileOutputStream(filename); + for (ByteBuffer writedatum : WRITEDATA) { + outputStream.write(writedatum.array()); + } + } + + @AfterClass + public static void close() throws IOException { + threadPool.shutdownNow(); + Files.deleteIfExists(Paths.get(filename)); + Files.deleteIfExists(tempPath); + } + + @Override + protected ByteBufferInputStream newStream() { + try { + seekableInputStream = new LocalFSInputStream(new FileInputStream(filename)); + } catch (FileNotFoundException e) { + throw new RuntimeException("Failed to initialize test input stream.", e); + } + AsyncMultiBufferInputStream stream = new AsyncMultiBufferInputStream(threadPool, seekableInputStream, DATA); + stream.nextBuffer(); + return stream; + } + + @Override + protected void checkOriginalData() { + for (ByteBuffer buffer : DATA) { + Assert.assertEquals("Position should not change", 0, buffer.position()); + Assert.assertEquals("Limit should not change", buffer.array().length, buffer.limit()); + } + } + + @Test + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.position()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + // one is a view of the first buffer 
because it is smaller + ByteBuffer one = buffers.get(0); + Assert.assertSame( + "Should be a duplicate of the first array", + one.array(), + DATA.get(0).array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(9, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + // two should be a copy of the next 8 bytes + ByteBuffer two = buffers.get(1); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(0, two.position()); + Assert.assertEquals(8, two.limit()); + Assert.assertEquals(8, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame( + "Should be a duplicate of the fourth array", + three.array(), + DATA.get(3).array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(3, three.position()); + Assert.assertEquals(11, three.limit()); + Assert.assertEquals(12, three.capacity()); + for (; i < 24; i += 1) { + Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(0, four.position()); + Assert.assertEquals(8, four.limit()); + Assert.assertEquals(8, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(0, five.position()); + Assert.assertEquals(3, five.limit()); + Assert.assertEquals(3, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + public static class LocalFSInputStream extends DelegatingSeekableInputStream { + + long pos = 0; + private final InputStream stream; + + public LocalFSInputStream(InputStream stream) { + super(stream); + this.stream = stream; + } + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void seek(long newPos) throws IOException { + long skipBytes; + if (newPos < pos) { + stream.reset(); + skipBytes = newPos; + } else { + skipBytes = newPos - pos; + } + while (skipBytes > 0) { + long n = stream.skip(skipBytes); + if (n == 0) break; + skipBytes -= n; + } + pos = newPos - skipBytes; + } + + @Override + public int read() throws IOException { + pos++; + return super.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + pos += len; + return super.read(b, off, len); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + pos += bytes.length; + super.readFully(bytes); + } + + @Override + public void readFully(byte[] bytes, int start, int len) throws IOException { + pos += len - start; + super.readFully(bytes, start, len); + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int n = super.read(buf); + if (n > 0) { + pos += n; + } + return n; + } + + @Override + public void readFully(ByteBuffer buf) throws IOException { + pos += buf.remaining(); + super.readFully(buf); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java 
index e277fcde5f..68daf78e65 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -20,6 +20,7 @@ package org.apache.parquet; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -44,6 +45,8 @@ private HadoopReadOptions( boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean enableAsyncIOReader, + boolean enableParallelColumnReader, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -52,6 +55,8 @@ private HadoopReadOptions( Map properties, Configuration conf, FileDecryptionProperties fileDecryptionProperties, + ExecutorService ioThreadPool, + ExecutorService processThreadPool, ParquetMetricsCallback metricsCallback) { super( useSignedStringMinMax, @@ -63,6 +68,8 @@ private HadoopReadOptions( useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + enableAsyncIOReader, + enableParallelColumnReader, recordFilter, metadataFilter, codecFactory, @@ -70,6 +77,8 @@ private HadoopReadOptions( maxAllocationSize, properties, fileDecryptionProperties, + ioThreadPool, + processThreadPool, metricsCallback, new HadoopParquetConfiguration(conf)); this.conf = conf; @@ -126,6 +135,8 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + enableAsyncIOReader, + enableParallelColumnReader, recordFilter, metadataFilter, codecFactory, @@ -134,6 +145,8 @@ public ParquetReadOptions build() { properties, conf, fileDecryptionProperties, + ioThreadPool, + processThreadPool, metricsCallback); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 8c05d0224b..4878fe3858 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -23,6 +23,8 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.ENABLE_ASYNC_IO_READER; +import static org.apache.parquet.hadoop.ParquetInputFormat.ENABLE_PARALLEL_COLUMN_READER; import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; @@ -36,6 +38,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.compression.CompressionCodecFactory; @@ -61,6 +64,8 @@ public class ParquetReadOptions { private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT = false; + private static final boolean ENABLE_ASYNC_IO_READER_DEFAULT = false; + private static final 
boolean ENABLE_PARALLEL_COLUMN_READER_DEFAULT = false; private final boolean useSignedStringMinMax; private final boolean useStatsFilter; @@ -71,10 +76,16 @@ public class ParquetReadOptions { private final boolean useBloomFilter; private final boolean useOffHeapDecryptBuffer; private final boolean useHadoopVectoredIo; + private final boolean enableAsyncIOReader; + private final boolean enableParallelColumnReader; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; private final ByteBufferAllocator allocator; + // Thread pool to read column chunk data from storage. + private final ExecutorService ioThreadPool; + // Thread pool to process pages for multiple columns in parallel. + private final ExecutorService processThreadPool; private final int maxAllocationSize; private final Map properties; private final FileDecryptionProperties fileDecryptionProperties; @@ -91,6 +102,8 @@ public class ParquetReadOptions { boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean enableAsyncIOReader, + boolean enableParallelColumnReader, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -98,6 +111,8 @@ public class ParquetReadOptions { int maxAllocationSize, Map properties, FileDecryptionProperties fileDecryptionProperties, + ExecutorService ioThreadPool, + ExecutorService processThreadPool, ParquetMetricsCallback metricsCallback) { this( useSignedStringMinMax, @@ -109,6 +124,8 @@ public class ParquetReadOptions { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + enableAsyncIOReader, + enableParallelColumnReader, recordFilter, metadataFilter, codecFactory, @@ -116,6 +133,8 @@ public class ParquetReadOptions { maxAllocationSize, properties, fileDecryptionProperties, + ioThreadPool, + processThreadPool, metricsCallback, new HadoopParquetConfiguration()); } @@ -130,6 +149,8 @@ public class ParquetReadOptions { boolean useBloomFilter, boolean useOffHeapDecryptBuffer, boolean useHadoopVectoredIo, + boolean enableAsyncIOReader, + boolean enableParallelColumnReader, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -137,6 +158,8 @@ public class ParquetReadOptions { int maxAllocationSize, Map properties, FileDecryptionProperties fileDecryptionProperties, + ExecutorService ioThreadPool, + ExecutorService processThreadPool, ParquetMetricsCallback metricsCallback, ParquetConfiguration conf) { this.useSignedStringMinMax = useSignedStringMinMax; @@ -148,6 +171,8 @@ public class ParquetReadOptions { this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; this.useHadoopVectoredIo = useHadoopVectoredIo; + this.enableAsyncIOReader = enableAsyncIOReader; + this.enableParallelColumnReader = enableParallelColumnReader; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -155,6 +180,8 @@ public class ParquetReadOptions { this.maxAllocationSize = maxAllocationSize; this.properties = Collections.unmodifiableMap(properties); this.fileDecryptionProperties = fileDecryptionProperties; + this.ioThreadPool = ioThreadPool; + this.processThreadPool = processThreadPool; this.metricsCallback = metricsCallback; this.conf = conf; } @@ -187,6 +214,14 @@ public boolean useOffHeapDecryptBuffer() { return 
useOffHeapDecryptBuffer; } + public boolean isAsyncIOReaderEnabled() { + return enableAsyncIOReader; + } + + public boolean isParallelColumnReaderEnabled() { + return enableParallelColumnReader; + } + public boolean usePageChecksumVerification() { return usePageChecksumVerification; } @@ -227,6 +262,14 @@ public FileDecryptionProperties getDecryptionProperties() { return fileDecryptionProperties; } + public ExecutorService getIOThreadPool() { + return ioThreadPool; + } + + public ExecutorService getProcessThreadPool() { + return processThreadPool; + } + public ParquetMetricsCallback getMetricsCallback() { return metricsCallback; } @@ -258,6 +301,8 @@ public static class Builder { protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; protected boolean useOffHeapDecryptBuffer = USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT; + protected boolean enableAsyncIOReader = ENABLE_ASYNC_IO_READER_DEFAULT; + protected boolean enableParallelColumnReader = ENABLE_PARALLEL_COLUMN_READER_DEFAULT; protected FilterCompat.Filter recordFilter = null; protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory to get decompressors @@ -266,6 +311,8 @@ public static class Builder { protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT; protected Map properties = new HashMap<>(); protected FileDecryptionProperties fileDecryptionProperties = null; + protected ExecutorService ioThreadPool = null; + protected ExecutorService processThreadPool = null; protected ParquetConfiguration conf; protected ParquetMetricsCallback metricsCallback; @@ -283,6 +330,9 @@ public Builder(ParquetConfiguration conf) { usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, usePageChecksumVerification)); useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); + enableAsyncIOReader(conf.getBoolean(ENABLE_ASYNC_IO_READER, ENABLE_ASYNC_IO_READER_DEFAULT)); + enableParallelColumnReader( + conf.getBoolean(ENABLE_PARALLEL_COLUMN_READER, ENABLE_PARALLEL_COLUMN_READER_DEFAULT)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); @@ -375,6 +425,16 @@ public Builder useBloomFilter(boolean useBloomFilter) { return this; } + public Builder enableAsyncIOReader(boolean enableAsyncIOReader) { + this.enableAsyncIOReader = enableAsyncIOReader; + return this; + } + + public Builder enableParallelColumnReader(boolean enableParallelColumnReader) { + this.enableParallelColumnReader = enableParallelColumnReader; + return this; + } + public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) { this.recordFilter = rowGroupFilter; return this; @@ -420,8 +480,13 @@ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties) return this; } - public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) { - this.metricsCallback = metricsCallback; + public Builder withIOThreadPool(ExecutorService ioThreadPool) { + this.ioThreadPool = ioThreadPool; + return this; + } + + public Builder withProcessThreadPool(ExecutorService processThreadPool) { + this.processThreadPool = processThreadPool; return this; } @@ -438,12 +503,14 @@ public Builder copy(ParquetReadOptions options) { withRecordFilter(options.recordFilter); 
withUseHadoopVectoredIo(options.useHadoopVectoredIo); withMetadataFilter(options.metadataFilter); + enableAsyncIOReader(options.enableAsyncIOReader); + enableParallelColumnReader(options.enableParallelColumnReader); withCodecFactory(options.codecFactory); withAllocator(options.allocator); withPageChecksumVerification(options.usePageChecksumVerification); withDecryption(options.fileDecryptionProperties); - withMetricsCallback(options.metricsCallback); - conf = options.conf; + withIOThreadPool(options.ioThreadPool); + withProcessThreadPool(options.processThreadPool); for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); } @@ -469,6 +536,8 @@ public ParquetReadOptions build() { useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, + enableAsyncIOReader, + enableParallelColumnReader, recordFilter, metadataFilter, codecFactory, @@ -476,6 +545,8 @@ public ParquetReadOptions build() { maxAllocationSize, properties, fileDecryptionProperties, + ioThreadPool, + processThreadPool, metricsCallback, conf); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java index 993eca02ea..40bfed23f9 100755 --- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java @@ -60,10 +60,7 @@ public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) { if (null == columnKey) { // Decryptor with footer key - if (null == aesGcmDecryptorWithFooterKey) { - aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey); - } - return aesGcmDecryptorWithFooterKey; + return ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey); } else { // Decryptor with column key return ModuleCipherFactory.getDecryptor(AesMode.GCM, columnKey); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index c7fc22b29f..65d9e92609 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -19,14 +19,13 @@ package org.apache.parquet.hadoop; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; @@ -54,7 +53,7 @@ * The name is kind of confusing since it references three different "entities" * in our format: columns, chunks, and pages */ -class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore { +public class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore { private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageReadStore.class); /** @@ -64,11 +63,13 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore * This implementation is provided with a list of pages, 
each of which * is decompressed and passed through. */ - static final class ColumnChunkPageReader implements PageReader { + public static final class ColumnChunkPageReader implements PageReader { + + private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageReader.class); private final BytesInputDecompressor decompressor; private final long valueCount; - private final Queue compressedPages; + private final LinkedBlockingDeque> compressedPages; private final DictionaryPage compressedDictionaryPage; // null means no page synchronization is required; firstRowIndex will not be returned by the pages private final OffsetIndex offsetIndex; @@ -80,31 +81,33 @@ static final class ColumnChunkPageReader implements PageReader { private final byte[] dataPageAAD; private final byte[] dictionaryPageAAD; private final ByteBufferReleaser releaser; + private final FilePageReader filePageReader; ColumnChunkPageReader( BytesInputDecompressor decompressor, - List compressedPages, + LinkedBlockingDeque> compressedPages, DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, + long valueCount, long rowCount, BlockCipher.Decryptor blockDecryptor, byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal, + FilePageReader filePageReader, ParquetReadOptions options) { this.decompressor = decompressor; - this.compressedPages = new ArrayDeque(compressedPages); + this.compressedPages = compressedPages; this.compressedDictionaryPage = compressedDictionaryPage; - long count = 0; - for (DataPage p : compressedPages) { - count += p.getValueCount(); - } - this.valueCount = count; + this.valueCount = valueCount; this.offsetIndex = offsetIndex; this.rowCount = rowCount; this.options = options; this.releaser = new ByteBufferReleaser(options.getAllocator()); this.blockDecryptor = blockDecryptor; + + this.filePageReader = filePageReader; + if (null != blockDecryptor) { dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0); @@ -116,6 +119,11 @@ static final class ColumnChunkPageReader implements PageReader { } } + @Override + public void close() throws IOException { + this.filePageReader.close(); + } + private int getPageOrdinal(int currentPageIndex) { if (null == offsetIndex) { return currentPageIndex; @@ -131,7 +139,12 @@ public long getTotalValueCount() { @Override public DataPage readPage() { - final DataPage compressedPage = compressedPages.poll(); + final DataPage compressedPage; + try { + compressedPage = compressedPages.take().orElse(null); + } catch (InterruptedException e) { + throw new RuntimeException("Error reading parquet page data.", e); + } if (compressedPage == null) { return null; } @@ -412,6 +425,11 @@ void setReleaser(ByteBufferReleaser releaser) { public void close() { for (ColumnChunkPageReader reader : readers.values()) { reader.releaseBuffers(); + try { + reader.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } releaser.close(); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java new file mode 100644 index 0000000000..4cae8f6bba --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.BlockCipher.Decryptor; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.hadoop.ParquetFileReader.Chunk; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the reading of a single page. 
+ */ +public class FilePageReader implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class); + + private final ParquetFileReader parquetFileReader; + private final Chunk chunk; + private final int currentBlock; + private final BlockCipher.Decryptor headerBlockDecryptor; + private final BlockCipher.Decryptor pageBlockDecryptor; + private final byte[] aadPrefix; + private final int rowGroupOrdinal; + private final int columnOrdinal; + + // state + private final LinkedBlockingDeque> pagesInChunk = new LinkedBlockingDeque<>(); + private DictionaryPage dictionaryPage = null; + private int pageIndex = 0; + private long valuesCountReadSoFar = 0; + private int dataPageCountReadSoFar = 0; + + // derived + private final PrimitiveType type; + private final byte[] dataPageAAD; + private byte[] dataPageHeaderAAD = null; + + private final BytesInputDecompressor decompressor; + + private final ConcurrentLinkedQueue> readFutures = new ConcurrentLinkedQueue<>(); + + private final LongAdder totalTimeReadOnePage = new LongAdder(); + private final LongAdder totalCountReadOnePage = new LongAdder(); + private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L); + private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder(); + private final LongAdder totalCountBlockedPagesInChunk = new LongAdder(); + private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L); + + public FilePageReader( + ParquetFileReader parquetFileReader, + Chunk chunk, + int currentBlock, + Decryptor headerBlockDecryptor, + Decryptor pageBlockDecryptor, + byte[] aadPrefix, + int rowGroupOrdinal, + int columnOrdinal, + BytesInputDecompressor decompressor) { + this.parquetFileReader = parquetFileReader; + this.chunk = chunk; + this.currentBlock = currentBlock; + this.headerBlockDecryptor = headerBlockDecryptor; + this.pageBlockDecryptor = pageBlockDecryptor; + this.aadPrefix = aadPrefix; + this.rowGroupOrdinal = rowGroupOrdinal; + this.columnOrdinal = columnOrdinal; + this.decompressor = decompressor; + + this.type = parquetFileReader + .getFileMetaData() + .getSchema() + .getType(chunk.getDescriptor().getCol().getPath()) + .asPrimitiveType(); + + if (null != headerBlockDecryptor) { + dataPageHeaderAAD = AesCipher.createModuleAAD( + aadPrefix, + ModuleType.DataPageHeader, + rowGroupOrdinal, + columnOrdinal, + chunk.getPageOrdinal(dataPageCountReadSoFar)); + } + if (null != pageBlockDecryptor) { + dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0); + } else { + dataPageAAD = null; + } + } + + public DictionaryPage getDictionaryPage() { + return this.dictionaryPage; + } + + public LinkedBlockingDeque> getPagesInChunk() { + return this.pagesInChunk; + } + + void readAllRemainingPagesAsync() { + readFutures.offer(parquetFileReader.options.getProcessThreadPool().submit(new FilePageReaderTask(this))); + } + + void readAllRemainingPages() throws IOException { + while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) { + readOnePage(); + } + if (chunk.offsetIndex == null + && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) { + // Would be nice to have a CorruptParquetFileException or something as a subclass? 
+ throw new IOException( + "Expected " + chunk.getDescriptor().getMetadata().getValueCount() + + " values in column chunk at " + parquetFileReader.getPath() + + " offset " + + chunk.descriptor.getMetadata().getFirstDataPageOffset() + " but got " + + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + + " pages ending at file offset " + + (chunk.getDescriptor().getFileOffset() + chunk.stream.position())); + } + try { + pagesInChunk.put(Optional.empty()); // add a marker for end of data + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + } + + void readOnePage() throws IOException { + long startRead = System.nanoTime(); + try { + byte[] pageHeaderAAD = dataPageHeaderAAD; + if (null != headerBlockDecryptor) { + // Important: this verifies file integrity (makes sure dictionary page had not been removed) + if (null == dictionaryPage + && chunk.getDescriptor().getMetadata().hasDictionaryPage()) { + pageHeaderAAD = AesCipher.createModuleAAD( + aadPrefix, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + } else { + int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar); + AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); + } + } + PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + final BytesInput pageBytes; + switch (pageHeader.type) { + case DICTIONARY_PAGE: + // there is only one dictionary page per column chunk + if (dictionaryPage != null) { + throw new ParquetDecodingException("more than one dictionary page in column " + + chunk.getDescriptor().getCol()); + } + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc( + pageHeader.getCrc(), + pageBytes, + "could not verify dictionary page integrity, CRC checksum verification failed"); + } + DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); + DictionaryPage compressedDictionaryPage = new DictionaryPage( + pageBytes, + uncompressedPageSize, + dicHeader.getNum_values(), + parquetFileReader.converter.getEncoding(dicHeader.getEncoding())); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + compressedDictionaryPage.setCrc(pageHeader.getCrc()); + } + dictionaryPage = compressedDictionaryPage; + break; + case DATA_PAGE: + DataPageHeader dataHeaderV1 = pageHeader.getData_page_header(); + pageBytes = chunk.readAsBytesInput(compressedPageSize); + if (null != pageBlockDecryptor) { + AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex)); + } + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + chunk.verifyCrc( + pageHeader.getCrc(), + pageBytes, + "could not verify page integrity, CRC checksum verification failed"); + } + DataPageV1 dataPageV1 = new DataPageV1( + pageBytes, + dataHeaderV1.getNum_values(), + uncompressedPageSize, + parquetFileReader.converter.fromParquetStatistics( + parquetFileReader.getFileMetaData().getCreatedBy(), + dataHeaderV1.getStatistics(), + type), + parquetFileReader.converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), + parquetFileReader.converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()), + parquetFileReader.converter.getEncoding(dataHeaderV1.getEncoding())); + 
// Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dataPageV1.setCrc(pageHeader.getCrc()); + } + writePageToChunk(dataPageV1); + valuesCountReadSoFar += dataHeaderV1.getNum_values(); + ++dataPageCountReadSoFar; + pageIndex++; + break; + case DATA_PAGE_V2: + DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); + int dataSize = compressedPageSize + - dataHeaderV2.getRepetition_levels_byte_length() + - dataHeaderV2.getDefinition_levels_byte_length(); + if (null != pageBlockDecryptor) { + AesCipher.quickUpdatePageAAD(dataPageAAD, chunk.getPageOrdinal(pageIndex)); + } + final BytesInput repetitionLevels = + chunk.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()); + final BytesInput definitionLevels = + chunk.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()); + final BytesInput values = chunk.readAsBytesInput(dataSize); + if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) { + pageBytes = BytesInput.concat(repetitionLevels, definitionLevels, values); + chunk.verifyCrc( + pageHeader.getCrc(), + pageBytes, + "could not verify page integrity, CRC checksum verification failed"); + } + DataPage dataPageV2 = new DataPageV2( + dataHeaderV2.getNum_rows(), + dataHeaderV2.getNum_nulls(), + dataHeaderV2.getNum_values(), + repetitionLevels, + definitionLevels, + parquetFileReader.converter.getEncoding(dataHeaderV2.getEncoding()), + values, + uncompressedPageSize, + parquetFileReader.converter.fromParquetStatistics( + parquetFileReader.getFileMetaData().getCreatedBy(), + dataHeaderV2.getStatistics(), + type), + dataHeaderV2.isIs_compressed()); + // Copy crc to new page, used for testing + if (pageHeader.isSetCrc()) { + dataPageV2.setCrc(pageHeader.getCrc()); + } + writePageToChunk(dataPageV2); + valuesCountReadSoFar += dataHeaderV2.getNum_values(); + ++dataPageCountReadSoFar; + pageIndex++; + break; + default: + LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); + chunk.stream.skipFully(compressedPageSize); + break; + } + } catch (Exception e) { + LOG.info( + "Exception while reading one more page for: [{} ]: {} ", + Thread.currentThread().getName(), + this); + throw e; + } finally { + long timeSpentInRead = System.nanoTime() - startRead; + totalCountReadOnePage.add(1); + totalTimeReadOnePage.add(timeSpentInRead); + maxTimeReadOnePage.accumulate(timeSpentInRead); + } + } + + private void writePageToChunk(DataPage page) { + long writeStart = System.nanoTime(); + try { + pagesInChunk.put(Optional.of(page)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while reading page data", e); + } + long timeSpent = System.nanoTime() - writeStart; + totalTimeBlockedPagesInChunk.add(timeSpent); + totalCountBlockedPagesInChunk.add(1); + maxTimeBlockedPagesInChunk.accumulate(timeSpent); + } + + private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { + return chunk.offsetIndex == null + ? 
valuesCountReadSoFar < chunk.getDescriptor().getMetadata().getValueCount() + : dataPageCountReadSoFar < chunk.offsetIndex.getPageCount(); + } + + @Override + public void close() throws IOException { + Future readResult; + while (!readFutures.isEmpty()) { + try { + readResult = readFutures.poll(); + readResult.get(); + if (!readResult.isDone() && !readResult.isCancelled()) { + readResult.cancel(true); + } else { + readResult.get(1, TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + // Do nothing + } + } + if (LOG.isDebugEnabled()) { + String mode = parquetFileReader.isParallelColumnReaderEnabled() ? "ASYNC" : "SYNC"; + LOG.debug( + "File Page Reader stats: {}, {}, {}, {}, {}, {}, {}", + mode, + totalTimeReadOnePage.longValue() / 1000.0, + totalCountReadOnePage.longValue(), + maxTimeReadOnePage.longValue() / 1000.0, + totalTimeBlockedPagesInChunk.longValue() / 1000.0, + totalCountBlockedPagesInChunk.longValue(), + maxTimeBlockedPagesInChunk.longValue() / 1000.0); + } + } + + @Override + public String toString() { + return "PageReader{" + "chunk=" + + chunk + ", currentBlock=" + + currentBlock + ", headerBlockDecryptor=" + + headerBlockDecryptor + ", pageBlockDecryptor=" + + pageBlockDecryptor + ", aadPrefix=" + + Arrays.toString(aadPrefix) + ", rowGroupOrdinal=" + + rowGroupOrdinal + ", columnOrdinal=" + + columnOrdinal + ", pagesInChunk=" + + pagesInChunk + ", dictionaryPage=" + + dictionaryPage + ", pageIndex=" + + pageIndex + ", valuesCountReadSoFar=" + + valuesCountReadSoFar + ", dataPageCountReadSoFar=" + + dataPageCountReadSoFar + ", type=" + + type + ", dataPageAAD=" + + Arrays.toString(dataPageAAD) + ", dataPageHeaderAAD=" + + Arrays.toString(dataPageHeaderAAD) + ", decompressor=" + + decompressor + '}'; + } + + private static class FilePageReaderTask implements Callable { + + final FilePageReader filePageReader; + + FilePageReaderTask(FilePageReader filePageReader) { + this.filePageReader = filePageReader; + } + + @Override + public Void call() throws Exception { + filePageReader.readAllRemainingPages(); + return null; + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 1d8cce3d8c..d3c309d8b8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -48,12 +48,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -69,10 +71,9 @@ import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.ReusingByteBufferAllocator; +import org.apache.parquet.bytes.SequenceByteBufferInputStream; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.DataPage; -import org.apache.parquet.column.page.DataPageV1; -import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; import 
org.apache.parquet.column.page.PageReadStore; @@ -89,8 +90,6 @@ import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.format.BlockCipher; import org.apache.parquet.format.BloomFilterHeader; -import org.apache.parquet.format.DataPageHeader; -import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; import org.apache.parquet.format.FileCryptoMetaData; import org.apache.parquet.format.PageHeader; @@ -118,7 +117,6 @@ import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.util.AutoCloseables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +132,7 @@ public class ParquetFileReader implements Closeable { public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300; - private final ParquetMetadataConverter converter; + final ParquetMetadataConverter converter; private final CRC32 crc; private final ReusingByteBufferAllocator crcAllocator; @@ -144,7 +142,7 @@ public class ParquetFileReader implements Closeable { * If a summary file is found it is used otherwise the file footer is used. * * @param configuration the hadoop conf to connect to the file system; - * @param partFiles the part files to read + * @param partFiles the part files to read * @return the footers for those files using the summary file if possible. * @throws IOException if there is an exception while reading footers * @deprecated metadata files are not recommended and will be removed in 2.0.0 @@ -164,7 +162,7 @@ private static MetadataFilter filter(boolean skipRowGroups) { * If a summary file is found it is used otherwise the file footer is used. * * @param configuration the hadoop conf to connect to the file system; - * @param partFiles the part files to read + * @param partFiles the part files to read * @param skipRowGroups to skipRowGroups in the footers * @return the footers for those files using the summary file if possible. * @throws IOException if there is an exception while reading footers @@ -265,7 +263,7 @@ private static List runAllInParallel(int parallelism, List> t /** * @param configuration the conf to access the File System - * @param partFiles the files to read + * @param partFiles the files to read * @return the footers * @throws IOException if an exception was raised while reading footers * @deprecated metadata files are not recommended and will be removed in 2.0.0 @@ -281,12 +279,12 @@ public static List
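
Usage sketch (not part of the diff above): the reader-side features added in this change are driven entirely through ParquetReadOptions. The snippet below shows how a caller might enable the async I/O reader and parallel column reading using the builder methods introduced here (enableAsyncIOReader, enableParallelColumnReader, withIOThreadPool, withProcessThreadPool); the pool sizes and file path are illustrative assumptions only.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class AsyncReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // path to a Parquet file; illustrative

    // Separate pools: one for reading column chunk bytes from storage,
    // one for decoding pages of multiple columns in parallel.
    ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> new Thread(r, "parquet-io"));
    ExecutorService processPool = Executors.newFixedThreadPool(4, r -> new Thread(r, "parquet-process"));

    ParquetReadOptions options = HadoopReadOptions.builder(conf)
        .enableAsyncIOReader(true)          // prefetch column chunk buffers asynchronously
        .enableParallelColumnReader(true)   // process pages for multiple columns in parallel
        .withIOThreadPool(ioPool)
        .withProcessThreadPool(processPool)
        .build();

    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
      PageReadStore rowGroup;
      while ((rowGroup = reader.readNextRowGroup()) != null) {
        // consume pages per column via rowGroup.getPageReader(columnDescriptor) ...
      }
    } finally {
      ioPool.shutdownNow();
      processPool.shutdownNow();
    }
  }
}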