Skip to content

Commit

Permalink
feat: move the RetryStrategy into protocol and use that during cli up…
Browse files Browse the repository at this point in the history
…load/download
  • Loading branch information
RolandSherwin committed Feb 8, 2024
1 parent d91ec7a commit 7ac1ced
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 93 deletions.
45 changes: 33 additions & 12 deletions sn_cli/src/subcommands/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use rand::{seq::SliceRandom, thread_rng};
use serde::Deserialize;
use sn_client::{
Client, Error as ClientError, FileUploadEvent, FilesApi, FilesDownload, FilesDownloadEvent,
FilesUpload, BATCH_SIZE, MAX_UPLOAD_RETRIES,
FilesUpload, BATCH_SIZE,
};
use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_protocol::storage::{Chunk, ChunkAddress, RetryStrategy};
use sn_transfers::{Error as TransfersError, WalletError};
use std::{
collections::BTreeSet,
Expand Down Expand Up @@ -59,10 +59,12 @@ pub enum FilesCmds {
/// Should the file be made accessible to all. (This is irreversible)
#[clap(long, name = "make_public", default_value = "false", short = 'p')]
make_public: bool,
/// The retry_count for retrying failed chunks
/// during payment and upload processing.
#[clap(long, default_value_t = MAX_UPLOAD_RETRIES, short = 'r')]
max_retries: usize,
/// Set the strategy to use on chunk upload failure. Does not modify the spend failure retry attempts yet.
///
/// Choose a retry strategy based on effort level, from 'quick' (least effort), through 'balanced',
/// to 'persistent' (most effort).
#[clap(long, default_value_t = RetryStrategy::Balanced, short = 'r', help = "Sets the retry strategy on upload failure. Options: 'quick' for minimal effort, 'balanced' for moderate effort, or 'persistent' for maximum effort.")]
retry_strategy: RetryStrategy,
},
Download {
/// The name to apply to the downloaded file.
Expand All @@ -86,6 +88,12 @@ pub enum FilesCmds {
/// The batch_size for parallel downloading
#[clap(long, default_value_t = BATCH_SIZE , short='b')]
batch_size: usize,
/// Set the strategy to use on downloads failure.
///
/// Choose a retry strategy based on effort level, from 'quick' (least effort), through 'balanced',
/// to 'persistent' (most effort).
#[clap(long, default_value_t = RetryStrategy::Quick, short = 'r', help = "Sets the retry strategy on download failure. Options: 'quick' for minimal effort, 'balanced' for moderate effort, or 'persistent' for maximum effort.")]
retry_strategy: RetryStrategy,
},
}

Expand Down Expand Up @@ -154,7 +162,7 @@ pub(crate) async fn files_cmds(
FilesCmds::Upload {
path,
batch_size,
max_retries,
retry_strategy,
make_public,
} => {
upload_files(
Expand All @@ -164,7 +172,7 @@ pub(crate) async fn files_cmds(
root_dir.to_path_buf(),
verify_store,
batch_size,
max_retries,
retry_strategy,
)
.await?
}
Expand All @@ -173,6 +181,7 @@ pub(crate) async fn files_cmds(
file_addr,
show_holders,
batch_size,
retry_strategy,
} => {
if (file_name.is_some() && file_addr.is_none())
|| (file_addr.is_some() && file_name.is_none())
Expand Down Expand Up @@ -221,12 +230,20 @@ pub(crate) async fn files_cmds(
&download_dir,
show_holders,
batch_size,
retry_strategy,
)
.await
}
_ => {
println!("Attempting to download all files uploaded by the current user...");
download_files(&files_api, root_dir, show_holders, batch_size).await?
download_files(
&files_api,
root_dir,
show_holders,
batch_size,
retry_strategy,
)
.await?
}
}
}
Expand All @@ -243,7 +260,7 @@ async fn upload_files(
root_dir: PathBuf,
verify_store: bool,
batch_size: usize,
max_retries: usize,
retry_strategy: RetryStrategy,
) -> Result<()> {
debug!("Uploading file(s) from {files_path:?}, batch size {batch_size:?} will verify?: {verify_store}");
if make_data_public {
Expand Down Expand Up @@ -322,7 +339,7 @@ async fn upload_files(
let mut files_upload = FilesUpload::new(files_api)
.set_batch_size(batch_size)
.set_verify_store(verify_store)
.set_max_retries(max_retries);
.set_retry_strategy(retry_strategy);
let mut upload_event_rx = files_upload.get_upload_events();
// keep track of the progress in a separate task
let progress_bar_clone = progress_bar.clone();
Expand Down Expand Up @@ -449,6 +466,7 @@ async fn download_files(
root_dir: &Path,
show_holders: bool,
batch_size: usize,
retry_strategy: RetryStrategy,
) -> Result<()> {
info!("Downloading with batch size of {}", batch_size);
let uploaded_files_path = root_dir.join(UPLOADED_FILES);
Expand Down Expand Up @@ -493,6 +511,7 @@ async fn download_files(
&download_path,
show_holders,
batch_size,
retry_strategy,
)
.await;
}
Expand All @@ -519,10 +538,12 @@ async fn download_file(
download_path: &Path,
show_holders: bool,
batch_size: usize,
retry_strategy: RetryStrategy,
) {
let mut files_download = FilesDownload::new(files_api.clone())
.set_batch_size(batch_size)
.set_show_holders(show_holders);
.set_show_holders(show_holders)
.set_retry_strategy(retry_strategy);

println!("Downloading {file_name:?} from {xor_name:64x} with batch-size {batch_size}");
debug!("Downloading {file_name:?} from {:64x}", xor_name);
Expand Down
21 changes: 15 additions & 6 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use sn_networking::{
multiaddr_is_global,
target_arch::{interval, spawn, timeout, Instant},
Error as NetworkError, GetRecordCfg, GetRecordError, NetworkBuilder, NetworkEvent,
PutRecordCfg, RetryStrategy, VerificationKind, CLOSE_GROUP_SIZE,
PutRecordCfg, VerificationKind, CLOSE_GROUP_SIZE,
};
use sn_protocol::{
error::Error as ProtocolError,
messages::ChunkProof,
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, RecordHeader,
RecordKind, RegisterAddress, SpendAddress,
RecordKind, RegisterAddress, RetryStrategy, SpendAddress,
},
NetworkAddress, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -593,16 +593,19 @@ impl Client {
/// * 'payee' - [PeerId]
/// * 'payment' - [Payment]
/// * 'verify_store' - Boolean
/// * 'retry_strategy' - [Option]<[RetryStrategy]> : Uses Balanced by default
///
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
payee: PeerId,
payment: Payment,
verify_store: bool,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
let key = chunk.network_address().to_record_key();
let retry_strategy = Some(retry_strategy.unwrap_or(RetryStrategy::Balanced));

let record_kind = RecordKind::ChunkWithPayment;
let record = Record {
Expand All @@ -617,7 +620,7 @@ impl Client {
get_quorum: Quorum::N(
NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?,
),
re_attempt: Some(RetryStrategy::Balanced),
re_attempt: retry_strategy,
target_record: None, // Not used since we use ChunkProof
expected_holders: Default::default(),
};
Expand All @@ -641,7 +644,7 @@ impl Client {
};
let put_cfg = PutRecordCfg {
put_quorum: Quorum::One,
re_attempt: Some(RetryStrategy::Balanced),
re_attempt: retry_strategy,
use_put_record_to: Some(vec![payee]),
verification,
};
Expand All @@ -653,6 +656,7 @@ impl Client {
/// # Arguments
/// * 'address' - [ChunkAddress]
/// * 'show_holders' - Boolean
/// * 'retry_strategy' - [Option]<[RetryStrategy]> : Uses Quick by default
///
/// Return Type:
///
Expand All @@ -677,7 +681,12 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn get_chunk(&self, address: ChunkAddress, show_holders: bool) -> Result<Chunk> {
pub async fn get_chunk(
&self,
address: ChunkAddress,
show_holders: bool,
retry_strategy: Option<RetryStrategy>,
) -> Result<Chunk> {
info!("Getting chunk: {address:?}");
let key = NetworkAddress::from_chunk_address(address).to_record_key();

Expand All @@ -696,7 +705,7 @@ impl Client {

let get_cfg = GetRecordCfg {
get_quorum: Quorum::One,
re_attempt: Some(RetryStrategy::Quick),
re_attempt: Some(retry_strategy.unwrap_or(RetryStrategy::Quick)),
target_record: None,
expected_holders,
};
Expand Down
39 changes: 27 additions & 12 deletions sn_client/src/files/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
use crate::{
chunks::{DataMapLevel, Error as ChunksError},
error::{Error as ClientError, Result},
Client, FilesApi, BATCH_SIZE, MAX_UPLOAD_RETRIES,
Client, FilesApi, BATCH_SIZE,
};
use bytes::Bytes;
use futures::StreamExt;
use itertools::Itertools;
use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk, StreamSelfDecryptor};
use sn_networking::target_arch::Instant;
use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_protocol::storage::{Chunk, ChunkAddress, RetryStrategy};

use std::{collections::HashMap, fs, path::PathBuf};
use tokio::sync::mpsc::{self};
Expand Down Expand Up @@ -51,8 +51,7 @@ pub struct FilesDownload {
// Configurations
batch_size: usize,
show_holders: bool,
// todo: controlled by GetRecordCfg, need to expose things.
max_retries: usize,
retry_strategy: RetryStrategy,
// API
api: FilesApi,
// Events
Expand All @@ -67,7 +66,7 @@ impl FilesDownload {
Self {
batch_size: BATCH_SIZE,
show_holders: false,
max_retries: MAX_UPLOAD_RETRIES,
retry_strategy: RetryStrategy::Quick,
api: files_api,
event_sender: None,
logged_event_sender_absence: false,
Expand All @@ -90,11 +89,11 @@ impl FilesDownload {
self
}

/// Sets the maximum number of retries to perform if a chunk fails to download.
/// Sets the RetryStrategy to increase the re-try on failure attempts.
///
/// By default, this option is set to the constant `MAX_UPLOAD_RETRIES: usize = 3`.
pub fn set_max_retries(mut self, max_retries: usize) -> Self {
self.max_retries = max_retries;
/// By default, this option is set to RetryStrategy::Balanced
pub fn set_retry_strategy(mut self, retry_strategy: RetryStrategy) -> Self {
self.retry_strategy = retry_strategy;
self
}

Expand Down Expand Up @@ -146,7 +145,11 @@ impl FilesDownload {
length: usize,
) -> Result<Bytes> {
debug!("Reading {length} bytes at: {address:?}, starting from position: {position}");
let chunk = self.api.client.get_chunk(address, false).await?;
let chunk = self
.api
.client
.get_chunk(address, false, Some(self.retry_strategy))
.await?;

// First try to deserialize a LargeFile, if it works, we go and seek it.
// If an error occurs, we consider it to be a SmallFile.
Expand Down Expand Up @@ -267,7 +270,12 @@ impl FilesDownload {
info!("Downloading via supplied local datamap");
chunk
} else {
match self.api.client.get_chunk(address, self.show_holders).await {
match self
.api
.client
.get_chunk(address, self.show_holders, Some(self.retry_strategy))
.await
{
Ok(chunk) => chunk,
Err(err) => {
error!("Failed to fetch head chunk {address:?}");
Expand Down Expand Up @@ -367,6 +375,7 @@ impl FilesDownload {

let client_clone = self.api.client.clone();
let show_holders = self.show_holders;
let retry_strategy = self.retry_strategy;
// the initial index is not always 0 as we might seek a range of bytes. So fetch the first index
let mut current_index = chunk_infos
.first()
Expand All @@ -379,6 +388,7 @@ impl FilesDownload {
chunk_info.dst_hash,
chunk_info.index,
show_holders,
retry_strategy,
)
})
.buffer_unordered(self.batch_size);
Expand Down Expand Up @@ -503,9 +513,14 @@ impl FilesDownload {
address: XorName,
index: usize,
show_holders: bool,
retry_strategy: RetryStrategy,
) -> std::result::Result<(ChunkAddress, usize, EncryptedChunk), ChunksError> {
let chunk = client
.get_chunk(ChunkAddress::new(address), show_holders)
.get_chunk(
ChunkAddress::new(address),
show_holders,
Some(retry_strategy),
)
.await
.map_err(|err| {
error!("Chunk missing {address:?} with {err:?}",);
Expand Down
11 changes: 5 additions & 6 deletions sn_client/src/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use bytes::Bytes;
use libp2p::PeerId;
use self_encryption::{self, MIN_ENCRYPTABLE_BYTES};
use sn_protocol::{
storage::{Chunk, ChunkAddress},
storage::{Chunk, ChunkAddress, RetryStrategy},
NetworkAddress,
};
use sn_transfers::HotWallet;
Expand All @@ -32,9 +32,6 @@ use xor_name::XorName;
/// `BATCH_SIZE` determines the number of chunks that are processed in parallel during the payment and upload process.
pub const BATCH_SIZE: usize = 16;

/// The maximum number of retries to perform on a failed chunk.
pub const MAX_UPLOAD_RETRIES: usize = 3;

/// File APIs.
#[derive(Clone)]
pub struct FilesApi {
Expand Down Expand Up @@ -115,11 +112,13 @@ impl FilesApi {
/// Directly writes Chunks to the network in the
/// form of immutable self encrypted chunks.
///
/// * 'retry_strategy' - [Option]<[RetryStrategy]> : Uses Balanced by default
pub async fn get_local_payment_and_upload_chunk(
&self,
chunk: Chunk,
payee: PeerId,
verify_store: bool,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let chunk_addr = chunk.network_address();
trace!("Client upload started for chunk: {chunk_addr:?} to {payee:?}");
Expand All @@ -133,7 +132,7 @@ impl FilesApi {
);

self.client
.store_chunk(chunk, payee, payment, verify_store)
.store_chunk(chunk, payee, payment, verify_store, retry_strategy)
.await?;

wallet_client.remove_payment_for_addr(&chunk_addr)?;
Expand Down Expand Up @@ -179,7 +178,7 @@ impl FilesApi {

for (_chunk_name, chunk_path) in chunks_paths {
let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?));
self.get_local_payment_and_upload_chunk(chunk, PeerId::random(), verify)
self.get_local_payment_and_upload_chunk(chunk, PeerId::random(), verify, None)
.await?;
}

Expand Down
Loading

0 comments on commit 7ac1ced

Please sign in to comment.