diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 473c76c260f0..5d7c0a9f6541 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -385,10 +385,7 @@ impl NetworkState { } /// Handle the outcome of processed response, for example directly queue another request. - fn on_block_response_outcome( - &mut self, - outcome: BlockResponseOutcome, - ) -> Option> { + fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) { match outcome { BlockResponseOutcome::Request(peer, request) => { self.handle_block_request(peer, request); @@ -397,7 +394,6 @@ impl NetworkState { self.peers_manager.apply_reputation_change(&peer, reputation_change); } } - None } /// Invoked when received a response from a connected peer. @@ -405,21 +401,19 @@ impl NetworkState { /// Delegates the response result to the fetcher which may return an outcome specific /// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be /// a follow-up request or an instruction to slash the peer's reputation. - fn on_eth_response( - &mut self, - peer: PeerId, - resp: PeerResponseResult, - ) -> Option> { - match resp { + fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult) { + let outcome = match resp { PeerResponseResult::BlockHeaders(res) => { - let outcome = self.state_fetcher.on_block_headers_response(peer, res)?; - self.on_block_response_outcome(outcome) + self.state_fetcher.on_block_headers_response(peer, res) } PeerResponseResult::BlockBodies(res) => { - let outcome = self.state_fetcher.on_block_bodies_response(peer, res)?; - self.on_block_response_outcome(outcome) + self.state_fetcher.on_block_bodies_response(peer, res) } _ => None, + }; + + if let Some(outcome) = outcome { + self.on_block_response_outcome(outcome); } } @@ -443,13 +437,14 @@ impl NetworkState { } } - // need to buffer results here to make borrow checker happy - let mut closed_sessions = Vec::new(); - let mut received_responses = Vec::new(); + loop { + // need to buffer results here to make borrow checker happy + let mut closed_sessions = Vec::new(); + let mut received_responses = Vec::new(); - // poll all connected peers for responses - for (id, peer) in &mut self.active_peers { - if let Some(mut response) = peer.pending_response.take() { + // poll all connected peers for responses + for (id, peer) in &mut self.active_peers { + let Some(mut response) = peer.pending_response.take() else { continue }; match response.poll(cx) { Poll::Ready(res) => { // check if the error is due to a closed channel to the session @@ -460,7 +455,8 @@ impl NetworkState { "Request canceled, response channel from session closed." ); // if the channel is closed, this means the peer session is also - // closed, in which case we can invoke the [Self::on_closed_session] + // closed, in which case we can invoke the + // [Self::on_closed_session] // immediately, preventing followup requests and propagate the // connection dropped error closed_sessions.push(*id); @@ -474,15 +470,17 @@ impl NetworkState { } }; } - } - for peer in closed_sessions { - self.on_session_closed(peer) - } + for peer in closed_sessions { + self.on_session_closed(peer) + } + + if received_responses.is_empty() { + break; + } - for (peer_id, resp) in received_responses { - if let Some(action) = self.on_eth_response(peer_id, resp) { - self.queued_messages.push_back(action); + for (peer_id, resp) in received_responses { + self.on_eth_response(peer_id, resp); } } @@ -491,6 +489,8 @@ impl NetworkState { self.on_peer_action(action); } + // We need to poll again tn case we have received any responses because they may have + // triggered follow-up requests. if self.queued_messages.is_empty() { return Poll::Pending }