Skip to content

Commit

Permalink
chore: revert client side attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach committed Dec 5, 2024
1 parent c8e7b2c commit a958cdf
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 47 deletions.
20 changes: 12 additions & 8 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub enum CostError {
#[error("Could not get store quote for: {0:?} after several retries")]
CouldNotGetStoreQuote(XorName),
#[error("Could not get store costs: {0:?}")]
CouldNotGetStoreQuotes(NetworkError),
CouldNotGetStoreCosts(NetworkError),
#[error("Failed to serialize {0}")]
Serialization(String),
}
Expand Down Expand Up @@ -194,7 +194,8 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = receipt
.values()
.fold(Amount::ZERO, |acc, (_, cost)| acc + cost.as_atto());
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
record_count,
Expand Down Expand Up @@ -260,17 +261,20 @@ impl Client {
.get_store_quotes(content_addrs.into_iter())
.await
.inspect_err(|err| error!("Error getting store quotes: {err:?}"))?;
let total_cost = cost_map
.values()
.fold(Amount::ZERO, |acc, q| acc + q.total_cost.as_atto());
Ok(AttoTokens::from_atto(total_cost))
let total_cost = AttoTokens::from_atto(
cost_map
.values()
.map(|quote| quote.2.cost.as_atto())
.sum::<Amount>(),
);
Ok(total_cost)
}

// Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times.
pub(crate) async fn upload_chunks_with_retries<'a>(
&self,
mut chunks: Vec<&'a Chunk>,
receipt: &HashMap<XorName, (Vec<ProofOfPayment>, AttoTokens)>,
receipt: &HashMap<XorName, ProofOfPayment>,
) -> Vec<(&'a Chunk, PutError)> {
let mut current_attempt: usize = 1;

Expand All @@ -287,7 +291,7 @@ impl Client {

upload_tasks.push(async move {
self_clone
.chunk_upload_with_payment(chunk, proof.0.clone())
.chunk_upload_with_payment(chunk, proof.clone())
.await
.inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))
// Return chunk reference too, to re-use it next attempt/iteration
Expand Down
3 changes: 2 additions & 1 deletion autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = receipt
.values()
.fold(Amount::ZERO, |acc, (_, cost)| acc + cost.as_atto());
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
record_count,
Expand Down
1 change: 0 additions & 1 deletion autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

pub mod address;
pub mod payment;
pub mod quote;

#[cfg(feature = "data")]
pub mod archive;
Expand Down
8 changes: 4 additions & 4 deletions autonomi/src/client/payment.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::client::data::PayError;
use crate::Client;
use ant_evm::{AttoTokens, EvmWallet, ProofOfPayment};
use ant_evm::{EvmWallet, ProofOfPayment};
use std::collections::HashMap;
use xor_name::XorName;

/// Contains the proof of payments for XOR addresses as well as the total cost.
pub type Receipt = HashMap<XorName, (Vec<ProofOfPayment>, AttoTokens)>;
/// Contains the proof of payment for XOR addresses.
pub type Receipt = HashMap<XorName, ProofOfPayment>;

/// Payment options for data payments.
#[derive(Clone)]
Expand Down Expand Up @@ -40,7 +40,7 @@ impl Client {
) -> Result<Receipt, PayError> {
match payment_option {
PaymentOption::Wallet(wallet) => {
let receipt = self.pay(content_addrs, &wallet).await?;
let (receipt, _) = self.pay(content_addrs, &wallet).await?;
Ok(receipt)
}
PaymentOption::Receipt(receipt) => Ok(receipt),
Expand Down
9 changes: 2 additions & 7 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Client {

let reg_xor = address.xorname();
debug!("Paying for register at address: {address}");
let payment_proofs = self
let (payment_proofs, _skipped) = self
.pay(std::iter::once(reg_xor), wallet)
.await
.inspect_err(|err| {
Expand All @@ -307,11 +307,6 @@ impl Client {
};

let payee = proof
// NB TODO only pay the first one for now, but we should try all of them if first one fails
.0
.first()
.expect("Missing proof of payment")
// TODO remove the tmp hack above and upload to all of them one by one until one succeeds
.to_peer_id_payee()
.ok_or(RegisterError::InvalidQuote)
.inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?;
Expand Down Expand Up @@ -354,7 +349,7 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let summary = UploadSummary {
record_count: 1,
tokens_spent: proof.1.as_atto(),
tokens_spent: proof.quote.cost.as_atto(),
};
if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send client event: {err}");
Expand Down
109 changes: 90 additions & 19 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::payment::Receipt;
use crate::utils::receipt_from_cost_map_and_payments;
use ant_evm::{EvmWallet, ProofOfPayment, QuotePayment};
use ant_networking::{
GetRecordCfg, PutRecordCfg, VerificationKind,
GetRecordCfg, Network, NetworkError, PayeeQuote, PutRecordCfg, VerificationKind,
};
use ant_protocol::{
messages::ChunkProof,
storage::{try_serialize_record, Chunk, RecordKind, RetryStrategy},
storage::{try_serialize_record, Chunk, ChunkAddress, RecordKind, RetryStrategy},
NetworkAddress,
};
use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::kad::{Quorum, Record};
use rand::{thread_rng, Rng};
use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk};
use std::{future::Future, num::NonZero};
use std::{collections::HashMap, future::Future, num::NonZero};
use xor_name::XorName;

use super::{
quote::receipt_from_quotes_and_payments,
data::{GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE},
payment::Receipt,
data::{CostError, GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE},
Client,
};
use crate::self_encryption::DataMapLevel;
Expand Down Expand Up @@ -100,11 +101,9 @@ impl Client {
pub(crate) async fn chunk_upload_with_payment(
&self,
chunk: &Chunk,
payment: Vec<ProofOfPayment>,
payment: ProofOfPayment,
) -> Result<(), PutError> {
// NB TODO only pay the first one for now, but we should try all of them if first one fails
// NB TODO remove expects!!
let storing_node = payment.first().expect("Missing proof of payment").to_peer_id_payee().expect("Missing node Peer ID");
let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");

debug!("Storing chunk: {chunk:?} to {:?}", storing_node);

Expand Down Expand Up @@ -162,9 +161,10 @@ impl Client {
&self,
content_addrs: impl Iterator<Item = XorName>,
wallet: &EvmWallet,
) -> Result<Receipt, PayError> {
let quotes = self.get_store_quotes(content_addrs).await?;
let quotes_to_pay: Vec<QuotePayment> = quotes.values().map(|q| q.nodes_to_pay.iter()).flatten().cloned().collect();
) -> Result<(Receipt, Vec<XorName>), PayError> {
let cost_map = self.get_store_quotes(content_addrs).await?;

let (quote_payments, skipped_chunks) = extract_quote_payments(&cost_map);

// Make sure nobody else can use the wallet while we are paying
debug!("Waiting for wallet lock");
Expand All @@ -175,26 +175,97 @@ impl Client {
// TODO: retry when it fails?
// Execute chunk payments
let payments = wallet
.pay_for_quotes(quotes_to_pay.into_iter())
.pay_for_quotes(quote_payments)
.await
.map_err(|err| PayError::from(err.0))?;

// payment is done, unlock the wallet for other threads
drop(lock_guard);
debug!("Unlocked wallet");

let proofs = receipt_from_quotes_and_payments(quotes, &payments);
let proofs = receipt_from_cost_map_and_payments(cost_map, &payments);

let already_paid_for = content_addrs.count() - quotes.len();
trace!(
"Chunk payments of {} chunks completed. {already_paid_for} chunks were free / already paid for",
proofs.len()
"Chunk payments of {} chunks completed. {} chunks were free / already paid for",
proofs.len(),
skipped_chunks.len()
);

Ok(proofs)
Ok((proofs, skipped_chunks))
}

pub(crate) async fn get_store_quotes(
&self,
content_addrs: impl Iterator<Item = XorName>,
) -> Result<HashMap<XorName, PayeeQuote>, CostError> {
let futures: Vec<_> = content_addrs
.into_iter()
.map(|content_addr| fetch_store_quote_with_retries(&self.network, content_addr))
.collect();

let quotes = futures::future::try_join_all(futures).await?;

Ok(quotes.into_iter().collect::<HashMap<XorName, PayeeQuote>>())
}
}

/// Fetch a store quote for a content address with a retry strategy.
async fn fetch_store_quote_with_retries(
network: &Network,
content_addr: XorName,
) -> Result<(XorName, PayeeQuote), CostError> {
let mut retries = 0;

loop {
match fetch_store_quote(network, content_addr).await {
Ok(quote) => {
break Ok((content_addr, quote));
}
Err(err) if retries < 2 => {
retries += 1;
error!("Error while fetching store quote: {err:?}, retry #{retries}");
}
Err(err) => {
error!(
"Error while fetching store quote: {err:?}, stopping after {retries} retries"
);
break Err(CostError::CouldNotGetStoreQuote(content_addr));
}
}
}
}

/// Fetch a store quote for a content address.
async fn fetch_store_quote(
network: &Network,
content_addr: XorName,
) -> Result<PayeeQuote, NetworkError> {
network
.get_store_costs_from_network(
NetworkAddress::from_chunk_address(ChunkAddress::new(content_addr)),
vec![],
)
.await
}

/// Form to be executed payments and already executed payments from a cost map.
pub(crate) fn extract_quote_payments(
cost_map: &HashMap<XorName, PayeeQuote>,
) -> (Vec<QuotePayment>, Vec<XorName>) {
let mut to_be_paid = vec![];
let mut already_paid = vec![];

for (chunk_address, (_, _, quote)) in cost_map.iter() {
if quote.cost.is_zero() {
already_paid.push(*chunk_address);
} else {
to_be_paid.push((quote.hash(), quote.rewards_address, quote.cost.as_atto()));
}
}

(to_be_paid, already_paid)
}

pub(crate) async fn process_tasks_with_max_concurrency<I, R>(tasks: I, batch_size: usize) -> Vec<R>
where
I: IntoIterator,
Expand Down
17 changes: 10 additions & 7 deletions autonomi/src/client/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,14 @@ impl Client {

// NB TODO: vault should be priced differently from other data
let cost_map = self.get_store_quotes(std::iter::once(vault_xor)).await?;
let total_cost = cost_map.values().fold(Amount::ZERO, |acc, q| acc + q.total_cost.as_atto());
let total_cost = AttoTokens::from_atto(
cost_map
.values()
.map(|quote| quote.2.cost.as_atto())
.sum::<Amount>(),
);

Ok(AttoTokens::from_atto(total_cost))
Ok(total_cost)
}

/// Put data into the client's VaultPacket
Expand Down Expand Up @@ -192,14 +197,12 @@ impl Client {
})?;

let proof = match receipt.values().next() {
Some(proof) => {
// NB TODO only use the first one for now, but we should try the others if first one fails
total_cost = proof.1;
proof.0.first().expect("Missing proof of payment")
},
Some(proof) => proof,
None => return Err(PutError::PaymentUnexpectedlyInvalid(scratch_address)),
};

total_cost = proof.quote.cost;

Record {
key: scratch_key,
value: try_serialize_record(&(proof, scratch), RecordKind::ScratchpadWithPayment)
Expand Down

0 comments on commit a958cdf

Please sign in to comment.