Skip to content

Commit

Permalink
pace block requests to avoid the peer hitting its response rate limit…
Browse files Browse the repository at this point in the history
…. Prioritize fast peers over slow ones
  • Loading branch information
arvidn committed Oct 25, 2024
1 parent 2df931b commit 7b4743b
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 6 deletions.
67 changes: 61 additions & 6 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from chia.util.errors import ConsensusError, Err, TimestampError, ValidationError
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.limited_semaphore import LimitedSemaphore
from chia.util.network import is_localhost
from chia.util.path import path_from_root
from chia.util.profiler import enable_profiler, mem_profile_task, profile_task
from chia.util.safe_cancel_task import cancel_task_safe
Expand Down Expand Up @@ -1111,27 +1112,75 @@ async def sync_from_fork_point(
blockchain = AugmentedBlockchain(self.blockchain)

async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]]) -> None:
# the rate limit for respond_blocks is 100 messages / 60 seconds.
# But the limit is scaled to 30% for outbound messages, so that's 30
# messages per 60 seconds.
# That's 2 seconds per request.
seconds_per_request = 2
start_height, end_height = 0, 0
new_peers_with_peak: list[WSChiaConnection] = peers_with_peak[:]

# the timestamp of when the next request_block message is allowed to
# be sent. It's initialized to the current time, and bumped by the
# seconds_per_request every time we send a request. This ensures we
# won't exceed the 100 requests / 60 seconds rate limit.
# Whichever peer has the lowest timestamp is the one we request
# from. peers that take more than 5 seconds to respond are pushed to
# the end of the queue, to be less likely to request from.

# This should be cleaned up to not be a hard coded value, and maybe
# allow higher request rates (and align the request_blocks and
# respond_blocks rate limits).
now = time.monotonic()
new_peers_with_peak: list[tuple[WSChiaConnection, float]] = [(c, now) for c in peers_with_peak[:]]
self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
random.shuffle(new_peers_with_peak)
try:
# block request ranges are *inclusive*, this requires some
# gymnastics of this range (+1 to make it exclusive, like normal
# ranges) and then -1 when forming the request message
for start_height in range(fork_point_height, target_peak_sb_height + 1, batch_size):
end_height = min(target_peak_sb_height, start_height + batch_size - 1)
request = RequestBlocks(uint32(start_height), uint32(end_height), True)
new_peers_with_peak.sort(key=lambda pair: pair[1])
fetched = False
for peer in random.sample(new_peers_with_peak, len(new_peers_with_peak)):
for idx, (peer, timestamp) in enumerate(new_peers_with_peak):
if peer.closed:
continue

start = time.monotonic()
if start < timestamp:
# rate limit ourselves, since we sent a message to
# this peer too recently
await asyncio.sleep(timestamp - start)
start = time.monotonic()

# update the timestamp, now that we're sending a request
if is_localhost(peer.peer_info.host):
# we don't apply rate limits to localhost, and our
# tests depend on it
new_peers_with_peak[idx] = (
new_peers_with_peak[idx][0],
new_peers_with_peak[idx][1] + 0.1,
)
else:
new_peers_with_peak[idx] = (
new_peers_with_peak[idx][0],
new_peers_with_peak[idx][1] + seconds_per_request,
)
response = await peer.call_api(FullNodeAPI.request_blocks, request, timeout=30)
end = time.monotonic()
if end - start > 5:
self.log.info(f"sync pipeline, peer took {end-start:0.2f} to respond to request_blocks")
if response is None:
self.log.info(f"peer timed out after {end-start:.1f} s")
await peer.close()
elif isinstance(response, RespondBlocks):
if end - start > 5:
self.log.info(f"peer took {end-start:.1f} s to respond to request_blocks")
# this isn't a great peer, reduce its priority
# to prefer any peers that had to wait for it
new_peers_with_peak[idx] = (
new_peers_with_peak[idx][0],
end,
)
start = time.monotonic()
await output_queue.put((peer, response.blocks))
end = time.monotonic()
Expand All @@ -1146,8 +1195,12 @@ async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnecti
self.log.error(f"failed fetching {start_height} to {end_height} from peers")
return
if self.sync_store.peers_changed.is_set():
new_peers_with_peak = self.get_peers_with_peak(peak_hash)
existing_peers = {c: timestamp for c, timestamp in new_peers_with_peak}
peers = self.get_peers_with_peak(peak_hash)
new_peers_with_peak = [(c, existing_peers.get(c, end)) for c in peers]
random.shuffle(new_peers_with_peak)
self.sync_store.peers_changed.clear()
self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
except Exception as e:
self.log.error(f"Exception fetching {start_height} to {end_height} from peer {e}")
finally:
Expand Down Expand Up @@ -1254,7 +1307,9 @@ async def ingest_blocks(
block_rate_time = now
block_rate_height = end_height

self.log.info(f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s)")
self.log.info(
f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s) (from: {peer.peer_info.ip})"
)
peak: Optional[BlockRecord] = self.blockchain.get_peak()
if state_change_summary is not None:
assert peak is not None
Expand Down
18 changes: 18 additions & 0 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,3 +751,21 @@ def get_peer_logging(self) -> PeerInfo:

def has_capability(self, capability: Capability) -> bool:
return capability in self.peer_capabilities

def __hash__(self) -> int:
return hash(self.peer_node_id)

def __eq__(self, other: object) -> bool:
if not isinstance(object, WSChiaConnection):
return False
return self.peer_node_id == other.peer_node_id

def __lt__(self, other: object) -> bool:
if not isinstance(object, WSChiaConnection):
return False
return self.peer_node_id < other.peer_node_id

def __gt__(self, other: object) -> bool:
if not isinstance(object, WSChiaConnection):
return False
return self.peer_node_id > other.peer_node_id

0 comments on commit 7b4743b

Please sign in to comment.