diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c index 743815d831..b7f506b9ce 100644 --- a/src/backend/cdb/cdbparquetrowgroup.c +++ b/src/backend/cdb/cdbparquetrowgroup.c @@ -211,6 +211,33 @@ ParquetRowGroupReader_ScanNextTuple( TupleTableSlot *slot) { Assert(slot); + + int natts = slot->tts_tupleDescriptor->natts; + Assert(natts <= tupDesc->natts); + + Datum *values = slot_get_values(slot); + bool *nulls = slot_get_isnull(slot); + + bool useBloomFilter = false; + int joinKeyCount = 0; + int *joinKeySet = NULL; + if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter) + { + useBloomFilter = true; + + joinKeyCount = list_length(rfState->joinkeys); + Assert(joinKeyCount <= natts); + joinKeySet = palloc(sizeof(int) * joinKeyCount); + + ListCell *hk; + int i = 0; + foreach(hk, rfState->joinkeys) + { + AttrNumber attrno = (AttrNumber) lfirst(hk); + joinKeySet[i++] = attrno -1; + } + } + while (rowGroupReader->rowRead < rowGroupReader->rowCount) { @@ -219,12 +246,9 @@ ParquetRowGroupReader_ScanNextTuple( */ rowGroupReader->rowRead++; - int natts = slot->tts_tupleDescriptor->natts; - Assert(natts <= tupDesc->natts); - - Datum *values = slot_get_values(slot); - bool *nulls = slot_get_isnull(slot); - + /* + * Step 1: fetch those columns as hash join keys + */ int colReaderIndex = 0; for (int i = 0; i < natts; i++) { @@ -233,78 +257,50 @@ ParquetRowGroupReader_ScanNextTuple( nulls[i] = true; continue; } - ParquetColumnReader *nextReader = - &rowGroupReader->columnReaders[colReaderIndex]; - int hawqTypeID = tupDesc->attrs[i]->atttypid; - if (hawqAttrToParquetColNum[i] == 1) + bool isJoinKeyColumn = false; + for (int j = 0; j < joinKeyCount; j++) { - ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], - hawqTypeID); - } - else - { - /* - * Because there are some memory reused inside the whole column reader, so need - * to switch the context from PerTupleContext to rowgroup->context - */ - MemoryContext oldContext = MemoryContextSwitchTo( - rowGroupReader->memoryContext); - - switch (hawqTypeID) { - case HAWQ_TYPE_POINT: - ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_PATH: - ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_LSEG: - ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_BOX: - ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_CIRCLE: - ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]); - break; - case HAWQ_TYPE_POLYGON: - ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]); - break; - default: - /* TODO array type */ - /* TODO UDT */ - Insist(false); + if (joinKeySet[j] == i) + { + isJoinKeyColumn = true; break; } + } - MemoryContextSwitchTo(oldContext); + if (isJoinKeyColumn) + { + ParquetColumnReader *nextReader = + &rowGroupReader->columnReaders[colReaderIndex]; + int hawqTypeID = tupDesc->attrs[i]->atttypid; + + ParquetRowGroupReader_ScanOneAttribute( + rowGroupReader, hawqAttrToParquetColNum[i], + nextReader, &values[i], &nulls[i], hawqTypeID); } colReaderIndex += hawqAttrToParquetColNum[i]; } - if (rfState != NULL && rfState->hasRuntimeFilter - && !rfState->stopRuntimeFilter) + /* + * Step 2: skip following columns decoding if bloomfilter is mismatched + */ + if (useBloomFilter) { - Assert(rfState->bloomfilter != NULL); uint32_t hashkey = 0; - ListCell *hk; - int i = 0; - foreach(hk, rfState->joinkeys) + for (int i = 0; i < joinKeyCount; i++) { - AttrNumber attrno = (AttrNumber) lfirst(hk); Datum keyval; uint32 hkey; /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); - keyval = values[attrno - 1]; + keyval = values[joinKeySet[i]]; /* Evaluate expression */ hkey = DatumGetUInt32( FunctionCall1(&rfState->hashfunctions[i], keyval)); hashkey ^= hkey; - i++; } if (!FindBloomFilter(rfState->bloomfilter, hashkey)) @@ -313,6 +309,43 @@ ParquetRowGroupReader_ScanNextTuple( } } + /* + * Step 3: fetch those columns not in hash join keys + */ + colReaderIndex = 0; + for (int i = 0; i < natts; i++) + { + // it is not expensive to do twice + if (projs[i] == false) + { + nulls[i] = true; + continue; + } + + bool isJoinKeyColumn = false; + for (int j = 0; j < joinKeyCount; j++) + { + if (joinKeySet[j] == i) + { + isJoinKeyColumn = true; + break; + } + } + + if (!isJoinKeyColumn) + { + ParquetColumnReader *nextReader = + &rowGroupReader->columnReaders[colReaderIndex]; + int hawqTypeID = tupDesc->attrs[i]->atttypid; + + ParquetRowGroupReader_ScanOneAttribute( + rowGroupReader, hawqAttrToParquetColNum[i], + nextReader, &values[i], &nulls[i], hawqTypeID); + } + + colReaderIndex += hawqAttrToParquetColNum[i]; + } + /*construct tuple, and return back*/ TupSetVirtualTupleNValid(slot, natts); return true; @@ -322,6 +355,63 @@ ParquetRowGroupReader_ScanNextTuple( return false; } +/* + * Get one attribute of a tuple from current row group into slot. + * + * Similar to ParquetColumnReader_readValue() but consider more hawq types. + */ +void +ParquetRowGroupReader_ScanOneAttribute( + ParquetRowGroupReader *rowGroupReader, + int colChildNum, // hawqAttrToParquetColNum + ParquetColumnReader *columnReader, + Datum *value, + bool *null, + int hawqTypeID) +{ + if (colChildNum == 1) + { + ParquetColumnReader_readValue(columnReader, value, null, hawqTypeID); + } + else + { + /* + * Because there are some memory reused inside the whole column reader, so need + * to switch the context from PerTupleContext to rowgroup->context + */ + MemoryContext oldContext = MemoryContextSwitchTo( + rowGroupReader->memoryContext); + + switch (hawqTypeID) { + case HAWQ_TYPE_POINT: + ParquetColumnReader_readPoint(columnReader, value, null); + break; + case HAWQ_TYPE_PATH: + ParquetColumnReader_readPATH(columnReader, value, null); + break; + case HAWQ_TYPE_LSEG: + ParquetColumnReader_readLSEG(columnReader, value, null); + break; + case HAWQ_TYPE_BOX: + ParquetColumnReader_readBOX(columnReader, value, null); + break; + case HAWQ_TYPE_CIRCLE: + ParquetColumnReader_readCIRCLE(columnReader, value, null); + break; + case HAWQ_TYPE_POLYGON: + ParquetColumnReader_readPOLYGON(columnReader, value, null); + break; + default: + /* TODO array type */ + /* TODO UDT */ + Insist(false); + break; + } + + MemoryContextSwitchTo(oldContext); + } +} + /** * finish scanning row group, but keeping the structure palloced */ diff --git a/src/include/cdb/cdbparquetrowgroup.h b/src/include/cdb/cdbparquetrowgroup.h index 4f5ab7a4b7..f245880a57 100644 --- a/src/include/cdb/cdbparquetrowgroup.h +++ b/src/include/cdb/cdbparquetrowgroup.h @@ -75,6 +75,16 @@ ParquetRowGroupReader_ScanNextTuple( RuntimeFilterState *rfState, TupleTableSlot *slot); +/* Get one attribute of a tuple from current row group*/ +void +ParquetRowGroupReader_ScanOneAttribute( + ParquetRowGroupReader *rowGroupReader, + int colChildNum, + ParquetColumnReader *columnReader, + Datum *value, + bool *null, + int hawqTypeID); + /* Finish scanning current row group*/ void ParquetRowGroupReader_FinishedScanRowGroup(