diff --git a/handel/src/aggregation.rs b/handel/src/aggregation.rs index ec2b30abd7..4deff62673 100644 --- a/handel/src/aggregation.rs +++ b/handel/src/aggregation.rs @@ -1,11 +1,13 @@ use std::{ + num::NonZeroUsize, pin::Pin, task::{Context, Poll}, + thread, }; use futures::{ future::{BoxFuture, FutureExt}, - stream::{BoxStream, Stream, StreamExt}, + stream::{BoxStream, FuturesUnordered, Stream, StreamExt}, }; use nimiq_time::{interval, Interval}; @@ -68,12 +70,15 @@ where /// Future of the currently verified pending contribution. /// There is only ever one contribution being verified at a time. - current_verification: - Option)>>, + current_verifications: FuturesUnordered< + BoxFuture<'static, (VerificationResult, PendingContribution)>, + >, /// The final result of the aggregation once it has been produced. /// A `Some(_)` value here indicates that the aggregation has finished. final_result: Option, + + available_parallelism: usize, } impl Aggregation @@ -131,8 +136,9 @@ where network: sender, start_level_interval, periodic_update_interval, - current_verification: None, + current_verifications: FuturesUnordered::new(), final_result: None, + available_parallelism: thread::available_parallelism().map_or(1, NonZeroUsize::get), } } @@ -406,7 +412,7 @@ where return Some(result); } - self.current_verification = Some(fut); + self.current_verifications.push(fut); None } @@ -466,27 +472,25 @@ where } // Poll the verification future if there is one. - if let Some(future) = &mut self.current_verification { - if let Poll::Ready((result, contribution)) = future.poll_unpin(cx) { - // If a result is produced, unset the future such that a new one can take its place. - self.current_verification = None; - - if result.is_ok() { - // If the contribution was successfully verified, apply it and return the new - // best aggregate. - best_aggregate = Some(self.apply_contribution(contribution)) - } else { - // Verification failed, ban sender. - warn!( - id = %self.protocol.identify(), - ?result, - ?contribution, - "Rejecting invalid contribution" - ); - self.network.ban_node(contribution.origin); - } + while let Poll::Ready(Some((result, contribution))) = + self.current_verifications.poll_next_unpin(cx) + { + if result.is_ok() { + // If the contribution was successfully verified, apply it and return the new + // best aggregate. + best_aggregate = Some(self.apply_contribution(contribution)); + break; + } else { + // Verification failed, ban sender. + warn!( + id = %self.protocol.identify(), + ?result, + ?contribution, + "Rejecting invalid contribution" + ); + self.network.ban_node(contribution.origin); } - }; + } // Check if the automatic update interval triggers, if so perform the update. while let Poll::Ready(_instant) = self.periodic_update_interval.poll_next_unpin(cx) { @@ -505,7 +509,8 @@ where // does not have produced a value yet. This is necessary as the verification future could // resolve immediately producing a second item for the stream. As the new best aggregate // will be returned, this stream will be polled again creating the future in the next poll. - if self.current_verification.is_none() && best_aggregate.is_none() { + if self.current_verifications.len() < self.available_parallelism && best_aggregate.is_none() + { // Get the next best pending contribution. while let Poll::Ready(Some(contribution)) = self.pending_contributions.poll_next_unpin(cx)