From bf55f86863163b19492841d43147955f1d8660a8 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 13 May 2024 09:09:26 +0200 Subject: [PATCH] Interrupt handling When custom scan node start we are increasing `QueryCancelHoldoffCount` to block canceling query. In fetch page loop we are monitoring `QueryCancelPending` signal for thread to finish execution. `QueryCancelHoldoffCount` counter will be decreased when node exits. --- src/quack_heap_seq_scan.cpp | 13 ++++++++++--- src/quack_node.cpp | 3 ++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 0c7cf37a..e6fdab59 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -69,7 +69,7 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn void PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) { - (void) GetRelation(); + (void)GetRelation(); m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); /* SELECT COUNT(*) FROM */ @@ -111,7 +111,9 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc threadScanInfo.m_read_next_page = true; } else { block = threadScanInfo.m_block_number; - page = BufferGetPage(threadScanInfo.m_buffer); + if (block != InvalidBlockNumber) { + page = BufferGetPage(threadScanInfo.m_buffer); + } } while (block != InvalidBlockNumber) { @@ -159,7 +161,12 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc UnlockReleaseBuffer(threadScanInfo.m_buffer); m_parallel_scan_state.m_lock.unlock(); threadScanInfo.m_read_next_page = true; - block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber(); + /* Handle cancel request */ + if (QueryCancelPending) { + block = threadScanInfo.m_block_number = InvalidBlockNumber; + } else { + block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber(); + } } /* We have collected STANDARD_VECTOR_SIZE */ diff --git a/src/quack_node.cpp b/src/quack_node.cpp index ccb32e16..76f8805b 100644 --- a/src/quack_node.cpp +++ b/src/quack_node.cpp @@ -47,6 +47,7 @@ void Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { QuackScanState *quackScanState = (QuackScanState *)cscanstate; quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor; + HOLD_CANCEL_INTERRUPTS(); } static TupleTableSlot * @@ -54,7 +55,6 @@ Quack_ExecCustomScan(CustomScanState *node) { QuackScanState *quackScanState = (QuackScanState *)node; TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot; MemoryContext oldContext; - if (!quackScanState->is_executed) { quackScanState->queryResult = quackScanState->preparedStatement->Execute(); @@ -107,6 +107,7 @@ Quack_EndCustomScan(CustomScanState *node) { quackScanState->queryResult.reset(); delete quackScanState->preparedStatement; delete quackScanState->duckdb; + RESUME_CANCEL_INTERRUPTS(); } void