From dea7e1a160aba0d2abbe33611c6a513853adb36a Mon Sep 17 00:00:00 2001 From: tediou5 Date: Mon, 20 Jan 2025 15:02:40 +0800 Subject: [PATCH] opt: improve farms maintenance performance via parallelization --- .../src/cluster/controller/farms.rs | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/crates/subspace-farmer/src/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs index 4c725e3d49..d5647ba7ff 100644 --- a/crates/subspace-farmer/src/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -12,13 +12,12 @@ use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; -use futures::future::FusedFuture; use futures::stream::FuturesUnordered; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::hash_map::Entry; -use std::collections::{HashMap, VecDeque}; -use std::future::{ready, Future}; +use std::collections::HashMap; +use std::future::Future; use std::mem; use std::pin::{pin, Pin}; use std::sync::Arc; @@ -143,11 +142,9 @@ pub async fn maintain_farms( ) -> anyhow::Result<()> { let mut known_farms = KnownFarms::new(identification_broadcast_interval); - // Futures that need to be processed sequentially in order to add/remove farms, if farm was + // Futures that need to be processed to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed - let mut farms_to_add_remove = VecDeque::>::new(); - // Farm that is being added/removed right now (if any) - let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture<'_>).fuse(); + let mut farms_to_add_remove = FuturesUnordered::>::new(); // Initialize with pending future so it never ends let mut farms = FuturesUnordered::new(); @@ -174,16 +171,10 @@ pub async fn maintain_farms( farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - if farm_add_remove_in_progress.is_terminated() { - if let Some(fut) = farms_to_add_remove.pop_front() { - farm_add_remove_in_progress = fut.fuse(); - } - } - select! { (farm_index, result) = farms.select_next_some() => { known_farms.remove(farm_index); - farms_to_add_remove.push_back(Box::pin(async move { + farms_to_add_remove.push(Box::pin(async move { let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { @@ -240,7 +231,7 @@ pub async fn maintain_farms( ); } - farms_to_add_remove.push_back(Box::pin(async move { + farms_to_add_remove.push(Box::pin(async move { let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { @@ -259,7 +250,7 @@ pub async fn maintain_farms( })); } } - result = farm_add_remove_in_progress => { + result = farms_to_add_remove.select_next_some() => { if let Some((farm_index, expired_receiver, farm)) = result { farms.push(async move { select! { @@ -282,7 +273,7 @@ fn process_farm_identify_message<'a>( identify_message: ClusterFarmerIdentifyFarmBroadcast, nats_client: &'a NatsClient, known_farms: &mut KnownFarms, - farms_to_add_remove: &mut VecDeque>, + farms_to_add_remove: &mut FuturesUnordered>, plotted_pieces: &'a Arc>>, ) { let ClusterFarmerIdentifyFarmBroadcast { @@ -327,7 +318,7 @@ fn process_farm_identify_message<'a>( }; if remove { - farms_to_add_remove.push_back(Box::pin(async move { + farms_to_add_remove.push(Box::pin(async move { let plotted_pieces = Arc::clone(plotted_pieces); let delete_farm_fut = task::spawn_blocking(move || { @@ -347,7 +338,7 @@ fn process_farm_identify_message<'a>( } if add { - farms_to_add_remove.push_back(Box::pin(async move { + farms_to_add_remove.push(Box::pin(async move { match initialize_farm( farm_index, farm_id,