Merge pull request apache#258 from Parquet/optimize_scan
Optimize scan
julienledem committed Dec 21, 2013
2 parents e91cda9 + 3c91e46 commit cc8375c
Showing 1 changed file with 232 additions and 78 deletions.
310 changes: 232 additions & 78 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -297,17 +297,16 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
f.close();
}
}
private CodecFactory codecFactory;

private final CodecFactory codecFactory;
private final List<BlockMetaData> blocks;
private final FSDataInputStream f;
private final Path filePath;
private int currentBlock = 0;
private Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();

/**
*
* @param f the Parquet file
* @param f the Parquet file (will be opened for read in this constructor)
* @param blocks the blocks to read
* @param columns the columns to read (their path)
* @param codecClassName the codec used to compress the blocks
@@ -338,41 +337,170 @@ public PageReadStore readNextRowGroup() throws IOException {
throw new RuntimeException("Illegal row group of 0 rows");
}
ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
// prepare the list of consecutive chunks to read them in one scan
List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
ConsecutiveChunkList currentChunks = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
ColumnDescriptor columnDescriptor = paths.get(pathKey);
if (columnDescriptor != null) {
BenchmarkCounter.incrementBytesRead(mc.getTotalSize());
List<Page> pagesInChunk = new ArrayList<Page>();
List<DictionaryPage> dictionaryPagesInChunk = new ArrayList<DictionaryPage>();
readColumnChunkPages(columnDescriptor, mc, pagesInChunk, dictionaryPagesInChunk);
if (dictionaryPagesInChunk.size() > 1) {
throw new ParquetDecodingException("more than one dictionary page: " + dictionaryPagesInChunk);
long startingPos = getStartingPos(mc);
// first chunk or not consecutive => new list
if (currentChunks == null || currentChunks.endPos() != startingPos) {
currentChunks = new ConsecutiveChunkList(startingPos);
allChunks.add(currentChunks);
}
BytesDecompressor decompressor = codecFactory.getDecompressor(mc.getCodec());
ColumnChunkPageReader columnChunkPageReader = new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPagesInChunk.size() == 0 ? null : dictionaryPagesInChunk.get(0));
columnChunkPageReadStore.addColumn(columnDescriptor, columnChunkPageReader);
currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
}
}
// actually read all the chunks
for (ConsecutiveChunkList consecutiveChunks : allChunks) {
final List<Chunk> chunks = consecutiveChunks.readAll(f);
for (Chunk chunk : chunks) {
columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages());
}
}
++currentBlock;
return columnChunkPageReadStore;
}
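
The two loops above split the work: the first only plans the I/O, grouping column chunks whose byte ranges sit back to back into ConsecutiveChunkList objects; the second performs one large read per group and only then parses pages. A minimal standalone sketch of the grouping step, using hypothetical simplified types (ByteRange, ScanGroup and CoalesceSketch are illustrative stand-ins, not classes from this patch):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins: ByteRange plays the role of ChunkDescriptor,
    // ScanGroup the role of ConsecutiveChunkList.
    class ByteRange {
      final long offset;  // where the chunk starts in the file
      final int size;     // total size of the chunk in bytes
      ByteRange(long offset, int size) { this.offset = offset; this.size = size; }
    }

    class ScanGroup {
      final long offset;  // file offset of the first chunk in the group
      int length = 0;     // combined size of the chunks in the group
      final List<ByteRange> chunks = new ArrayList<ByteRange>();
      ScanGroup(long offset) { this.offset = offset; }
      long endPos() { return offset + length; }
      void add(ByteRange r) { chunks.add(r); length += r.size; }
    }

    public class CoalesceSketch {
      // Group ranges that are laid out back to back, so each group can later be
      // read with a single seek + readFully instead of one seek per column chunk.
      static List<ScanGroup> coalesce(List<ByteRange> ranges) {
        List<ScanGroup> groups = new ArrayList<ScanGroup>();
        ScanGroup current = null;
        for (ByteRange r : ranges) {
          if (current == null || current.endPos() != r.offset) {
            current = new ScanGroup(r.offset);  // first chunk or a gap => new group
            groups.add(current);
          }
          current.add(r);
        }
        return groups;
      }

      public static void main(String[] args) {
        List<ByteRange> ranges = new ArrayList<ByteRange>();
        ranges.add(new ByteRange(4, 100));    // column chunk 1
        ranges.add(new ByteRange(104, 200));  // column chunk 2, adjacent to chunk 1
        ranges.add(new ByteRange(900, 50));   // column chunk 3, after a gap (a column that is not read)
        System.out.println(coalesce(ranges).size() + " scans instead of " + ranges.size());  // prints "2 scans instead of 3"
      }
    }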

private static class Chunk extends ByteArrayInputStream{
/**
* @param mc the metadata for that chunk
* @return the offset of the first byte in the chunk
*/
private long getStartingPos(ColumnChunkMetaData mc) {
long startingPos = mc.getFirstDataPageOffset();
if (mc.getDictionaryPageOffset() > 0 && mc.getDictionaryPageOffset() < startingPos) {
// if there's a dictionary and it's before the first data page, start from there
startingPos = mc.getDictionaryPageOffset();
}
return startingPos;
}
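
For example, with hypothetical offsets: a chunk whose dictionary page was written at offset 4 and whose first data page sits at offset 38 is scanned from offset 4, so the dictionary is fetched in the same read as the data pages; a chunk with no dictionary (getDictionaryPageOffset() returning 0) is scanned from its first data page offset.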

private final FSDataInputStream f;
@Override
public void close() throws IOException {
f.close();
this.codecFactory.release();
}

private Chunk(FSDataInputStream f, int size) throws IOException {
super(new byte[size]);
this.f = f;
f.readFully(this.buf);
/**
* The data for a column chunk
*
* @author Julien Le Dem
*
*/
private class Chunk extends ByteArrayInputStream {

private final ChunkDescriptor descriptor;

/**
*
* @param descriptor descriptor for the chunk
* @param data contains the chunk data at offset
* @param offset where the chunk starts in data
*/
public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
super(data);
this.descriptor = descriptor;
this.pos = offset;
}

/**
* Read all of the pages in a given column chunk.
* @return the list of pages
*/
public ColumnChunkPageReader readAllPages() throws IOException {
List<Page> pagesInChunk = new ArrayList<Page>();
DictionaryPage dictionaryPage = null;
long valuesCountReadSoFar = 0;
while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
PageHeader pageHeader = readPageHeader(this);
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// there is only one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
}
dictionaryPage =
new DictionaryPage(
this.readAsBytesInput(pageHeader.compressed_page_size),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
);
break;
case DATA_PAGE:
pagesInChunk.add(
new Page(
this.readAsBytesInput(pageHeader.compressed_page_size),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
));
valuesCountReadSoFar += pageHeader.data_page_header.num_values;
break;
default:
if (DEBUG) LOG.debug("skipping page of type " + pageHeader.type + " of size " + pageHeader.compressed_page_size);
this.skip(pageHeader.compressed_page_size);
break;
}
}
if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
// Would be nice to have a CorruptParquetFileException or something as a subclass?
throw new IOException(
"Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() +
" but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + pos()));
}
BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
}
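
Note that the loop above stops on the value count recorded in the column metadata rather than on end of stream: the underlying byte array is shared by every chunk read in the same scan, so the bytes following this chunk belong to the next column and cannot serve as a boundary.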

/**
* @return the current position in the chunk
*/
public int pos() {
return this.pos;
}

/**
* @param size the size of the page
* @return the page
* @throws IOException
*/
public BytesInput readAsBytesInput(int size) throws IOException {
final BytesInput r = BytesInput.from(this.buf, this.pos, size);
this.pos += size;
return r;
}

}
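
Also worth noting: readAsBytesInput wraps the requested page bytes with BytesInput.from(this.buf, this.pos, size) and simply advances the position, so pages reach the ColumnChunkPageReader as ranges over the one buffer filled by the scan rather than being copied out first.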

/**
* deals with a now fixed bug where compressedLength was missing a few bytes.
*
* @author Julien Le Dem
*
*/
private class WorkaroundChunk extends Chunk {

private final FSDataInputStream f;

/**
* @param descriptor the descriptor of the chunk
* @param data contains the data of the chunk at offset
* @param offset where the chunk starts in data
* @param f the file stream positioned at the end of this chunk
*/
private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
super(descriptor, data, offset);
this.f = f;
}

public BytesInput readAsBytesInput(int size) throws IOException {
if (pos + size > count) {
// this is to workaround a bug where the compressedLength
@@ -382,78 +510,104 @@ public BytesInput readAsBytesInput(int size) throws IOException {
int l1 = count - pos;
int l2 = size - l1;
LOG.info("completed the column chunk with " + l2 + " bytes");
return BytesInput.concat(this.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
}
final BytesInput r = BytesInput.from(this.buf, this.pos, size);
this.pos += size;
return r;
return super.readAsBytesInput(size);
}

}
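
The splice above only ever runs for the last chunk of a scan: because of the old compressedLength bug the buffered bytes can fall a few bytes short, so the tail of the final page is pulled from the still-open stream, which readAll leaves positioned just past the buffered range. A rough sketch of that completion step, using plain java.io types and a hypothetical helper (not code from this patch):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    class PageCompletionSketch {
      // Take whatever is left in the buffered chunk, then read the missing tail
      // of the page straight from the open stream.
      static byte[] completePage(byte[] buffered, int pos, int size, InputStream tail) throws IOException {
        byte[] page = new byte[size];
        int inBuffer = Math.min(size, buffered.length - pos);
        System.arraycopy(buffered, pos, page, 0, inBuffer);
        int read = 0;
        while (read < size - inBuffer) {  // usually only a few bytes are missing
          int n = tail.read(page, inBuffer + read, size - inBuffer - read);
          if (n < 0) throw new EOFException("stream ended before the page was complete");
          read += n;
        }
        return page;
      }
    }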


/**
* Read all of the pages in a given column chunk.
* @return the list of pages
* information needed to read a column chunk
*/
private void readColumnChunkPages(ColumnDescriptor columnDescriptor, ColumnChunkMetaData metadata, List<Page> pagesInChunk, List<DictionaryPage> dictionaryPagesInChunk)
throws IOException {
long startingPos = metadata.getFirstDataPageOffset();
if (metadata.getDictionaryPageOffset() > 0 && metadata.getDictionaryPageOffset() < startingPos) {
// if there's a dictionary and it's before the first data page, start from there
startingPos = metadata.getDictionaryPageOffset();
private static class ChunkDescriptor {

private final ColumnDescriptor col;
private final ColumnChunkMetaData metadata;
private final long fileOffset;
private final int size;

/**
* @param col column this chunk is part of
* @param metadata metadata for the column
* @param fileOffset offset in the file where this chunk starts
* @param size size of the chunk
*/
private ChunkDescriptor(
ColumnDescriptor col,
ColumnChunkMetaData metadata,
long fileOffset,
int size) {
super();
this.col = col;
this.metadata = metadata;
this.fileOffset = fileOffset;
this.size = size;
}
if (f.getPos() != startingPos) {
f.seek(startingPos);
}

/**
* describes a list of consecutive column chunks to be read at once.
*
* @author Julien Le Dem
*/
private class ConsecutiveChunkList {

private final long offset;
private int length;
private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();

/**
* @param offset where the first chunk starts
*/
ConsecutiveChunkList(long offset) {
this.offset = offset;
}
if (DEBUG) {
LOG.debug(f.getPos() + ": start column chunk " + metadata.getPath() +
" " + metadata.getType() + " count=" + metadata.getValueCount());

/**
* adds a chunk to the list.
* It must be consecutive to the previous chunk.
* @param descriptor the descriptor of the chunk to add
*/
public void addChunk(ChunkDescriptor descriptor) {
chunks.add(descriptor);
length += descriptor.size;
}
Chunk chunk = new Chunk(f, (int)metadata.getTotalSize());
long valuesCountReadSoFar = 0;
while (valuesCountReadSoFar < metadata.getValueCount()) {
PageHeader pageHeader = readPageHeader(chunk);
switch (pageHeader.type) {
case DICTIONARY_PAGE:
dictionaryPagesInChunk.add(
new DictionaryPage(
chunk.readAsBytesInput(pageHeader.compressed_page_size),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
));
break;
case DATA_PAGE:
pagesInChunk.add(
new Page(
chunk.readAsBytesInput(pageHeader.compressed_page_size),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
));
valuesCountReadSoFar += pageHeader.data_page_header.num_values;
break;
default:
if (DEBUG) LOG.debug("skipping page of type " + pageHeader.type + " of size " + pageHeader.compressed_page_size);
chunk.skip(pageHeader.compressed_page_size);
break;

/**
* @param f file to read the chunks from
* @return the chunks
* @throws IOException
*/
public List<Chunk> readAll(FSDataInputStream f) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);
byte[] chunksBytes = new byte[length];
f.readFully(chunksBytes);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
int currentChunkOffset = 0;
for (int i = 0; i < chunks.size(); i++) {
ChunkDescriptor descriptor = chunks.get(i);
if (i < chunks.size() - 1) {
result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
} else {
// because of a bug, the last chunk might be larger than descriptor.size
result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
}
currentChunkOffset += descriptor.size;
}
return result;
}
if (valuesCountReadSoFar != metadata.getValueCount()) {
// Would be nice to have a CorruptParquetFileException or something as a subclass?
throw new IOException(
"Expected " + metadata.getValueCount() + " values in column chunk at " +
filePath + " offset " + metadata.getFirstDataPageOffset() +
" but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (startingPos + chunk.pos()));

/**
* @return the position following the last byte of these chunks
*/
public long endPos() {
return offset + length;
}
}
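
To make readAll concrete with hypothetical numbers: three consecutive chunks of 100, 200 and 50 bytes starting at file offset 1000 cost a single seek(1000) and a single readFully of 350 bytes, and the resulting Chunk objects all share that one buffer:

    file offset    size    offset in chunksBytes
    1000           100     0
    1100           200     100
    1300            50     300   (last chunk of the scan => WorkaroundChunk)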

@Override
public void close() throws IOException {
f.close();
this.codecFactory.release();
}

}
