diff --git a/chia/_tests/plot_sync/test_sender.py b/chia/_tests/plot_sync/test_sender.py
index 5d78be6393dc..c04035f722f0 100644
--- a/chia/_tests/plot_sync/test_sender.py
+++ b/chia/_tests/plot_sync/test_sender.py
@@ -45,7 +45,7 @@ def test_set_connection_values(bt: BlockTools, seeded_random: random.Random) ->
     # Test setting a valid connection works
     sender.set_connection(farmer_connection)  # type:ignore[arg-type]
     assert sender._connection is not None
-    assert sender._connection == farmer_connection  # type: ignore[comparison-overlap]
+    assert id(sender._connection) == id(farmer_connection)
 
 
 @pytest.mark.anyio
diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py
index 055a495e7ba9..53d89d1a5c18 100644
--- a/chia/full_node/full_node.py
+++ b/chia/full_node/full_node.py
@@ -86,6 +86,7 @@
 from chia.util.errors import ConsensusError, Err, TimestampError, ValidationError
 from chia.util.ints import uint8, uint32, uint64, uint128
 from chia.util.limited_semaphore import LimitedSemaphore
+from chia.util.network import is_localhost
 from chia.util.path import path_from_root
 from chia.util.profiler import enable_profiler, mem_profile_task, profile_task
 from chia.util.safe_cancel_task import cancel_task_safe
@@ -1124,8 +1125,28 @@ async def sync_from_fork_point(
         blockchain = AugmentedBlockchain(self.blockchain)
 
         async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]]) -> None:
+            # the rate limit for respond_blocks is 100 messages / 60 seconds.
+            # But the limit is scaled to 30% for outbound messages, so that's
+            # 30 messages per 60 seconds.
+            # That's 2 seconds per request.
+            seconds_per_request = 2
             start_height, end_height = 0, 0
-            new_peers_with_peak: list[WSChiaConnection] = peers_with_peak[:]
+
+            # each peer carries the timestamp at which the next request_blocks
+            # message is allowed to be sent to it. It's initialized to the
+            # current time, and bumped by seconds_per_request every time we
+            # send a request. This keeps us within the effective 30 requests /
+            # 60 seconds rate limit. Whichever peer has the lowest timestamp
+            # is the one we request from. Peers that take more than 5 seconds
+            # to respond are pushed to the end of the queue, making us less
+            # likely to request from them.
+
+            # This should be cleaned up to not be a hard-coded value, and maybe
+            # allow higher request rates (and align the request_blocks and
+            # respond_blocks rate limits).
+            now = time.monotonic()
+            new_peers_with_peak: list[tuple[WSChiaConnection, float]] = [(c, now) for c in peers_with_peak[:]]
+            self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
+            random.shuffle(new_peers_with_peak)
             try:
                 # block request ranges are *inclusive*, this requires some
                 # gymnastics of this range (+1 to make it exclusive, like normal
@@ -1133,18 +1154,55 @@ async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnecti
                 for start_height in range(fork_point_height, target_peak_sb_height + 1, batch_size):
                     end_height = min(target_peak_sb_height, start_height + batch_size - 1)
                     request = RequestBlocks(uint32(start_height), uint32(end_height), True)
+                    new_peers_with_peak.sort(key=lambda pair: pair[1])
                     fetched = False
-                    for peer in random.sample(new_peers_with_peak, len(new_peers_with_peak)):
+                    for idx, (peer, timestamp) in enumerate(new_peers_with_peak):
                         if peer.closed:
                             continue
                         start = time.monotonic()
+                        if start < timestamp:
+                            # rate limit ourselves, since we sent a message to
+                            # this peer too recently
+                            await asyncio.sleep(timestamp - start)
+                            start = time.monotonic()
+
+                        # update the timestamp now that we're sending a request.
+                        # It's OK for the timestamp to fall behind wall-clock
+                        # time; it just means we're allowed to send more
+                        # requests to catch up.
+                        if is_localhost(peer.peer_info.host):
+                            # we don't apply rate limits to localhost, and our
+                            # tests depend on it
+                            bump = 0.1
+                        else:
+                            bump = seconds_per_request
+
+                        new_peers_with_peak[idx] = (
+                            new_peers_with_peak[idx][0],
+                            new_peers_with_peak[idx][1] + bump,
+                        )
                         response = await peer.call_api(FullNodeAPI.request_blocks, request, timeout=30)
                         end = time.monotonic()
-                        if end - start > 5:
-                            self.log.info(f"sync pipeline, peer took {end - start:0.2f} to respond to request_blocks")
                         if response is None:
+                            self.log.info(f"peer timed out after {end - start:.1f} s")
                             await peer.close()
                         elif isinstance(response, RespondBlocks):
+                            if end - start > 5:
+                                self.log.info(f"peer took {end - start:.1f} s to respond to request_blocks")
+                                # this isn't a great peer; reduce its priority
+                                # to prefer any peers that had to wait for it.
+                                # Setting the next allowed timestamp to now
+                                # means that any other peer that has waited for
+                                # this one will have its next allowed timestamp
+                                # in the past, and be preferred multiple times
+                                # over this peer.
+                                new_peers_with_peak[idx] = (
+                                    new_peers_with_peak[idx][0],
+                                    end,
+                                )
                             start = time.monotonic()
                             await output_queue.put((peer, response.blocks))
                             end = time.monotonic()
@@ -1159,8 +1217,12 @@ async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnecti
                         self.log.error(f"failed fetching {start_height} to {end_height} from peers")
                         return
                     if self.sync_store.peers_changed.is_set():
-                        new_peers_with_peak = self.get_peers_with_peak(peak_hash)
+                        existing_peers = {id(c): timestamp for c, timestamp in new_peers_with_peak}
+                        peers = self.get_peers_with_peak(peak_hash)
+                        new_peers_with_peak = [(c, existing_peers.get(id(c), end)) for c in peers]
+                        random.shuffle(new_peers_with_peak)
                         self.sync_store.peers_changed.clear()
+                        self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
             except Exception as e:
                 self.log.error(f"Exception fetching {start_height} to {end_height} from peer {e}")
             finally:
@@ -1267,7 +1329,9 @@ async def ingest_blocks(
                     block_rate_time = now
                     block_rate_height = end_height
 
-                self.log.info(f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s)")
+                self.log.info(
+                    f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s) (from: {peer.peer_info.ip})"
+                )
                 peak: Optional[BlockRecord] = self.blockchain.get_peak()
                 if state_change_summary is not None:
                     assert peak is not None
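
To make the scheduling policy in `fetch_blocks` easier to review, here is a minimal, self-contained sketch of the same technique: each peer carries a "next allowed send" timestamp, we always pick the peer with the lowest one, sleep if we are early, reserve the next slot by bumping the timestamp, and reset a slow responder's timestamp to "now" so that peers which sat waiting are preferred. Names like `fake_request` and `SLOW_RESPONSE_THRESHOLD` are illustrative only, and the patch's error handling and fallback to other peers are omitted:

```python
import asyncio
import random
import time

SECONDS_PER_REQUEST = 2.0  # 30 messages / 60 seconds, per the patch comment
SLOW_RESPONSE_THRESHOLD = 5.0  # seconds; mirrors the patch's "> 5" check


async def fake_request(peer: str) -> None:
    # stand-in for peer.call_api(FullNodeAPI.request_blocks, ...)
    await asyncio.sleep(random.uniform(0.0, 0.3))


async def fetch_rate_limited(peers: list[str], num_requests: int) -> None:
    # (peer, next allowed send time): everyone starts "ready now",
    # shuffled so the initial load is spread across peers
    now = time.monotonic()
    schedule: list[tuple[str, float]] = [(p, now) for p in peers]
    random.shuffle(schedule)

    for _ in range(num_requests):
        # always request from the peer that has been waiting the longest
        schedule.sort(key=lambda pair: pair[1])
        peer, allowed_at = schedule[0]

        start = time.monotonic()
        if start < allowed_at:
            # we sent to this peer too recently: rate limit ourselves
            await asyncio.sleep(allowed_at - start)
            start = time.monotonic()

        # reserve the next slot before sending; the timestamp may fall
        # behind wall-clock time, which just means we're allowed to send
        # more requests to catch up
        schedule[0] = (peer, allowed_at + SECONDS_PER_REQUEST)

        await fake_request(peer)
        end = time.monotonic()

        if end - start > SLOW_RESPONSE_THRESHOLD:
            # slow peer: set its next-allowed time to "now", which is later
            # than the (past) timestamps of peers that sat waiting, so those
            # peers are preferred until this one catches up
            schedule[0] = (peer, end)


if __name__ == "__main__":
    asyncio.run(fetch_rate_limited(["peer-a", "peer-b", "peer-c"], num_requests=10))
```

Because a waiting peer's timestamp keeps falling further into the past while a slow peer's is reset to "now", the sort naturally prefers the responsive peers for several rounds, which is exactly the behavior the in-line comment describes.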
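The `peers_changed` branch also deserves a note: it rebuilds the schedule from a fresh peer list while preserving each known peer's earned timestamp, matching peers by `id()` (object identity) rather than equality. A sketch of that merge, under the hypothetical name `merge_schedule` (in the patch, `get_peers_with_peak` supplies the new list and the loop's `end` supplies the default timestamp):

```python
from typing import TypeVar

T = TypeVar("T")


def merge_schedule(
    old_schedule: list[tuple[T, float]],
    current_peers: list[T],
    default_ts: float,
) -> list[tuple[T, float]]:
    # map object identity -> previously earned timestamp
    existing = {id(peer): ts for peer, ts in old_schedule}
    # known peers keep their slot in the rate limiter; new peers start at
    # default_ts rather than "ready immediately"
    return [(peer, existing.get(id(peer), default_ts)) for peer in current_peers]


merged = merge_schedule([("peer-a", 10.0)], ["peer-a", "peer-b"], default_ts=12.5)
assert merged == [("peer-a", 10.0), ("peer-b", 12.5)]
```

Keying by `id()` is safe here only because `old_schedule` keeps the referenced objects alive for the duration of the lookup; if they could be garbage-collected mid-merge, ids might be reused.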