Skip to content

Commit

Permalink
opt: improve farms maintenance performance via parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
tediou5 committed Jan 20, 2025
1 parent e0a3f16 commit dea7e1a
Showing 1 changed file with 10 additions and 19 deletions.
29 changes: 10 additions & 19 deletions crates/subspace-farmer/src/cluster/controller/farms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::future::{ready, Future};
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
Expand Down Expand Up @@ -143,11 +142,9 @@ pub async fn maintain_farms(
) -> anyhow::Result<()> {
let mut known_farms = KnownFarms::new(identification_broadcast_interval);

// Futures that need to be processed sequentially in order to add/remove farms, if farm was
// Futures that need to be processed to add/remove farms, if farm was
// added, future will resolve with `Some`, `None` if removed
let mut farms_to_add_remove = VecDeque::<AddRemoveFuture<'_>>::new();
// Farm that is being added/removed right now (if any)
let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture<'_>).fuse();
let mut farms_to_add_remove = FuturesUnordered::<AddRemoveFuture<'_>>::new();
// Initialize with pending future so it never ends
let mut farms = FuturesUnordered::new();

Expand All @@ -174,16 +171,10 @@ pub async fn maintain_farms(
farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
if farm_add_remove_in_progress.is_terminated() {
if let Some(fut) = farms_to_add_remove.pop_front() {
farm_add_remove_in_progress = fut.fuse();
}
}

select! {
(farm_index, result) = farms.select_next_some() => {
known_farms.remove(farm_index);
farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
Expand Down Expand Up @@ -240,7 +231,7 @@ pub async fn maintain_farms(
);
}

farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
Expand All @@ -259,7 +250,7 @@ pub async fn maintain_farms(
}));
}
}
result = farm_add_remove_in_progress => {
result = farms_to_add_remove.select_next_some() => {
if let Some((farm_index, expired_receiver, farm)) = result {
farms.push(async move {
select! {
Expand All @@ -282,7 +273,7 @@ fn process_farm_identify_message<'a>(
identify_message: ClusterFarmerIdentifyFarmBroadcast,
nats_client: &'a NatsClient,
known_farms: &mut KnownFarms,
farms_to_add_remove: &mut VecDeque<AddRemoveFuture<'a>>,
farms_to_add_remove: &mut FuturesUnordered<AddRemoveFuture<'a>>,
plotted_pieces: &'a Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
) {
let ClusterFarmerIdentifyFarmBroadcast {
Expand Down Expand Up @@ -327,7 +318,7 @@ fn process_farm_identify_message<'a>(
};

if remove {
farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
Expand All @@ -347,7 +338,7 @@ fn process_farm_identify_message<'a>(
}

if add {
farms_to_add_remove.push_back(Box::pin(async move {
farms_to_add_remove.push(Box::pin(async move {
match initialize_farm(
farm_index,
farm_id,
Expand Down

0 comments on commit dea7e1a

Please sign in to comment.