Fix peak_post_processing w/priority_mutex (#19113)
### Purpose:

`peak_post_processing` and `peak_post_processing_2` were split so that `priority_mutex` does not have to be held for as long (e.g. while broadcasting changes to peers), which encourages concurrency. This PR fixes the call sites where the two halves were not called correctly.
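
As a rough illustration of the intended pattern (a minimal sketch with made-up stand-ins, not the project's actual code: `priority_mutex` here is a plain `asyncio.Lock` and the `*_sketch` functions are placeholders for the real `FullNode` methods), the state-dependent work runs while the lock is held, and the slower peer broadcast runs only after it has been released so other tasks can acquire the mutex in the meantime:

```python
import asyncio


async def peak_post_processing_sketch(peak: int) -> dict:
    # Work that needs a consistent view of blockchain state; caller must hold the mutex.
    return {"peak": peak}


async def send_to_peer(peer: str, result: dict) -> None:
    await asyncio.sleep(0.01)  # simulated network round trip
    print(f"notified {peer} of peak {result['peak']}")


async def peak_post_processing_2_sketch(result: dict, peers: list[str]) -> None:
    # Slow peer broadcast; callers should NOT hold the mutex while this runs.
    await asyncio.gather(*(send_to_peer(peer, result) for peer in peers))


async def on_new_peak(priority_mutex: asyncio.Lock, peak: int, peers: list[str]) -> None:
    async with priority_mutex:
        result = await peak_post_processing_sketch(peak)
    # The mutex is released here, so other lock users can proceed while we broadcast.
    await peak_post_processing_2_sketch(result, peers)


async def main() -> None:
    priority_mutex = asyncio.Lock()  # illustrative stand-in for the blockchain priority_mutex
    await on_new_peak(priority_mutex, 100, ["peer-a", "peer-b"])


asyncio.run(main())
```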

### Current Behavior:

`priority_mutex` is still held when `peak_post_processing_2` is called.

### New Behavior:

`priority_mutex` is released before `peak_post_processing_2` is called.
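
To make the behavioral difference concrete, here is a self-contained toy (all names and timings below are invented for the example, not taken from the codebase): a second task that wants the same lock has to wait out the whole broadcast under the old shape, but only the post-processing itself under the new shape:

```python
import asyncio
import time


async def broadcast_to_peers() -> None:
    await asyncio.sleep(0.5)  # simulated slow peer broadcast (peak_post_processing_2)


async def handle_new_peak(lock: asyncio.Lock, hold_lock_during_broadcast: bool) -> None:
    async with lock:
        await asyncio.sleep(0.05)  # simulated peak_post_processing
        if hold_lock_during_broadcast:
            await broadcast_to_peers()  # old behavior: mutex still held during the broadcast
    if not hold_lock_during_broadcast:
        await broadcast_to_peers()  # new behavior: mutex already released


async def other_lock_user(lock: asyncio.Lock) -> float:
    # e.g. a transaction waiting for the mutex so it can enter the mempool
    start = time.monotonic()
    async with lock:
        pass
    return time.monotonic() - start


async def main() -> None:
    for old_style in (True, False):
        lock = asyncio.Lock()  # stand-in for priority_mutex
        _, waited = await asyncio.gather(handle_new_peak(lock, old_style), other_lock_user(lock))
        print(f"old_style={old_style}: other task waited {waited:.2f}s for the lock")


asyncio.run(main())
```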

### Testing Notes:

None
wjblanke authored Jan 9, 2025
2 parents 5121c5a + 3ac345e commit c5dd8f2
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions chia/full_node/full_node.py
@@ -316,14 +316,16 @@ async def manage(self) -> AsyncIterator[None]:
             )
             async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
                 pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None)
-            assert len(pending_tx.items) == 0  # no pending transactions when starting up
-
-            full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak()
-            assert full_peak is not None
-            state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], [])
-            ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
-                full_peak, state_change_summary, None
-            )
+                assert len(pending_tx.items) == 0  # no pending transactions when starting up
+
+                full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak()
+                assert full_peak is not None
+                state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], [])
+                # Must be called under priority_mutex
+                ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
+                    full_peak, state_change_summary, None
+                )
+            # Can be called outside of priority_mutex
             await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result)
             if self.config["send_uncompact_interval"] != 0:
                 sanitize_weight_proof_only = False
@@ -648,7 +650,6 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
                         state_change_summary,
                         peer,
                     )
-                    await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result)
                 except Exception:
                     # Still do post processing after cancel (or exception)
                     peak_fb = await self.blockchain.get_full_peak()
@@ -657,6 +658,9 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
                     raise
                 finally:
                     self.log.info(f"Added blocks {height}-{end_height}")
+            if state_change_summary is not None and peak_fb is not None:
+                # Call outside of priority_mutex to encourage concurrency
+                await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result)
             finally:
                 self.sync_store.batch_syncing.remove(peer.peer_node_id)
         return True
@@ -1729,7 +1733,10 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None:
                 ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
                     peak_fb, state_change_summary, None
                 )
-                await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result)
+
+        if peak_fb is not None:
+            # Call outside of priority_mutex to encourage concurrency
+            await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result)

         if peak is not None and self.weight_proof_handler is not None:
             await self.weight_proof_handler.get_proof_of_weight(peak.header_hash)
