refactor: use parquet library for reading metadata #6189
base: main
Conversation
-    lastKey.getFileReader().getSchema(),
+    lastKey.getMetadata().getFileMetaData().getKeyValueMetaData(),
This is one of the logic changes; previously, we were getting the schema from the file instead of from the metadata file.
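For context, a minimal sketch of where both inputs come from after the change, using upstream parquet-hadoop directly rather than Deephaven's own ParquetFileReader wrapper (class and method names outside the parquet library are illustrative):

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public final class FooterSchemaSketch {
    // Reads the schema and key-value metadata from a single parsed footer (which
    // could be a sidecar metadata file) instead of re-opening each data file.
    public static MessageType schemaFromFooter(final String path) throws IOException {
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path(path), new Configuration()))) {
            final ParquetMetadata footer = reader.getFooter();
            final Map<String, String> keyValueMetadata = footer.getFileMetaData().getKeyValueMetaData();
            System.out.println("key-value metadata keys: " + keyValueMetadata.keySet());
            return footer.getFileMetaData().getSchema();
        }
    }
}
```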
-    tableLocationKey.getFileReader().getSchema(),
+    tableLocationKey.getMetadata().getFileMetaData().getKeyValueMetaData(),
Ditto.
-    .sorted(Comparator.comparingInt(RowGroup::getOrdinal))
-    .toArray(RowGroup[]::new);
-    final long maxRowCount = Arrays.stream(rowGroups).mapToLong(RowGroup::getNum_rows).max().orElse(0L);
+    .mapToObj(rgi -> parquetMetadata.getBlocks().get(rgi))
This is also a change; previously the block metadata was coming from the file instead of the metadata file.
I'm concerned about this. I think this is a merged list of blocks, so we might be including all the row groups from all the files, once per file.
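If that's right, one way to guard against it would be to select only the blocks that belong to the file at hand before indexing. A sketch, assuming the merged footer populates each block's relative path the way summary files do; the maxRowCount helper just recomputes one of the inputs the diff feeds to RegionedPageStore.Parameters and is illustrative, not the actual Deephaven code:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

final class MergedBlocksSketch {
    // In a merged summary footer, each BlockMetaData may carry the relative path
    // of the data file it describes; indexing getBlocks() positionally would then
    // mix row groups from different files.
    static List<BlockMetaData> blocksForFile(final ParquetMetadata merged, final String relativePath) {
        return merged.getBlocks().stream()
                .filter(block -> relativePath.equals(block.getPath()))
                .collect(Collectors.toList());
    }

    // Maximum row count across the (filtered) row groups.
    static long maxRowCount(final List<BlockMetaData> blocks) {
        return blocks.stream().mapToLong(BlockMetaData::getRowCount).max().orElse(0L);
    }
}
```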
     regionParameters = new RegionedPageStore.Parameters(
             RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount);

     parquetColumnNameToPath = new HashMap<>();
-    for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) {
+    for (final ColumnDescriptor column : parquetMetadata.getFileMetaData().getSchema().getColumns()) {
Ditto
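A sketch of what the new loop amounts to: the leaf columns come entirely from the footer's MessageType, so no data file needs to be opened. Only the parquetColumnNameToPath name appears in the diff; the keying by the first path element is an assumption about what the surrounding code wants:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

final class ColumnPathSketch {
    // Maps each leaf column's top-level name to its full schema path, derived
    // from ParquetMetadata rather than from an open file reader.
    static Map<String, String[]> columnNameToPath(final ParquetMetadata parquetMetadata) {
        final Map<String, String[]> parquetColumnNameToPath = new HashMap<>();
        for (final ColumnDescriptor column : parquetMetadata.getFileMetaData().getSchema().getColumns()) {
            final String[] path = column.getPath();
            parquetColumnNameToPath.put(path[0], path);
        }
        return parquetColumnNameToPath;
    }
}
```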
     return rowGroupReaders = IntStream.of(rowGroupIndices)
-            .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version))
+            .mapToObj(idx -> parquetFileReader.getRowGroup(parquetMetadata, idx, version))
Ditto; this is a place where parquetFileReader was implicitly sourcing its own metadata.
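In other words, the signature change turns a hidden dependency into an explicit parameter. A hypothetical sketch of the shape of that change; neither overload is the project's actual interface, and RowGroupReader here is only a stand-in type parameter:

```java
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical shape of the change, not Deephaven's actual API.
interface RowGroupReaderSource<RowGroupReader> {
    // Before: the reader implicitly consulted the footer it had parsed itself.
    RowGroupReader getRowGroup(int rowGroupIndex, String version);

    // After: the caller supplies the ParquetMetadata, so a sidecar metadata
    // file's footer can stand in for the data file's own footer.
    RowGroupReader getRowGroup(ParquetMetadata parquetMetadata, int rowGroupIndex, String version);
}
```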
@@ -154,7 +148,7 @@ public synchronized int[] getRowGroupIndices() {
     if (rowGroupIndices != null) {
         return rowGroupIndices;
     }
-    final List<RowGroup> rowGroups = getFileReader().fileMetaData.getRow_groups();
+    final List<BlockMetaData> rowGroups = getFileReader().getMetadata().getBlocks();
TODO: should this actually be calling getMetadata() instead? The javadoc does specifically call out file reader...
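Either way, deriving the indices from the block list is straightforward. A sketch, assuming the surrounding code only wants non-empty row groups (that filter is an assumption, not something shown in the diff):

```java
import java.util.List;
import java.util.stream.IntStream;

import org.apache.parquet.hadoop.metadata.BlockMetaData;

final class RowGroupIndicesSketch {
    // Indices of row groups with at least one row, derived from footer blocks.
    static int[] nonEmptyRowGroupIndices(final List<BlockMetaData> rowGroups) {
        return IntStream.range(0, rowGroups.size())
                .filter(i -> rowGroups.get(i).getRowCount() > 0)
                .toArray();
    }
}
```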
Partial review
-    final SeekableByteChannel ch =
-            channelsProvider.getReadChannel(holder.get(), getURI()).position(dictionaryPageOffset)) {
+    final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) {
+        ch.position(columnChunk.getStartingPos());
This adds a new check that the dictionary offset is before the first data page offset. Do we think that's just about rejecting malformed metadata that claims a dictionary page that cannot exist?
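For reference, this mirrors the guard in parquet-hadoop's ColumnChunkMetaData.getStartingPos(), paraphrased below: the dictionary page offset is honored only when it is positive and precedes the first data page, which drops exactly the "impossible dictionary page" case:

```java
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

final class StartingPosSketch {
    // Paraphrase of the getStartingPos() logic: fall back to the first data page
    // offset when the claimed dictionary page could not actually exist there.
    static long startingPos(final ColumnChunkMetaData columnChunk) {
        final long dictionaryPageOffset = columnChunk.getDictionaryPageOffset();
        final long firstDataPageOffset = columnChunk.getFirstDataPageOffset();
        if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
            return dictionaryPageOffset;
        }
        return firstDataPageOffset;
    }
}
```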
@@ -284,6 +294,7 @@ public static Pair<List<ColumnDefinition<?>>, ParquetInstructions> convertSchema
     @NotNull final MessageType schema,
     @NotNull final Map<String, String> keyValueMetadata,
     @NotNull final ParquetInstructions readInstructionsIn) {
+    // TODO: what is this warning about?
Marker