From 3ac345e34a2328bc72603bf5c93f83d223159076 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Wed, 8 Jan 2025 11:06:40 -0800 Subject: [PATCH] fix postprocessing priority mutexes --- chia/full_node/full_node.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index de9e48e90365..9958016cf45a 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -316,14 +316,16 @@ async def manage(self) -> AsyncIterator[None]: ) async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None) - assert len(pending_tx.items) == 0 # no pending transactions when starting up - - full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak() - assert full_peak is not None - state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], []) - ppp_result: PeakPostProcessingResult = await self.peak_post_processing( - full_peak, state_change_summary, None - ) + assert len(pending_tx.items) == 0 # no pending transactions when starting up + + full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak() + assert full_peak is not None + state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], []) + # Must be called under priority_mutex + ppp_result: PeakPostProcessingResult = await self.peak_post_processing( + full_peak, state_change_summary, None + ) + # Can be called outside of priority_mutex await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result) if self.config["send_uncompact_interval"] != 0: sanitize_weight_proof_only = False @@ -648,7 +650,6 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t state_change_summary, peer, ) - await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result) except Exception: # Still do post processing after cancel (or exception) peak_fb = await self.blockchain.get_full_peak() @@ -657,6 +658,9 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t raise finally: self.log.info(f"Added blocks {height}-{end_height}") + if state_change_summary is not None and peak_fb is not None: + # Call outside of priority_mutex to encourage concurrency + await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result) finally: self.sync_store.batch_syncing.remove(peer.peer_node_id) return True @@ -1729,7 +1733,10 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None: ppp_result: PeakPostProcessingResult = await self.peak_post_processing( peak_fb, state_change_summary, None ) - await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result) + + if peak_fb is not None: + # Call outside of priority_mutex to encourage concurrency + await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result) if peak is not None and self.weight_proof_handler is not None: await self.weight_proof_handler.get_proof_of_weight(peak.header_hash)