Skip to content

Commit

Permalink
Merge pull request #3150 from autonomys/remove-piece-getter-concurren…
Browse files Browse the repository at this point in the history
…cy-limits

Remove piece getter concurrency option
  • Loading branch information
nazar-pc authored Oct 18, 2024
2 parents 64df00e + 96763a8 commit 1755b51
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
/// Arguments for controller
#[derive(Debug, Parser)]
pub(super) struct ControllerArgs {
/// Piece getter concurrency.
///
/// Increasing this value will cause higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Base path where to store P2P network identity
#[arg(long, value_hint = ValueHint::DirPath)]
base_path: Option<PathBuf>,
Expand Down Expand Up @@ -88,7 +83,6 @@ pub(super) async fn controller(
controller_args: ControllerArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
let ControllerArgs {
piece_getter_concurrency,
base_path,
node_rpc_url,
cache_group,
Expand Down Expand Up @@ -186,7 +180,6 @@ pub(super) async fn controller(
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,6 @@ struct RocmPlottingOptions {
/// Arguments for plotter
#[derive(Debug, Parser)]
pub(super) struct PlotterArgs {
/// Piece getter concurrency.
///
/// Increasing this value can cause NATS communication issues if too many messages arrive via
/// NATS, but are not processed quickly enough.
#[arg(long, default_value = "32")]
piece_getter_concurrency: NonZeroUsize,
/// Plotting options only used by CPU plotter
#[clap(flatten)]
cpu_plotting_options: CpuPlottingOptions,
Expand All @@ -154,7 +148,6 @@ where
PosTable: Table,
{
let PlotterArgs {
piece_getter_concurrency,
cpu_plotting_options,
#[cfg(feature = "cuda")]
cuda_plotting_options,
Expand All @@ -169,7 +162,7 @@ where
.expect("Not zero; qed"),
)
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let piece_getter = ClusterPieceGetter::new(nats_client.clone(), piece_getter_concurrency);
let piece_getter = ClusterPieceGetter::new(nats_client.clone());

let global_mutex = Arc::default();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,6 @@ pub(crate) struct FarmingArgs {
/// one specified endpoint. Format: 127.0.0.1:8080
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
prometheus_listen_on: Vec<SocketAddr>,
/// Piece getter concurrency.
///
/// Increasing this value will cause higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving). Defaults to the number of logical CPUs
/// on UMA systems, or the number of logical CPUs in first NUMA node on NUMA systems, but
Expand Down Expand Up @@ -310,7 +305,6 @@ where
tmp,
mut disk_farms,
prometheus_listen_on,
piece_getter_concurrency,
farming_thread_pool_size,
cpu_plotting_options,
#[cfg(feature = "cuda")]
Expand Down Expand Up @@ -457,7 +451,6 @@ where
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
Expand Down
19 changes: 2 additions & 17 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::channel::mpsc;
Expand All @@ -26,7 +25,6 @@ use futures::{select, stream, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
Expand Down Expand Up @@ -196,14 +194,11 @@ impl GenericStreamRequest for ClusterControllerPiecesRequest {
#[derive(Debug, Clone)]
pub struct ClusterPieceGetter {
nats_client: NatsClient,
request_semaphore: Arc<Semaphore>,
}

#[async_trait]
impl PieceGetter for ClusterPieceGetter {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.request_semaphore.acquire().await;

if let Some((piece_cache_id, piece_cache_offset)) = self
.nats_client
.request(
Expand Down Expand Up @@ -296,8 +291,6 @@ impl PieceGetter for ClusterPieceGetter {
let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();

{
let _guard = self.request_semaphore.acquire().await;

let mut cached_pieces = self
.nats_client
.stream_request(
Expand Down Expand Up @@ -325,8 +318,6 @@ impl PieceGetter for ClusterPieceGetter {
let piece_indices_to_download = &piece_indices_to_get;

async move {
let _guard = self.request_semaphore.acquire().await;

let mut pieces_stream = match self
.nats_client
.stream_request(
Expand Down Expand Up @@ -388,8 +379,6 @@ impl PieceGetter for ClusterPieceGetter {
return;
}

let _guard = self.request_semaphore.acquire().await;

let mut pieces_from_controller = match self
.nats_client
.stream_request(
Expand Down Expand Up @@ -448,12 +437,8 @@ impl PieceGetter for ClusterPieceGetter {
impl ClusterPieceGetter {
/// Create new instance
#[inline]
pub fn new(nats_client: NatsClient, request_concurrency: NonZeroUsize) -> Self {
let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
Self {
nats_client,
request_semaphore,
}
pub fn new(nats_client: NatsClient) -> Self {
Self { nats_client }
}
}

Expand Down
13 changes: 1 addition & 12 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{RwLock as AsyncRwLock, Semaphore};
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
Expand All @@ -14,7 +14,6 @@ use futures::stream::FuturesUnordered;
use futures::{stream, FutureExt, Stream, StreamExt};
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -44,7 +43,6 @@ struct Inner<FarmIndex, CacheIndex, PV, NC> {
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
request_semaphore: Arc<Semaphore>,
}

/// Farmer-specific piece getter.
Expand Down Expand Up @@ -89,25 +87,20 @@ where
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
request_concurrency: NonZeroUsize,
) -> Self {
let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
Self {
inner: Arc::new(Inner {
piece_provider,
farmer_cache,
node_client,
plotted_pieces,
dsn_cache_retry_policy,
request_semaphore,
}),
}
}

/// Fast way to get piece using various caches
pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

self.get_piece_fast_internal(piece_index).await
}

Expand Down Expand Up @@ -163,8 +156,6 @@ where

/// Slow way to get piece using archival storage
pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

self.get_piece_slow_internal(piece_index).await
}

Expand Down Expand Up @@ -230,8 +221,6 @@ where
NC: NodeClient,
{
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

{
let retries = AtomicU32::new(0);
let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
Expand Down

0 comments on commit 1755b51

Please sign in to comment.