Skip to content

Commit

Permalink
chore: autonomi compiles!
Browse files Browse the repository at this point in the history
  • Loading branch information
mickvandijke committed Dec 9, 2024
1 parent f93e902 commit 918d236
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 132 deletions.
23 changes: 14 additions & 9 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@

use bytes::Bytes;
use libp2p::kad::Quorum;
use std::collections::HashSet;

use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;
use xor_name::XorName;

use crate::client::payment::PaymentOption;
use crate::client::payment::{PaymentOption, Receipt};
use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};
use ant_evm::EvmWalletError;
use ant_evm::{Amount, AttoTokens};
use ant_evm::{EvmWalletError, ProofOfPayment};
use ant_networking::{GetRecordCfg, NetworkError};
use ant_protocol::{
storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind},
Expand Down Expand Up @@ -84,6 +84,8 @@ pub enum PutError {
VaultBadOwner,
#[error("Payment unexpectedly invalid for {0:?}")]
PaymentUnexpectedlyInvalid(NetworkAddress),
#[error("The payment proof contains no payees.")]
PayeesMissing,
}

/// Errors that can occur during the pay operation.
Expand Down Expand Up @@ -198,7 +200,7 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = receipt
.values()
.map(|proof| proof.quote.cost.as_atto())
.map(|(_proof, price)| price.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
Expand Down Expand Up @@ -261,24 +263,27 @@ impl Client {
content_addrs.len()
);

let cost_map = self
let store_quote = self
.get_store_quotes(content_addrs.into_iter())
.await
.inspect_err(|err| error!("Error getting store quotes: {err:?}"))?;

let total_cost = AttoTokens::from_atto(
cost_map
store_quote
.0
.values()
.map(|quote| quote.2.cost.as_atto())
.map(|quote| quote.price())
.sum::<Amount>(),
);

Ok(total_cost)
}

// Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times.
pub(crate) async fn upload_chunks_with_retries<'a>(
&self,
mut chunks: Vec<&'a Chunk>,
receipt: &HashMap<XorName, ProofOfPayment>,
receipt: &Receipt,
) -> Vec<(&'a Chunk, PutError)> {
let mut current_attempt: usize = 1;

Expand All @@ -288,7 +293,7 @@ impl Client {
let self_clone = self.clone();
let address = *chunk.address();

let Some(proof) = receipt.get(chunk.name()) else {
let Some((proof, _)) = receipt.get(chunk.name()) else {
debug!("Chunk at {address:?} was already paid for so skipping");
continue;
};
Expand Down
2 changes: 1 addition & 1 deletion autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = receipt
.values()
.map(|proof| proof.quote.cost.as_atto())
.map(|(_proof, price)| price.as_atto())
.sum::<Amount>();

let summary = UploadSummary {
Expand Down
7 changes: 7 additions & 0 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod utils;

pub use ant_evm::Amount;

use crate::EvmNetwork;
use ant_networking::{interval, multiaddr_is_global, Network, NetworkBuilder, NetworkEvent};
use ant_protocol::{version::IDENTIFY_PROTOCOL_STR, CLOSE_GROUP_SIZE};
use libp2p::{identity::Keypair, Multiaddr};
Expand Down Expand Up @@ -67,6 +68,7 @@ const CLIENT_EVENT_CHANNEL_SIZE: usize = 100;
pub struct Client {
pub(crate) network: Network,
pub(crate) client_event_sender: Arc<Option<mpsc::Sender<ClientEvent>>>,
pub(crate) evm_network: EvmNetwork,
}

/// Error returned by [`Client::connect`].
Expand Down Expand Up @@ -120,6 +122,7 @@ impl Client {
Ok(Self {
network,
client_event_sender: Arc::new(None),
evm_network: Default::default(),
})
}

Expand All @@ -130,6 +133,10 @@ impl Client {
self.client_event_sender = Arc::new(Some(client_event_sender));
client_event_receiver
}

pub fn set_evm_network(&mut self, evm_network: EvmNetwork) {
self.evm_network = evm_network;
}
}

fn build_client_and_run_swarm(local: bool) -> (Network, mpsc::Receiver<NetworkEvent>) {
Expand Down
2 changes: 1 addition & 1 deletion autonomi/src/client/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl From<Receipt> for PaymentOption {
impl Client {
pub(crate) async fn pay_for_content_addrs(
&self,
content_addrs: impl Iterator<Item = XorName>,
content_addrs: impl Iterator<Item = XorName> + Clone,
payment_option: PaymentOption,
) -> Result<Receipt, PayError> {
match payment_option {
Expand Down
5 changes: 2 additions & 3 deletions autonomi/src/client/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{data::CostError, Client};
use crate::EvmNetwork;
use ant_evm::payment_vault::get_market_price;
use ant_evm::{Amount, PaymentQuote, QuotePayment};
use ant_networking::{Network, NetworkError};
Expand Down Expand Up @@ -51,7 +50,6 @@ impl StoreQuote {
impl Client {
pub(crate) async fn get_store_quotes(
&self,
evm_network: &EvmNetwork,
content_addrs: impl Iterator<Item = XorName>,
) -> Result<StoreQuote, CostError> {
// get all quotes from nodes
Expand All @@ -68,7 +66,8 @@ impl Client {
let mut prices = vec![];
for (peer, quote) in raw_quotes {
// NB TODO @mick we need to batch this smart contract call
let price = get_market_price(evm_network, quote.quoting_metrics.clone()).await?;
let price =
get_market_price(&self.evm_network, quote.quoting_metrics.clone()).await?;
prices.push((peer, quote, price));
}

Expand Down
35 changes: 22 additions & 13 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::client::Client;
use crate::client::ClientEvent;
use crate::client::UploadSummary;

use ant_evm::EvmNetwork;
pub use ant_registers::{Permissions as RegisterPermissions, RegisterAddress};
pub use bls::SecretKey as RegisterSecretKey;

Expand Down Expand Up @@ -50,6 +49,8 @@ pub enum RegisterError {
CouldNotSign(#[source] ant_registers::Error),
#[error("Received invalid quote from node, this node is possibly malfunctioning, try another node by trying another register name")]
InvalidQuote,
#[error("The payment proof contains no payees.")]
PayeesMissing,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -225,7 +226,6 @@ impl Client {
/// Get the cost to create a register
pub async fn register_cost(
&self,
evm_network: &EvmNetwork,
name: String,
owner: RegisterSecretKey,
) -> Result<AttoTokens, RegisterError> {
Expand All @@ -239,11 +239,13 @@ impl Client {

// get cost to store register
// NB TODO: register should be priced differently from other data
let cost_map = self.get_store_quotes(evm_network, std::iter::once(reg_xor)).await?;
let store_quote = self.get_store_quotes(std::iter::once(reg_xor)).await?;

let total_cost = AttoTokens::from_atto(
cost_map
store_quote
.0
.values()
.map(|quote| quote.2.cost.as_atto())
.map(|quote| quote.price())
.sum::<Amount>(),
);

Expand Down Expand Up @@ -300,18 +302,24 @@ impl Client {
.inspect_err(|err| {
error!("Failed to pay for register at address: {address} : {err}")
})?;
let proof = if let Some(proof) = payment_proofs.get(&reg_xor) {
proof
let (proof, price) = if let Some((proof, price)) = payment_proofs.get(&reg_xor) {
(proof, price)
} else {
// register was skipped, meaning it was already paid for
error!("Register at address: {address} was already paid for");
return Err(RegisterError::Network(NetworkError::RegisterAlreadyExists));
};

let payee = proof
.to_peer_id_payee()
.ok_or(RegisterError::InvalidQuote)
.inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?;
let payees = proof.payees();

if payees.is_empty() {
error!(
"Failed to get payees from payment proof: {:?}",
RegisterError::PayeesMissing
);
return Err(RegisterError::PayeesMissing);
}

let signed_register = register.signed_reg.clone();

let record = Record {
Expand All @@ -333,10 +341,11 @@ impl Client {
expected_holders: Default::default(),
is_register: true,
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::All,
retry_strategy: None,
use_put_record_to: Some(vec![payee]),
use_put_record_to: Some(payees), // CODE REVIEW: should we put to all or just one here?
verification: Some((VerificationKind::Network, get_cfg)),
};

Expand All @@ -351,7 +360,7 @@ impl Client {
if let Some(channel) = self.client_event_sender.as_ref() {
let summary = UploadSummary {
record_count: 1,
tokens_spent: proof.quote.cost.as_atto(),
tokens_spent: price.as_atto(),
};
if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send client event: {err}");
Expand Down
26 changes: 15 additions & 11 deletions autonomi/src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::client::payment::{receipt_from_store_quotes_and_payments, Receipt};
use ant_evm::{EvmNetwork, EvmWallet, ProofOfPayment};
use ant_evm::{EvmWallet, ProofOfPayment};
use ant_networking::{GetRecordCfg, PutRecordCfg, VerificationKind};
use ant_protocol::{
messages::ChunkProof,
Expand Down Expand Up @@ -99,9 +99,13 @@ impl Client {
chunk: &Chunk,
payment: ProofOfPayment,
) -> Result<(), PutError> {
let storing_node = payment.to_peer_id_payee().expect("Missing node Peer ID");
let storing_nodes = payment.payees();

debug!("Storing chunk: {chunk:?} to {:?}", storing_node);
if storing_nodes.is_empty() {
return Err(PutError::PayeesMissing);
}

debug!("Storing chunk: {chunk:?} to {:?}", storing_nodes);

let key = chunk.network_address().to_record_key();

Expand Down Expand Up @@ -146,21 +150,21 @@ impl Client {
let put_cfg = PutRecordCfg {
put_quorum: Quorum::One,
retry_strategy: Some(RetryStrategy::Balanced),
use_put_record_to: Some(vec![storing_node]),
use_put_record_to: Some(storing_nodes), // CODE REVIEW: do we put to all payees or just one?
verification,
};

Ok(self.network.put_record(record, &put_cfg).await?)
}

/// Pay for the chunks and get the proof of payment.
pub(crate) async fn pay(
&self,
content_addrs: impl Iterator<Item = XorName>,
content_addrs: impl Iterator<Item = XorName> + Clone,
wallet: &EvmWallet,
) -> Result<Receipt, PayError> {
let quotes = self
.get_store_quotes(wallet.network(), content_addrs.clone())
.await?;
let number_of_content_addrs = content_addrs.clone().count();
let quotes = self.get_store_quotes(content_addrs).await?;

// Make sure nobody else can use the wallet while we are paying
debug!("Waiting for wallet lock");
Expand All @@ -179,15 +183,15 @@ impl Client {
drop(lock_guard);
debug!("Unlocked wallet");

let receipt = receipt_from_store_quotes_and_payments(quotes, payments);

let skipped_chunks = content_addrs.count() - quotes.len();
let skipped_chunks = number_of_content_addrs - quotes.len();
trace!(
"Chunk payments of {} chunks completed. {} chunks were free / already paid for",
quotes.len(),
skipped_chunks
);

let receipt = receipt_from_store_quotes_and_payments(quotes, payments);

Ok(receipt)
}
}
Expand Down
12 changes: 7 additions & 5 deletions autonomi/src/client/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ impl Client {
let vault_xor = scratch.network_address().as_xorname().unwrap_or_default();

// NB TODO: vault should be priced differently from other data
let cost_map = self.get_store_quotes(std::iter::once(vault_xor)).await?;
let store_quote = self.get_store_quotes(std::iter::once(vault_xor)).await?;

let total_cost = AttoTokens::from_atto(
cost_map
store_quote
.0
.values()
.map(|quote| quote.2.cost.as_atto())
.map(|quote| quote.price())
.sum::<Amount>(),
);

Expand Down Expand Up @@ -196,12 +198,12 @@ impl Client {
error!("Failed to pay for new vault at addr: {scratch_address:?} : {err}");
})?;

let proof = match receipt.values().next() {
let (proof, price) = match receipt.values().next() {
Some(proof) => proof,
None => return Err(PutError::PaymentUnexpectedlyInvalid(scratch_address)),
};

total_cost = proof.quote.cost;
total_cost = price.clone();

Record {
key: scratch_key,
Expand Down
2 changes: 2 additions & 0 deletions evmlib/src/contract/payment_vault/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ pub enum Error {
RpcError(#[from] RpcError<TransportErrorKind>),
#[error(transparent)]
PendingTransactionError(#[from] alloy::providers::PendingTransactionError),
#[error("Payment is invalid.")]
PaymentInvalid,
}
6 changes: 6 additions & 0 deletions evmlib/src/contract/payment_vault/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ sol!(
"abi/IPaymentVault.json"
);

pub struct PaymentVerification {
pub quote_hash: FixedBytes<32>,
pub amount_paid: Amount,
pub is_valid: bool,
}

impl From<(QuoteHash, Address, Amount)> for IPaymentVault::DataPayment {
fn from(value: (QuoteHash, Address, Amount)) -> Self {
Self {
Expand Down
Loading

0 comments on commit 918d236

Please sign in to comment.