diff --git a/ant-cli/src/access/user_data.rs b/ant-cli/src/access/user_data.rs index 30149d2d98..2f62d9b6dd 100644 --- a/ant-cli/src/access/user_data.rs +++ b/ant-cli/src/access/user_data.rs @@ -10,7 +10,8 @@ use std::collections::HashMap; use autonomi::client::{ address::{addr_to_str, str_to_addr}, - files::{archive::PrivateArchiveAccess, archive_public::ArchiveAddr}, + files::archive_private::PrivateArchiveAccess, + files::archive_public::ArchiveAddr, vault::UserData, }; use color_eyre::eyre::Result; diff --git a/ant-cli/src/actions/download.rs b/ant-cli/src/actions/download.rs index 6b3bbd380c..a2ab22c122 100644 --- a/ant-cli/src/actions/download.rs +++ b/ant-cli/src/actions/download.rs @@ -9,8 +9,8 @@ use super::get_progress_bar; use autonomi::{ client::{ - address::str_to_addr, - files::{archive::PrivateArchiveAccess, archive_public::ArchiveAddr}, + address::str_to_addr, files::archive_private::PrivateArchiveAccess, + files::archive_public::ArchiveAddr, }, Client, }; diff --git a/autonomi/src/client/data/mod.rs b/autonomi/src/client/data/mod.rs deleted file mode 100644 index 066c578585..0000000000 --- a/autonomi/src/client/data/mod.rs +++ /dev/null @@ -1,288 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. 
- -use std::hash::{DefaultHasher, Hash, Hasher}; -use std::sync::LazyLock; - -use ant_evm::{Amount, EvmWalletError}; -use ant_networking::NetworkError; -use ant_protocol::storage::{Chunk, DataTypes}; -use ant_protocol::NetworkAddress; -use bytes::Bytes; -use serde::{Deserialize, Serialize}; -use xor_name::XorName; - -use crate::client::payment::PaymentOption; -use crate::client::{ClientEvent, UploadSummary}; -use crate::{self_encryption::encrypt, Client}; - -pub mod public; - -/// Number of chunks to upload in parallel. -/// -/// Can be overridden by the `CHUNK_UPLOAD_BATCH_SIZE` environment variable. -pub(crate) static CHUNK_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { - let batch_size = std::env::var("CHUNK_UPLOAD_BATCH_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or( - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - * 8, - ); - info!("Chunk upload batch size: {}", batch_size); - batch_size -}); - -/// Number of chunks to download in parallel. -/// -/// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable. -pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { - let batch_size = std::env::var("CHUNK_DOWNLOAD_BATCH_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or( - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - * 8, - ); - info!("Chunk download batch size: {}", batch_size); - batch_size -}); - -/// Number of retries to upload chunks. -pub(crate) const RETRY_ATTEMPTS: usize = 3; - -/// Raw Data Address (points to a DataMap) -pub type DataAddr = XorName; -/// Raw Chunk Address (points to a [`Chunk`]) -pub type ChunkAddr = XorName; - -/// Errors that can occur during the put operation. 
-#[derive(Debug, thiserror::Error)] -pub enum PutError { - #[error("Failed to self-encrypt data.")] - SelfEncryption(#[from] crate::self_encryption::Error), - #[error("A network error occurred.")] - Network(#[from] NetworkError), - #[error("Error occurred during cost estimation.")] - CostError(#[from] CostError), - #[error("Error occurred during payment.")] - PayError(#[from] PayError), - #[error("Serialization error: {0}")] - Serialization(String), - #[error("A wallet error occurred.")] - Wallet(#[from] ant_evm::EvmError), - #[error("The vault owner key does not match the client's public key")] - VaultBadOwner, - #[error("Payment unexpectedly invalid for {0:?}")] - PaymentUnexpectedlyInvalid(NetworkAddress), - #[error("The payment proof contains no payees.")] - PayeesMissing, -} - -/// Errors that can occur during the pay operation. -#[derive(Debug, thiserror::Error)] -pub enum PayError { - #[error("Wallet error: {0:?}")] - EvmWalletError(#[from] EvmWalletError), - #[error("Failed to self-encrypt data.")] - SelfEncryption(#[from] crate::self_encryption::Error), - #[error("Cost error: {0:?}")] - Cost(#[from] CostError), -} - -/// Errors that can occur during the get operation. -#[derive(Debug, thiserror::Error)] -pub enum GetError { - #[error("Could not deserialize data map.")] - InvalidDataMap(rmp_serde::decode::Error), - #[error("Failed to decrypt data.")] - Decryption(crate::self_encryption::Error), - #[error("Failed to deserialize")] - Deserialization(#[from] rmp_serde::decode::Error), - #[error("General networking error: {0:?}")] - Network(#[from] NetworkError), - #[error("General protocol error: {0:?}")] - Protocol(#[from] ant_protocol::Error), -} - -/// Errors that can occur during the cost calculation. 
-#[derive(Debug, thiserror::Error)] -pub enum CostError { - #[error("Failed to self-encrypt data.")] - SelfEncryption(#[from] crate::self_encryption::Error), - #[error("Could not get store quote for: {0:?} after several retries")] - CouldNotGetStoreQuote(XorName), - #[error("Could not get store costs: {0:?}")] - CouldNotGetStoreCosts(NetworkError), - #[error("Not enough node quotes for {0:?}, got: {1:?} and need at least {2:?}")] - NotEnoughNodeQuotes(XorName, usize, usize), - #[error("Failed to serialize {0}")] - Serialization(String), - #[error("Market price error: {0:?}")] - MarketPriceError(#[from] ant_evm::payment_vault::error::Error), -} - -/// Private data on the network can be accessed with this -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct DataMapChunk(Chunk); - -impl DataMapChunk { - pub fn to_hex(&self) -> String { - hex::encode(self.0.value()) - } - - pub fn from_hex(hex: &str) -> Result { - let data = hex::decode(hex)?; - Ok(Self(Chunk::new(Bytes::from(data)))) - } - - /// Get a private address for [`DataMapChunk`]. Note that this is not a network address, it is only used for refering to private data client side. 
- pub fn address(&self) -> String { - hash_to_short_string(&self.to_hex()) - } -} - -impl From for DataMapChunk { - fn from(value: Chunk) -> Self { - Self(value) - } -} - -fn hash_to_short_string(input: &str) -> String { - let mut hasher = DefaultHasher::new(); - input.hash(&mut hasher); - let hash_value = hasher.finish(); - hash_value.to_string() -} - -impl Client { - /// Fetch a blob of (private) data from the network - /// - /// # Example - /// - /// ```no_run - /// use autonomi::{Client, Bytes}; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// # let client = Client::init().await?; - /// # let data_map = todo!(); - /// let data_fetched = client.data_get(data_map).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn data_get(&self, data_map: DataMapChunk) -> Result { - info!( - "Fetching private data from Data Map {:?}", - data_map.0.address() - ); - let data = self.fetch_from_data_map_chunk(data_map.0.value()).await?; - - debug!("Successfully fetched a blob of private data from the network"); - Ok(data) - } - - /// Upload a piece of private data to the network. This data will be self-encrypted. - /// The [`DataMapChunk`] is not uploaded to the network, keeping the data private. - /// - /// Returns the [`DataMapChunk`] containing the map to the encrypted chunks. 
- /// - /// # Example - /// - /// ```no_run - /// use autonomi::{Client, Bytes}; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// # let client = Client::init().await?; - /// # let wallet = todo!(); - /// let data = Bytes::from("Hello, World"); - /// let data_map = client.data_put(data, wallet).await?; - /// let data_fetched = client.data_get(data_map).await?; - /// assert_eq!(data, data_fetched); - /// # Ok(()) - /// # } - /// ``` - pub async fn data_put( - &self, - data: Bytes, - payment_option: PaymentOption, - ) -> Result { - let now = ant_networking::time::Instant::now(); - let (data_map_chunk, chunks) = encrypt(data)?; - debug!("Encryption took: {:.2?}", now.elapsed()); - - // Pay for all chunks - let xor_names: Vec<_> = chunks - .iter() - .map(|chunk| (*chunk.name(), chunk.serialised_size())) - .collect(); - info!("Paying for {} addresses", xor_names.len()); - let (receipt, skipped_payments) = self - .pay_for_content_addrs( - DataTypes::Chunk.get_index(), - xor_names.into_iter(), - payment_option, - ) - .await - .inspect_err(|err| error!("Error paying for data: {err:?}"))?; - - // Upload the chunks with the payments - debug!("Uploading {} chunks", chunks.len()); - - let mut failed_uploads = self - .upload_chunks_with_retries(chunks.iter().collect(), &receipt) - .await; - - // Return the last chunk upload error - if let Some(last_chunk_fail) = failed_uploads.pop() { - tracing::error!( - "Error uploading chunk ({:?}): {:?}", - last_chunk_fail.0.address(), - last_chunk_fail.1 - ); - return Err(last_chunk_fail.1); - } - - let record_count = chunks.len().saturating_sub(skipped_payments); - - // Reporting - if let Some(channel) = self.client_event_sender.as_ref() { - let tokens_spent = receipt - .values() - .map(|(_, cost)| cost.as_atto()) - .sum::(); - - let summary = UploadSummary { - records_paid: record_count, - records_already_paid: skipped_payments, - tokens_spent, - }; - if let Err(err) = 
channel.send(ClientEvent::UploadComplete(summary)).await { - error!("Failed to send client event: {err:?}"); - } - } - - Ok(DataMapChunk(data_map_chunk)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_hex() { - let data_map = DataMapChunk(Chunk::new(Bytes::from_static(b"hello"))); - let hex = data_map.to_hex(); - let data_map2 = DataMapChunk::from_hex(&hex).expect("Failed to decode hex"); - assert_eq!(data_map, data_map2); - } -} diff --git a/autonomi/src/client/data_types/chunk.rs b/autonomi/src/client/data_types/chunk.rs new file mode 100644 index 0000000000..b6a14e26cf --- /dev/null +++ b/autonomi/src/client/data_types/chunk.rs @@ -0,0 +1,339 @@ +// Copyright 2025 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +use std::{ + collections::HashSet, + hash::{DefaultHasher, Hash, Hasher}, + num::NonZero, + sync::LazyLock, +}; + +use ant_evm::ProofOfPayment; +use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; +use ant_protocol::{ + messages::ChunkProof, + storage::{ + try_deserialize_record, try_serialize_record, ChunkAddress, DataTypes, RecordHeader, + RecordKind, RetryStrategy, + }, + NetworkAddress, +}; +use bytes::Bytes; +use libp2p::kad::{Quorum, Record}; +use rand::{thread_rng, Rng}; +use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk}; +use serde::{Deserialize, Serialize}; +use xor_name::XorName; + +pub use ant_protocol::storage::Chunk; + +use crate::{ + client::{payment::Receipt, utils::process_tasks_with_max_concurrency, GetError, PutError}, + self_encryption::DataMapLevel, + Client, +}; + +/// Number of retries to upload chunks. +pub(crate) const RETRY_ATTEMPTS: usize = 3; + +/// Number of chunks to upload in parallel. +/// +/// Can be overridden by the `CHUNK_UPLOAD_BATCH_SIZE` environment variable. +pub(crate) static CHUNK_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { + let batch_size = std::env::var("CHUNK_UPLOAD_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or( + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + * 8, + ); + info!("Chunk upload batch size: {}", batch_size); + batch_size +}); + +/// Number of chunks to download in parallel. +/// +/// Can be overridden by the `CHUNK_DOWNLOAD_BATCH_SIZE` environment variable. 
+pub static CHUNK_DOWNLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { + let batch_size = std::env::var("CHUNK_DOWNLOAD_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or( + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + * 8, + ); + info!("Chunk download batch size: {}", batch_size); + batch_size +}); + +/// Raw Chunk Address (points to a [`Chunk`]) +pub type ChunkAddr = XorName; + +/// Private data on the network can be accessed with this +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct DataMapChunk(pub(crate) Chunk); + +impl DataMapChunk { + pub fn to_hex(&self) -> String { + hex::encode(self.0.value()) + } + + pub fn from_hex(hex: &str) -> Result { + let data = hex::decode(hex)?; + Ok(Self(Chunk::new(Bytes::from(data)))) + } + + /// Get a private address for [`DataMapChunk`]. Note that this is not a network address, it is only used for referring to private data client side. + pub fn address(&self) -> String { + hash_to_short_string(&self.to_hex()) + } +} + +impl From for DataMapChunk { + fn from(value: Chunk) -> Self { + Self(value) + } +} + +fn hash_to_short_string(input: &str) -> String { + let mut hasher = DefaultHasher::new(); + input.hash(&mut hasher); + let hash_value = hasher.finish(); + hash_value.to_string() +} + +impl Client { + /// Get a chunk from the network. 
+ pub async fn chunk_get(&self, addr: ChunkAddr) -> Result { + info!("Getting chunk: {addr:?}"); + + let key = NetworkAddress::from_chunk_address(ChunkAddress::new(addr)).to_record_key(); + debug!("Fetching chunk from network at: {key:?}"); + let get_cfg = GetRecordCfg { + get_quorum: Quorum::One, + retry_strategy: None, + target_record: None, + expected_holders: HashSet::new(), + }; + + let record = self + .network + .get_record_from_network(key, &get_cfg) + .await + .inspect_err(|err| error!("Error fetching chunk: {err:?}"))?; + let header = RecordHeader::from_record(&record)?; + + if let Ok(true) = RecordHeader::is_record_of_type_chunk(&record) { + let chunk: Chunk = try_deserialize_record(&record)?; + Ok(chunk) + } else { + error!( + "Record kind mismatch: expected Chunk, got {:?}", + header.kind + ); + Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly(DataTypes::Chunk)).into()) + } + } + + /// Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times. + pub async fn upload_chunks_with_retries<'a>( + &self, + mut chunks: Vec<&'a Chunk>, + receipt: &Receipt, + ) -> Vec<(&'a Chunk, PutError)> { + let mut current_attempt: usize = 1; + + loop { + let mut upload_tasks = vec![]; + for chunk in chunks { + let self_clone = self.clone(); + let address = *chunk.address(); + + let Some((proof, _)) = receipt.get(chunk.name()) else { + debug!("Chunk at {address:?} was already paid for so skipping"); + continue; + }; + + upload_tasks.push(async move { + self_clone + .chunk_upload_with_payment(chunk, proof.clone()) + .await + .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}")) + // Return chunk reference too, to re-use it next attempt/iteration + .map_err(|err| (chunk, err)) + }); + } + let uploads = + process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await; + + // Check for errors. 
+ let total_uploads = uploads.len(); + let uploads_failed: Vec<_> = uploads.into_iter().filter_map(|up| up.err()).collect(); + info!( + "Uploaded {} chunks out of {total_uploads}", + total_uploads - uploads_failed.len() + ); + + // All uploads succeeded. + if uploads_failed.is_empty() { + return vec![]; + } + + // Max retries reached. + if current_attempt > RETRY_ATTEMPTS { + return uploads_failed; + } + + tracing::info!( + "Retrying putting {} failed chunks (attempt {current_attempt}/3)", + uploads_failed.len() + ); + + // Re-iterate over the failed chunks + chunks = uploads_failed.into_iter().map(|(chunk, _)| chunk).collect(); + current_attempt += 1; + } + } + + pub(crate) async fn chunk_upload_with_payment( + &self, + chunk: &Chunk, + payment: ProofOfPayment, + ) -> Result<(), PutError> { + let storing_nodes = payment.payees(); + + if storing_nodes.is_empty() { + return Err(PutError::PayeesMissing); + } + + debug!("Storing chunk: {chunk:?} to {:?}", storing_nodes); + + let key = chunk.network_address().to_record_key(); + + let record_kind = RecordKind::DataWithPayment(DataTypes::Chunk); + let record = Record { + key: key.clone(), + value: try_serialize_record(&(payment, chunk.clone()), record_kind) + .map_err(|e| { + PutError::Serialization(format!( + "Failed to serialize chunk with payment: {e:?}" + )) + })? + .to_vec(), + publisher: None, + expires: None, + }; + + let verification = { + let verification_cfg = GetRecordCfg { + get_quorum: Quorum::N(NonZero::new(2).expect("2 is non-zero")), + retry_strategy: Some(RetryStrategy::Balanced), + target_record: None, + expected_holders: Default::default(), + }; + + let stored_on_node = + try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk)) + .map_err(|e| { + PutError::Serialization(format!("Failed to serialize chunk: {e:?}")) + })? 
+ .to_vec(); + let random_nonce = thread_rng().gen::(); + let expected_proof = ChunkProof::new(&stored_on_node, random_nonce); + + Some(( + VerificationKind::ChunkProof { + expected_proof, + nonce: random_nonce, + }, + verification_cfg, + )) + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::One, + retry_strategy: Some(RetryStrategy::Balanced), + use_put_record_to: Some(storing_nodes.clone()), + verification, + }; + let payment_upload = Ok(self.network.put_record(record, &put_cfg).await?); + debug!("Successfully stored chunk: {chunk:?} to {storing_nodes:?}"); + payment_upload + } + + /// Unpack a wrapped data map and fetch all bytes using self-encryption. + pub(crate) async fn fetch_from_data_map_chunk( + &self, + data_map_bytes: &Bytes, + ) -> Result { + let mut data_map_level: DataMapLevel = rmp_serde::from_slice(data_map_bytes) + .map_err(GetError::InvalidDataMap) + .inspect_err(|err| error!("Error deserializing data map: {err:?}"))?; + + loop { + let data_map = match &data_map_level { + DataMapLevel::First(map) => map, + DataMapLevel::Additional(map) => map, + }; + + let data = self.fetch_from_data_map(data_map).await?; + + match &data_map_level { + DataMapLevel::First(_) => break Ok(data), + DataMapLevel::Additional(_) => { + data_map_level = rmp_serde::from_slice(&data).map_err(|err| { + error!("Error deserializing data map: {err:?}"); + GetError::InvalidDataMap(err) + })?; + continue; + } + }; + } + } + + /// Fetch and decrypt all chunks in the data map. 
+ pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result { + debug!("Fetching encrypted data chunks from data map {data_map:?}"); + let mut download_tasks = vec![]; + for info in data_map.infos() { + download_tasks.push(async move { + match self + .chunk_get(info.dst_hash) + .await + .inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash)) + { + Ok(chunk) => Ok(EncryptedChunk { + index: info.index, + content: chunk.value, + }), + Err(err) => { + error!("Error fetching chunk {:?}: {err:?}", info.dst_hash); + Err(err) + } + } + }); + } + debug!("Successfully fetched all the encrypted chunks"); + let encrypted_chunks = + process_tasks_with_max_concurrency(download_tasks, *CHUNK_DOWNLOAD_BATCH_SIZE) + .await + .into_iter() + .collect::, GetError>>()?; + + let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| { + error!("Error decrypting encrypted_chunks: {e:?}"); + GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)) + })?; + debug!("Successfully decrypted all the chunks"); + Ok(data) + } +} diff --git a/autonomi/src/client/graph.rs b/autonomi/src/client/data_types/graph.rs similarity index 97% rename from autonomi/src/client/graph.rs rename to autonomi/src/client/data_types/graph.rs index 7a03cc1e01..2e69e48c51 100644 --- a/autonomi/src/client/graph.rs +++ b/autonomi/src/client/data_types/graph.rs @@ -6,26 +6,23 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. 
-use crate::client::data::PayError; +use crate::client::payment::PayError; +use crate::client::quote::CostError; use crate::client::Client; use crate::client::ClientEvent; use crate::client::UploadSummary; -use ant_evm::Amount; -use ant_evm::AttoTokens; -pub use ant_protocol::storage::GraphEntry; -use ant_protocol::storage::GraphEntryAddress; -pub use bls::SecretKey; - -use ant_evm::{EvmWallet, EvmWalletError}; +use ant_evm::{Amount, AttoTokens, EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; +use ant_protocol::storage::GraphEntryAddress; use ant_protocol::{ storage::{try_serialize_record, DataTypes, RecordKind, RetryStrategy}, NetworkAddress, }; use libp2p::kad::{Quorum, Record}; -use super::data::CostError; +pub use ant_protocol::storage::GraphEntry; +pub use bls::SecretKey; #[derive(Debug, thiserror::Error)] pub enum GraphError { diff --git a/autonomi/src/client/data_types/mod.rs b/autonomi/src/client/data_types/mod.rs new file mode 100644 index 0000000000..0f4b4d8be1 --- /dev/null +++ b/autonomi/src/client/data_types/mod.rs @@ -0,0 +1,12 @@ +// Copyright 2025 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +pub mod chunk; +pub mod graph; +pub mod pointer; +pub mod scratchpad; diff --git a/autonomi/src/client/pointer.rs b/autonomi/src/client/data_types/pointer.rs similarity index 94% rename from autonomi/src/client/pointer.rs rename to autonomi/src/client/data_types/pointer.rs index 72fcc1c195..a8c0b5ef10 100644 --- a/autonomi/src/client/pointer.rs +++ b/autonomi/src/client/data_types/pointer.rs @@ -1,19 +1,15 @@ -use crate::client::data::PayError; -use crate::client::Client; -use tracing::{debug, error, trace}; - +use crate::client::{payment::PayError, quote::CostError, Client}; use ant_evm::{Amount, AttoTokens, EvmWallet, EvmWalletError}; use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; use ant_protocol::{ - storage::{ - try_serialize_record, DataTypes, Pointer, PointerAddress, RecordKind, RetryStrategy, - }, + storage::{try_serialize_record, DataTypes, PointerAddress, RecordKind, RetryStrategy}, NetworkAddress, }; use bls::SecretKey; use libp2p::kad::{Quorum, Record}; +use tracing::{debug, error, trace}; -use super::data::CostError; +pub use ant_protocol::storage::Pointer; #[derive(Debug, thiserror::Error)] pub enum PointerError { @@ -56,7 +52,7 @@ impl Client { &self, pointer: Pointer, wallet: &EvmWallet, - ) -> Result<(), PointerError> { + ) -> Result { let address = pointer.network_address(); // pay for the pointer storage @@ -120,7 +116,7 @@ impl Client { error!("Failed to put record - pointer {address:?} to the network: {err}") })?; - Ok(()) + Ok(address) } /// Calculate the cost of storing a pointer diff --git a/autonomi/src/client/data_types/scratchpad.rs b/autonomi/src/client/data_types/scratchpad.rs new file mode 100644 index 0000000000..801fa7c0c2 --- /dev/null +++ b/autonomi/src/client/data_types/scratchpad.rs @@ -0,0 +1,281 @@ +// Copyright 2025 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. 
+// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::client::payment::PaymentOption; +use crate::client::PutError; +use crate::{client::quote::CostError, Client}; +use ant_evm::{Amount, AttoTokens}; +use ant_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg, VerificationKind}; +use ant_protocol::storage::{try_serialize_record, RecordKind, RetryStrategy}; +use ant_protocol::{ + storage::{try_deserialize_record, DataTypes, ScratchpadAddress}, + NetworkAddress, +}; +use libp2p::kad::{Quorum, Record}; +use std::collections::HashSet; + +pub use ant_protocol::storage::Scratchpad; +pub use bls::{PublicKey, SecretKey}; + +#[derive(Debug, thiserror::Error)] +pub enum ScratchpadError { + #[error("Scratchpad found at {0:?} was not a valid record.")] + CouldNotDeserializeScratchPad(ScratchpadAddress), + #[error("Network: {0}")] + Network(#[from] NetworkError), + #[error("Scratchpad not found")] + Missing, +} + +impl Client { + /// Get Scratchpad from the Network + /// It is stored at the owner's public key + pub async fn scratchpad_get( + &self, + public_key: &PublicKey, + ) -> Result { + let scratch_address = ScratchpadAddress::new(*public_key); + let network_address = NetworkAddress::from_scratchpad_address(scratch_address); + info!("Fetching scratchpad from network at {network_address:?}",); + let scratch_key = network_address.to_record_key(); + + let get_cfg = GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: None, + target_record: None, + expected_holders: HashSet::new(), + }; + + let pad = match self + .network + .get_record_from_network(scratch_key.clone(), &get_cfg) + .await + { + Ok(record) => { + 
debug!("Got scratchpad for {scratch_key:?}"); + try_deserialize_record::(&record) + .map_err(|_| ScratchpadError::CouldNotDeserializeScratchPad(scratch_address))? + } + Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => { + debug!("Got multiple scratchpads for {scratch_key:?}"); + let mut pads = result_map + .values() + .map(|(record, _)| try_deserialize_record::(record)) + .collect::, _>>() + .map_err(|_| ScratchpadError::CouldNotDeserializeScratchPad(scratch_address))?; + + // take the latest versions + pads.sort_by_key(|s| s.count()); + let max_version = pads.last().map(|p| p.count()).unwrap_or_else(|| { + error!("Got empty scratchpad vector for {scratch_key:?}"); + u64::MAX + }); + let latest_pads: Vec<_> = pads + .into_iter() + .filter(|s| s.count() == max_version) + .collect(); + + // make sure we only have one of latest version + let pad = match &latest_pads[..] { + [one] => one, + [multi, ..] => { + error!("Got multiple conflicting scratchpads for {scratch_key:?} with the latest version, returning the first one"); + multi + } + [] => { + error!("Got empty scratchpad vector for {scratch_key:?}"); + return Err(ScratchpadError::Missing); + } + }; + pad.to_owned() + } + Err(e) => { + warn!("Failed to fetch scratchpad {network_address:?} from network: {e}"); + return Err(e)?; + } + }; + + Ok(pad) + } + + /// Returns the latest found version of the scratchpad for that public key + /// If none is found, it creates a new one locally + /// Note that it does not upload that new scratchpad to the network, one would need to call [`Self::scratchpad_create`] to do so + /// Returns the scratchpad along with a boolean indicating if that scratchpad is new or not + pub async fn get_or_create_scratchpad( + &self, + public_key: &PublicKey, + content_type: u64, + ) -> Result<(Scratchpad, bool), PutError> { + let pad_res = self.scratchpad_get(public_key).await; + let mut is_new = true; + + let scratch = if let Ok(existing_data) = pad_res { + 
info!("Scratchpad already exists, returning existing data"); + + info!( + "scratch already exists, is version {:?}", + existing_data.count() + ); + + is_new = false; + + if existing_data.owner() != public_key { + return Err(PutError::ScratchpadBadOwner); + } + + existing_data + } else { + trace!("new scratchpad creation"); + Scratchpad::new(*public_key, content_type) + }; + + Ok((scratch, is_new)) + } + + /// Create a new scratchpad to the network + pub async fn scratchpad_create( + &self, + scratchpad: Scratchpad, + payment_option: PaymentOption, + ) -> Result { + let scratch_address = scratchpad.network_address(); + let scratch_key = scratch_address.to_record_key(); + + // pay for the scratchpad + let (receipt, _skipped_payments) = self + .pay_for_content_addrs( + DataTypes::Scratchpad.get_index(), + std::iter::once((scratchpad.xorname(), scratchpad.payload_size())), + payment_option, + ) + .await + .inspect_err(|err| { + error!("Failed to pay for new scratchpad at addr: {scratch_address:?} : {err}"); + })?; + + let (proof, price) = match receipt.values().next() { + Some(proof) => proof, + None => return Err(PutError::PaymentUnexpectedlyInvalid(scratch_address)), + }; + let total_cost = *price; + + let record = Record { + key: scratch_key, + value: try_serialize_record( + &(proof, scratchpad), + RecordKind::DataWithPayment(DataTypes::Scratchpad), + ) + .map_err(|_| { + PutError::Serialization("Failed to serialize scratchpad with payment".to_string()) + })? 
+ .to_vec(), + publisher: None, + expires: None, + }; + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::Balanced), + use_put_record_to: None, + verification: Some(( + VerificationKind::Crdt, + GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: None, + target_record: None, + expected_holders: HashSet::new(), + }, + )), + }; + + debug!("Put record - scratchpad at {scratch_address:?} to the network"); + self.network + .put_record(record, &put_cfg) + .await + .inspect_err(|err| { + error!( + "Failed to put scratchpad {scratch_address:?} to the network with err: {err:?}" + ) + })?; + + Ok(total_cost) + } + + /// Update an existing scratchpad to the network + /// This operation is free but requires the scratchpad to be already created on the network + /// Only the latest version of the scratchpad is kept, make sure to update the scratchpad counter before calling this function + /// The method [`Scratchpad::update_and_sign`] should be used before calling this function to send the scratchpad to the network + pub async fn scratchpad_update(&self, scratchpad: Scratchpad) -> Result<(), PutError> { + let scratch_address = scratchpad.network_address(); + let scratch_key = scratch_address.to_record_key(); + + let put_cfg = PutRecordCfg { + put_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::Balanced), + use_put_record_to: None, + verification: Some(( + VerificationKind::Crdt, + GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: None, + target_record: None, + expected_holders: HashSet::new(), + }, + )), + }; + + let record = Record { + key: scratch_key, + value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad)) + .map_err(|_| PutError::Serialization("Failed to serialize scratchpad".to_string()))? 
+ .to_vec(), + publisher: None, + expires: None, + }; + + debug!("Put record - scratchpad at {scratch_address:?} to the network"); + self.network + .put_record(record, &put_cfg) + .await + .inspect_err(|err| { + error!( + "Failed to put scratchpad {scratch_address:?} to the network with err: {err:?}" + ) + })?; + + Ok(()) + } + + /// Get the cost of creating a new Scratchpad + pub async fn scratchpad_cost(&self, owner: &SecretKey) -> Result { + info!("Getting cost for scratchpad"); + let client_pk = owner.public_key(); + let content_type = Default::default(); + let scratch = Scratchpad::new(client_pk, content_type); + let scratch_xor = scratch.address().xorname(); + + // TODO: define default size of Scratchpad + let store_quote = self + .get_store_quotes( + DataTypes::Scratchpad.get_index(), + std::iter::once((scratch_xor, 256)), + ) + .await?; + + let total_cost = AttoTokens::from_atto( + store_quote + .0 + .values() + .map(|quote| quote.price()) + .sum::(), + ); + + Ok(total_cost) + } +} diff --git a/autonomi/src/client/external_signer.rs b/autonomi/src/client/external_signer.rs index b05e958422..ef201083ca 100644 --- a/autonomi/src/client/external_signer.rs +++ b/autonomi/src/client/external_signer.rs @@ -1,4 +1,4 @@ -use crate::client::data::PutError; +use crate::client::PutError; use crate::self_encryption::encrypt; use crate::Client; use ant_evm::QuotePayment; diff --git a/autonomi/src/client/files/mod.rs b/autonomi/src/client/files/mod.rs deleted file mode 100644 index e53be148bf..0000000000 --- a/autonomi/src/client/files/mod.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::path::{Path, PathBuf}; - -pub mod archive; -pub mod archive_public; -pub mod fs; -pub mod fs_public; - -pub(crate) fn get_relative_file_path_from_abs_file_and_folder_path( - abs_file_pah: &Path, - abs_folder_path: &Path, -) -> PathBuf { - // check if the dir is a file - let is_file = abs_folder_path.is_file(); - - // could also be the file name - let dir_name = PathBuf::from( - 
abs_folder_path - .file_name() - .expect("Failed to get file/dir name"), - ); - - if is_file { - dir_name - } else { - let folder_prefix = abs_folder_path - .parent() - .unwrap_or(Path::new("")) - .to_path_buf(); - abs_file_pah - .strip_prefix(folder_prefix) - .expect("Could not strip prefix path") - .to_path_buf() - } -} diff --git a/autonomi/src/client/high_level/data/mod.rs b/autonomi/src/client/high_level/data/mod.rs new file mode 100644 index 0000000000..1e7de350fc --- /dev/null +++ b/autonomi/src/client/high_level/data/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2025 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use xor_name::XorName; + +/// Private data on the network, readable only if you have the DataMapChunk +pub mod private; +/// Public data on the network, readable by anyone with the DataAddr +pub mod public; + +/// Raw Data Address (points to a DataMap) +pub type DataAddr = XorName; diff --git a/autonomi/src/client/high_level/data/private.rs b/autonomi/src/client/high_level/data/private.rs new file mode 100644 index 0000000000..f3e6c3d8a7 --- /dev/null +++ b/autonomi/src/client/high_level/data/private.rs @@ -0,0 +1,140 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use ant_evm::Amount; +use ant_protocol::storage::DataTypes; +use bytes::Bytes; + +use crate::client::data_types::chunk::DataMapChunk; +use crate::client::payment::PaymentOption; +use crate::client::{ClientEvent, GetError, PutError, UploadSummary}; +use crate::{self_encryption::encrypt, Client}; + +impl Client { + /// Fetch a blob of (private) data from the network + /// + /// # Example + /// + /// ```no_run + /// use autonomi::{Client, Bytes}; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// # let client = Client::init().await?; + /// # let data_map = todo!(); + /// let data_fetched = client.data_get(data_map).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn data_get(&self, data_map: DataMapChunk) -> Result { + info!( + "Fetching private data from Data Map {:?}", + data_map.0.address() + ); + let data = self.fetch_from_data_map_chunk(data_map.0.value()).await?; + + debug!("Successfully fetched a blob of private data from the network"); + Ok(data) + } + + /// Upload a piece of private data to the network. This data will be self-encrypted. + /// The [`DataMapChunk`] is not uploaded to the network, keeping the data private. + /// + /// Returns the [`DataMapChunk`] containing the map to the encrypted chunks. 
+ /// + /// # Example + /// + /// ```no_run + /// use autonomi::{Client, Bytes}; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// # let client = Client::init().await?; + /// # let wallet = todo!(); + /// let data = Bytes::from("Hello, World"); + /// let data_map = client.data_put(data, wallet).await?; + /// let data_fetched = client.data_get(data_map).await?; + /// assert_eq!(data, data_fetched); + /// # Ok(()) + /// # } + /// ``` + pub async fn data_put( + &self, + data: Bytes, + payment_option: PaymentOption, + ) -> Result { + let now = ant_networking::time::Instant::now(); + let (data_map_chunk, chunks) = encrypt(data)?; + debug!("Encryption took: {:.2?}", now.elapsed()); + + // Pay for all chunks + let xor_names: Vec<_> = chunks + .iter() + .map(|chunk| (*chunk.name(), chunk.serialised_size())) + .collect(); + info!("Paying for {} addresses", xor_names.len()); + let (receipt, skipped_payments) = self + .pay_for_content_addrs( + DataTypes::Chunk.get_index(), + xor_names.into_iter(), + payment_option, + ) + .await + .inspect_err(|err| error!("Error paying for data: {err:?}"))?; + + // Upload the chunks with the payments + debug!("Uploading {} chunks", chunks.len()); + + let mut failed_uploads = self + .upload_chunks_with_retries(chunks.iter().collect(), &receipt) + .await; + + // Return the last chunk upload error + if let Some(last_chunk_fail) = failed_uploads.pop() { + tracing::error!( + "Error uploading chunk ({:?}): {:?}", + last_chunk_fail.0.address(), + last_chunk_fail.1 + ); + return Err(last_chunk_fail.1); + } + + let record_count = chunks.len().saturating_sub(skipped_payments); + + // Reporting + if let Some(channel) = self.client_event_sender.as_ref() { + let tokens_spent = receipt + .values() + .map(|(_, cost)| cost.as_atto()) + .sum::(); + + let summary = UploadSummary { + records_paid: record_count, + records_already_paid: skipped_payments, + tokens_spent, + }; + if let Err(err) = 
channel.send(ClientEvent::UploadComplete(summary)).await { + error!("Failed to send client event: {err:?}"); + } + } + + Ok(DataMapChunk(data_map_chunk)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::data_types::chunk::Chunk; + + #[test] + fn test_hex() { + let data_map = DataMapChunk(Chunk::new(Bytes::from_static(b"hello"))); + let hex = data_map.to_hex(); + let data_map2 = DataMapChunk::from_hex(&hex).expect("Failed to decode hex"); + assert_eq!(data_map, data_map2); + } +} diff --git a/autonomi/src/client/data/public.rs b/autonomi/src/client/high_level/data/public.rs similarity index 56% rename from autonomi/src/client/data/public.rs rename to autonomi/src/client/high_level/data/public.rs index 6f8a9f2a51..4f20dd0f62 100644 --- a/autonomi/src/client/data/public.rs +++ b/autonomi/src/client/high_level/data/public.rs @@ -6,22 +6,16 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +use ant_protocol::storage::DataTypes; use bytes::Bytes; -use libp2p::kad::Quorum; -use std::collections::HashSet; -use crate::client::payment::{PaymentOption, Receipt}; -use crate::client::utils::process_tasks_with_max_concurrency; -use crate::client::{ClientEvent, UploadSummary}; +use crate::client::payment::PaymentOption; +use crate::client::quote::CostError; +use crate::client::{ClientEvent, GetError, PutError, UploadSummary}; use crate::{self_encryption::encrypt, Client}; use ant_evm::{Amount, AttoTokens}; -use ant_networking::{GetRecordCfg, NetworkError}; -use ant_protocol::{ - storage::{try_deserialize_record, Chunk, ChunkAddress, DataTypes, RecordHeader, RecordKind}, - NetworkAddress, -}; -use super::*; +use super::DataAddr; impl Client { /// Fetch a blob of data from the network @@ -113,38 +107,6 @@ impl Client { Ok(map_xor_name) } - /// Get a raw chunk from the network. 
- pub async fn chunk_get(&self, addr: ChunkAddr) -> Result { - info!("Getting chunk: {addr:?}"); - - let key = NetworkAddress::from_chunk_address(ChunkAddress::new(addr)).to_record_key(); - debug!("Fetching chunk from network at: {key:?}"); - let get_cfg = GetRecordCfg { - get_quorum: Quorum::One, - retry_strategy: None, - target_record: None, - expected_holders: HashSet::new(), - }; - - let record = self - .network - .get_record_from_network(key, &get_cfg) - .await - .inspect_err(|err| error!("Error fetching chunk: {err:?}"))?; - let header = RecordHeader::from_record(&record)?; - - if let Ok(true) = RecordHeader::is_record_of_type_chunk(&record) { - let chunk: Chunk = try_deserialize_record(&record)?; - Ok(chunk) - } else { - error!( - "Record kind mismatch: expected Chunk, got {:?}", - header.kind - ); - Err(NetworkError::RecordKindMismatch(RecordKind::DataOnly(DataTypes::Chunk)).into()) - } - } - /// Get the estimated cost of storing a piece of data. pub async fn data_cost(&self, data: Bytes) -> Result { let now = ant_networking::time::Instant::now(); @@ -179,64 +141,4 @@ impl Client { Ok(total_cost) } - - // Upload chunks and retry failed uploads up to `RETRY_ATTEMPTS` times. 
- pub async fn upload_chunks_with_retries<'a>( - &self, - mut chunks: Vec<&'a Chunk>, - receipt: &Receipt, - ) -> Vec<(&'a Chunk, PutError)> { - let mut current_attempt: usize = 1; - - loop { - let mut upload_tasks = vec![]; - for chunk in chunks { - let self_clone = self.clone(); - let address = *chunk.address(); - - let Some((proof, _)) = receipt.get(chunk.name()) else { - debug!("Chunk at {address:?} was already paid for so skipping"); - continue; - }; - - upload_tasks.push(async move { - self_clone - .chunk_upload_with_payment(chunk, proof.clone()) - .await - .inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}")) - // Return chunk reference too, to re-use it next attempt/iteration - .map_err(|err| (chunk, err)) - }); - } - let uploads = - process_tasks_with_max_concurrency(upload_tasks, *CHUNK_UPLOAD_BATCH_SIZE).await; - - // Check for errors. - let total_uploads = uploads.len(); - let uploads_failed: Vec<_> = uploads.into_iter().filter_map(|up| up.err()).collect(); - info!( - "Uploaded {} chunks out of {total_uploads}", - total_uploads - uploads_failed.len() - ); - - // All uploads succeeded. - if uploads_failed.is_empty() { - return vec![]; - } - - // Max retries reached. 
- if current_attempt > RETRY_ATTEMPTS { - return uploads_failed; - } - - tracing::info!( - "Retrying putting {} failed chunks (attempt {current_attempt}/3)", - uploads_failed.len() - ); - - // Re-iterate over the failed chunks - chunks = uploads_failed.into_iter().map(|(chunk, _)| chunk).collect(); - current_attempt += 1; - } - } } diff --git a/autonomi/src/client/files/archive.rs b/autonomi/src/client/high_level/files/archive_private.rs similarity index 81% rename from autonomi/src/client/files/archive.rs rename to autonomi/src/client/high_level/files/archive_private.rs index 19e9642191..29b3b0ab83 100644 --- a/autonomi/src/client/files/archive.rs +++ b/autonomi/src/client/high_level/files/archive_private.rs @@ -15,51 +15,19 @@ use ant_networking::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::{ client::{ - data::{DataMapChunk, GetError, PutError}, - payment::PaymentOption, + data_types::chunk::DataMapChunk, high_level::files::RenameError, payment::PaymentOption, + GetError, PutError, }, Client, }; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use thiserror::Error; + +use super::Metadata; /// Private archive data map, allowing access to the [`PrivateArchive`] data. pub type PrivateArchiveAccess = DataMapChunk; -#[derive(Error, Debug, PartialEq, Eq)] -pub enum RenameError { - #[error("File not found in archive: {0}")] - FileNotFound(PathBuf), -} - -/// Metadata for a file in an archive. Time values are UNIX timestamps. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct Metadata { - /// File creation time on local file system. See [`std::fs::Metadata::created`] for details per OS. - pub created: u64, - /// Last file modification time taken from local file system. See [`std::fs::Metadata::modified`] for details per OS. - pub modified: u64, - /// File size in bytes - pub size: u64, -} - -impl Metadata { - /// Create a new metadata struct with the current time as uploaded, created and modified. 
- pub fn new_with_size(size: u64) -> Self { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or(Duration::from_secs(0)) - .as_secs(); - - Self { - created: now, - modified: now, - size, - } - } -} - /// Directory structure mapping filepaths to their data maps and metadata. /// /// The data maps are stored within this structure instead of uploading them to the network, keeping the data private. diff --git a/autonomi/src/client/files/archive_public.rs b/autonomi/src/client/high_level/files/archive_public.rs similarity index 95% rename from autonomi/src/client/files/archive_public.rs rename to autonomi/src/client/high_level/files/archive_public.rs index bf8cb44bff..fc49789c3b 100644 --- a/autonomi/src/client/files/archive_public.rs +++ b/autonomi/src/client/high_level/files/archive_public.rs @@ -18,19 +18,21 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; use xor_name::XorName; -use super::archive::Metadata; use crate::{ client::{ - data::{CostError, DataAddr, GetError, PutError}, - files::archive::RenameError, + high_level::{data::DataAddr, files::RenameError}, + quote::CostError, + GetError, PutError, }, Client, }; +use super::Metadata; + /// The address of a public archive on the network. Points to an [`PublicArchive`]. pub type ArchiveAddr = XorName; -/// Public variant of [`crate::client::files::archive::PrivateArchive`]. Differs in that data maps of files are uploaded +/// Public variant of [`crate::client::files::archive_private::PrivateArchive`]. Differs in that data maps of files are uploaded /// to the network, of which the addresses are stored in this archive. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct PublicArchive { @@ -138,7 +140,7 @@ impl Client { /// Create simple archive containing `file.txt` pointing to random XOR name. 
/// /// ```no_run - /// # use autonomi::{Client, client::{data::DataAddr, files::{archive::Metadata, archive_public::{PublicArchive, ArchiveAddr}}}}; + /// # use autonomi::{Client, client::{data::DataAddr, files::{Metadata, archive_public::{PublicArchive, ArchiveAddr}}}}; /// # use std::path::PathBuf; /// # #[tokio::main] /// # async fn main() -> Result<(), Box> { diff --git a/autonomi/src/client/files/fs.rs b/autonomi/src/client/high_level/files/fs_private.rs similarity index 70% rename from autonomi/src/client/files/fs.rs rename to autonomi/src/client/high_level/files/fs_private.rs index 2428f2d344..54f6f40693 100644 --- a/autonomi/src/client/files/fs.rs +++ b/autonomi/src/client/high_level/files/fs_private.rs @@ -14,72 +14,15 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::archive::{PrivateArchive, PrivateArchiveAccess}; -use crate::client::data::{CostError, DataMapChunk, GetError, PutError}; -use crate::client::files::get_relative_file_path_from_abs_file_and_folder_path; -use crate::client::utils::process_tasks_with_max_concurrency; +use super::archive_private::{PrivateArchive, PrivateArchiveAccess}; +use super::{get_relative_file_path_from_abs_file_and_folder_path, FILE_UPLOAD_BATCH_SIZE}; +use super::{DownloadError, UploadError}; + use crate::client::Client; +use crate::client::{data_types::chunk::DataMapChunk, utils::process_tasks_with_max_concurrency}; use ant_evm::EvmWallet; use bytes::Bytes; -use std::{path::PathBuf, sync::LazyLock}; - -/// Number of files to upload in parallel. -/// -/// Can be overridden by the `FILE_UPLOAD_BATCH_SIZE` environment variable. 
-pub static FILE_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { - let batch_size = std::env::var("FILE_UPLOAD_BATCH_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or( - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1) - * 8, - ); - info!("File upload batch size: {}", batch_size); - batch_size -}); - -/// Errors that can occur during the file upload operation. -#[derive(Debug, thiserror::Error)] -pub enum UploadError { - #[error("Failed to recursively traverse directory")] - WalkDir(#[from] walkdir::Error), - #[error("Input/output failure")] - IoError(#[from] std::io::Error), - #[error("Failed to upload file")] - PutError(#[from] PutError), - #[error("Failed to fetch file")] - GetError(#[from] GetError), - #[error("Failed to serialize")] - Serialization(#[from] rmp_serde::encode::Error), - #[error("Failed to deserialize")] - Deserialization(#[from] rmp_serde::decode::Error), -} - -/// Errors that can occur during the download operation. -#[derive(Debug, thiserror::Error)] -pub enum DownloadError { - #[error("Failed to download file")] - GetError(#[from] GetError), - #[error("IO failure")] - IoError(#[from] std::io::Error), -} - -/// Errors that can occur during the file cost calculation. 
-#[derive(Debug, thiserror::Error)] -pub enum FileCostError { - #[error("Cost error: {0}")] - Cost(#[from] CostError), - #[error("IO failure")] - IoError(#[from] std::io::Error), - #[error("Serialization error")] - Serialization(#[from] rmp_serde::encode::Error), - #[error("Self encryption error")] - SelfEncryption(#[from] crate::self_encryption::Error), - #[error("Walkdir error")] - WalkDir(#[from] walkdir::Error), -} +use std::path::PathBuf; impl Client { /// Download a private file from network to local file system diff --git a/autonomi/src/client/files/fs_public.rs b/autonomi/src/client/high_level/files/fs_public.rs similarity index 96% rename from autonomi/src/client/files/fs_public.rs rename to autonomi/src/client/high_level/files/fs_public.rs index 92e5e5455b..77cb07506f 100644 --- a/autonomi/src/client/files/fs_public.rs +++ b/autonomi/src/client/high_level/files/fs_public.rs @@ -7,12 +7,12 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::archive_public::{ArchiveAddr, PublicArchive}; -use super::fs::*; -use crate::client::data::DataAddr; -use crate::client::files::archive::Metadata; -use crate::client::files::get_relative_file_path_from_abs_file_and_folder_path; -use crate::client::utils::process_tasks_with_max_concurrency; +use super::{DownloadError, FileCostError, Metadata, UploadError}; +use crate::client::high_level::files::{ + get_relative_file_path_from_abs_file_and_folder_path, FILE_UPLOAD_BATCH_SIZE, +}; use crate::client::Client; +use crate::client::{high_level::data::DataAddr, utils::process_tasks_with_max_concurrency}; use ant_evm::EvmWallet; use ant_networking::time::{Duration, SystemTime}; use bytes::Bytes; diff --git a/autonomi/src/client/high_level/files/mod.rs b/autonomi/src/client/high_level/files/mod.rs new file mode 100644 index 0000000000..0b0d1b82a1 --- /dev/null +++ b/autonomi/src/client/high_level/files/mod.rs @@ -0,0 +1,133 @@ +use serde::{Deserialize, Serialize}; +use std::{ + path::{Path, 
PathBuf}, + sync::LazyLock, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use thiserror::Error; + +use crate::client::{quote::CostError, GetError, PutError}; + +pub mod archive_private; +pub mod archive_public; +pub mod fs_private; +pub mod fs_public; + +/// Number of files to upload in parallel. +/// +/// Can be overridden by the `FILE_UPLOAD_BATCH_SIZE` environment variable. +pub static FILE_UPLOAD_BATCH_SIZE: LazyLock = LazyLock::new(|| { + let batch_size = std::env::var("FILE_UPLOAD_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or( + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + * 8, + ); + info!("File upload batch size: {}", batch_size); + batch_size +}); + +/// Metadata for a file in an archive. Time values are UNIX timestamps. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Metadata { + /// File creation time on local file system. See [`std::fs::Metadata::created`] for details per OS. + pub created: u64, + /// Last file modification time taken from local file system. See [`std::fs::Metadata::modified`] for details per OS. + pub modified: u64, + /// File size in bytes + pub size: u64, +} + +impl Metadata { + /// Create a new metadata struct with the current time as uploaded, created and modified. + pub fn new_with_size(size: u64) -> Self { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + Self { + created: now, + modified: now, + size, + } + } +} + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum RenameError { + #[error("File not found in archive: {0}")] + FileNotFound(PathBuf), +} + +/// Errors that can occur during the file upload operation. 
+#[derive(Debug, thiserror::Error)] +pub enum UploadError { + #[error("Failed to recursively traverse directory")] + WalkDir(#[from] walkdir::Error), + #[error("Input/output failure")] + IoError(#[from] std::io::Error), + #[error("Failed to upload file")] + PutError(#[from] PutError), + #[error("Failed to fetch file")] + GetError(#[from] GetError), + #[error("Failed to serialize")] + Serialization(#[from] rmp_serde::encode::Error), + #[error("Failed to deserialize")] + Deserialization(#[from] rmp_serde::decode::Error), +} + +/// Errors that can occur during the download operation. +#[derive(Debug, thiserror::Error)] +pub enum DownloadError { + #[error("Failed to download file")] + GetError(#[from] GetError), + #[error("IO failure")] + IoError(#[from] std::io::Error), +} + +/// Errors that can occur during the file cost calculation. +#[derive(Debug, thiserror::Error)] +pub enum FileCostError { + #[error("Cost error: {0}")] + Cost(#[from] CostError), + #[error("IO failure")] + IoError(#[from] std::io::Error), + #[error("Serialization error")] + Serialization(#[from] rmp_serde::encode::Error), + #[error("Self encryption error")] + SelfEncryption(#[from] crate::self_encryption::Error), + #[error("Walkdir error")] + WalkDir(#[from] walkdir::Error), +} + +pub(crate) fn get_relative_file_path_from_abs_file_and_folder_path( + abs_file_pah: &Path, + abs_folder_path: &Path, +) -> PathBuf { + // check if the dir is a file + let is_file = abs_folder_path.is_file(); + + // could also be the file name + let dir_name = PathBuf::from( + abs_folder_path + .file_name() + .expect("Failed to get file/dir name"), + ); + + if is_file { + dir_name + } else { + let folder_prefix = abs_folder_path + .parent() + .unwrap_or(Path::new("")) + .to_path_buf(); + abs_file_pah + .strip_prefix(folder_prefix) + .expect("Could not strip prefix path") + .to_path_buf() + } +} diff --git a/autonomi/src/client/high_level/mod.rs b/autonomi/src/client/high_level/mod.rs new file mode 100644 index 
0000000000..5d44ae97d0 --- /dev/null +++ b/autonomi/src/client/high_level/mod.rs @@ -0,0 +1,11 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +pub mod data; +pub mod files; +pub mod vault; diff --git a/autonomi/src/client/vault/key.rs b/autonomi/src/client/high_level/vault/key.rs similarity index 100% rename from autonomi/src/client/vault/key.rs rename to autonomi/src/client/high_level/vault/key.rs diff --git a/autonomi/src/client/high_level/vault/mod.rs b/autonomi/src/client/high_level/vault/mod.rs new file mode 100644 index 0000000000..293df4942a --- /dev/null +++ b/autonomi/src/client/high_level/vault/mod.rs @@ -0,0 +1,101 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. 
+ +pub mod key; +pub mod user_data; + +pub use key::{derive_vault_key, VaultSecretKey}; +pub use user_data::UserData; + +use crate::client::data_types::scratchpad::ScratchpadError; +use crate::client::payment::PaymentOption; +use crate::client::quote::CostError; +use crate::client::{Client, PutError}; +use ant_evm::AttoTokens; +use ant_protocol::Bytes; +use std::hash::{DefaultHasher, Hash, Hasher}; +use tracing::info; + +/// The content type of the vault data +/// The number is used to determine the type of the contents of the bytes contained in a vault +/// Custom apps can use this to store their own custom types of data in vaults +/// It is recommended to use the hash of the app name or a unique identifier as the content type using [`app_name_to_vault_content_type`] +/// The value 0 is reserved for tests +pub type VaultContentType = u64; + +/// For custom apps using Scratchpad, this function converts an app identifier or name to a [`VaultContentType`] +pub fn app_name_to_vault_content_type(s: T) -> VaultContentType { + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + hasher.finish() +} + +#[derive(Debug, thiserror::Error)] +pub enum VaultError { + #[error("Vault error: {0}")] + Scratchpad(#[from] ScratchpadError), + #[error("Protocol: {0}")] + Protocol(#[from] ant_protocol::Error), +} + +impl Client { + /// Retrieves and returns a decrypted vault if one exists. 
+ /// Returns the content type of the bytes in the vault + pub async fn fetch_and_decrypt_vault( + &self, + secret_key: &VaultSecretKey, + ) -> Result<(Bytes, VaultContentType), VaultError> { + info!("Fetching and decrypting vault..."); + let public_key = secret_key.public_key(); + let pad = self.scratchpad_get(&public_key).await?; + + let data = pad.decrypt_data(secret_key)?; + debug!("vault data is successfully fetched and decrypted"); + Ok((data, pad.data_encoding())) + } + + /// Get the cost of creating a new vault + pub async fn vault_cost(&self, owner: &VaultSecretKey) -> Result { + info!("Getting cost for vault"); + self.scratchpad_cost(owner).await + } + + /// Put data into the client's VaultPacket + /// + /// Pays for a new VaultPacket if none yet created for the client. + /// Provide the bytes to be written to the vault and the content type of those bytes. + /// It is recommended to use the hash of the app name or unique identifier as the content type. + pub async fn write_bytes_to_vault( + &self, + data: Bytes, + payment_option: PaymentOption, + secret_key: &VaultSecretKey, + content_type: VaultContentType, + ) -> Result { + let public_key = secret_key.public_key(); + let (mut scratch, is_new) = self + .get_or_create_scratchpad(&public_key, content_type) + .await?; + + let _ = scratch.update_and_sign(data, secret_key); + debug_assert!(scratch.is_valid(), "Must be valid after being signed. This is a bug, please report it by opening an issue on our github"); + + let scratch_address = scratch.network_address(); + + info!("Writing to vault at {scratch_address:?}"); + + let total_cost = if is_new { + self.scratchpad_create(scratch, payment_option).await? 
+ } else { + self.scratchpad_update(scratch).await?; + AttoTokens::zero() + }; + + Ok(total_cost) + } +} diff --git a/autonomi/src/client/vault/user_data.rs b/autonomi/src/client/high_level/vault/user_data.rs similarity index 94% rename from autonomi/src/client/vault/user_data.rs rename to autonomi/src/client/high_level/vault/user_data.rs index e4f564db61..2572383bde 100644 --- a/autonomi/src/client/vault/user_data.rs +++ b/autonomi/src/client/high_level/vault/user_data.rs @@ -8,19 +8,18 @@ use std::collections::HashMap; -use crate::client::data::GetError; -use crate::client::data::PutError; -use crate::client::files::archive::PrivateArchiveAccess; -use crate::client::files::archive_public::ArchiveAddr; +use crate::client::high_level::files::archive_private::PrivateArchiveAccess; +use crate::client::high_level::files::archive_public::ArchiveAddr; use crate::client::payment::PaymentOption; -use crate::client::vault::VaultError; -use crate::client::vault::{app_name_to_vault_content_type, VaultContentType, VaultSecretKey}; use crate::client::Client; +use crate::client::{GetError, PutError}; use ant_evm::AttoTokens; use ant_protocol::Bytes; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; +use super::{app_name_to_vault_content_type, VaultContentType, VaultError, VaultSecretKey}; + /// Vault content type for UserDataVault pub static USER_DATA_VAULT_CONTENT_IDENTIFIER: LazyLock = LazyLock::new(|| app_name_to_vault_content_type("UserData")); diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 06f434b76b..e3eb72ddfd 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -9,19 +9,31 @@ // Optionally enable nightly `doc_cfg`. Allows items to be annotated, e.g.: "Available on crate feature X only". #![cfg_attr(docsrs, feature(doc_cfg))] +/// The 4 basic Network data types. 
+/// - Chunk +/// - GraphEntry +/// - Pointer +/// - Scratchpad +pub mod data_types; +pub use data_types::chunk; +pub use data_types::graph; +pub use data_types::pointer; +pub use data_types::scratchpad; + +/// High-level types built on top of the basic Network data types. +/// Includes data, files and personal data vaults +mod high_level; +pub use high_level::data; +pub use high_level::files; +pub use high_level::vault; + pub mod address; pub mod payment; pub mod quote; -pub mod data; -pub mod files; -pub mod graph; -pub mod pointer; - #[cfg(feature = "external-signer")] #[cfg_attr(docsrs, doc(cfg(feature = "external-signer")))] pub mod external_signer; -pub mod vault; // private module with utility functions mod rate_limiter; @@ -30,9 +42,13 @@ mod utils; use ant_bootstrap::{BootstrapCacheConfig, BootstrapCacheStore, PeersArgs}; pub use ant_evm::Amount; use ant_evm::EvmNetwork; -use ant_networking::{interval, multiaddr_is_global, Network, NetworkBuilder, NetworkEvent}; -use ant_protocol::version::IDENTIFY_PROTOCOL_STR; +use ant_networking::{ + interval, multiaddr_is_global, Network, NetworkBuilder, NetworkError, NetworkEvent, +}; +use ant_protocol::{version::IDENTIFY_PROTOCOL_STR, NetworkAddress}; use libp2p::{identity::Keypair, Multiaddr}; +use payment::PayError; +use quote::CostError; use std::{collections::HashSet, sync::Arc, time::Duration}; use tokio::sync::mpsc; @@ -107,6 +123,44 @@ pub enum ConnectError { Bootstrap(#[from] ant_bootstrap::Error), } +/// Errors that can occur during the put operation. 
+#[derive(Debug, thiserror::Error)] +pub enum PutError { + #[error("Failed to self-encrypt data.")] + SelfEncryption(#[from] crate::self_encryption::Error), + #[error("A network error occurred.")] + Network(#[from] NetworkError), + #[error("Error occurred during cost estimation.")] + CostError(#[from] CostError), + #[error("Error occurred during payment.")] + PayError(#[from] PayError), + #[error("Serialization error: {0}")] + Serialization(String), + #[error("A wallet error occurred.")] + Wallet(#[from] ant_evm::EvmError), + #[error("The owner key does not match the client's public key")] + ScratchpadBadOwner, + #[error("Payment unexpectedly invalid for {0:?}")] + PaymentUnexpectedlyInvalid(NetworkAddress), + #[error("The payment proof contains no payees.")] + PayeesMissing, +} + +/// Errors that can occur during the get operation. +#[derive(Debug, thiserror::Error)] +pub enum GetError { + #[error("Could not deserialize data map.")] + InvalidDataMap(rmp_serde::decode::Error), + #[error("Failed to decrypt data.")] + Decryption(crate::self_encryption::Error), + #[error("Failed to deserialize")] + Deserialization(#[from] rmp_serde::decode::Error), + #[error("General networking error: {0:?}")] + Network(#[from] NetworkError), + #[error("General protocol error: {0:?}")] + Protocol(#[from] ant_protocol::Error), +} + impl Client { /// Initialize the client with default configuration. 
/// diff --git a/autonomi/src/client/payment.rs b/autonomi/src/client/payment.rs index 2e693088cb..ca928c6d07 100644 --- a/autonomi/src/client/payment.rs +++ b/autonomi/src/client/payment.rs @@ -1,15 +1,27 @@ -use crate::client::data::PayError; use crate::client::quote::StoreQuote; use crate::Client; -use ant_evm::{AttoTokens, EncodedPeerId, EvmWallet, ProofOfPayment}; +use ant_evm::{AttoTokens, EncodedPeerId, EvmWallet, EvmWalletError, ProofOfPayment}; use std::collections::HashMap; use xor_name::XorName; -use super::utils::AlreadyPaidAddressesCount; +use super::quote::CostError; /// Contains the proof of payments for each XOR address and the amount paid pub type Receipt = HashMap; +pub type AlreadyPaidAddressesCount = usize; + +/// Errors that can occur during the pay operation. +#[derive(Debug, thiserror::Error)] +pub enum PayError { + #[error("Wallet error: {0:?}")] + EvmWalletError(#[from] EvmWalletError), + #[error("Failed to self-encrypt data.")] + SelfEncryption(#[from] crate::self_encryption::Error), + #[error("Cost error: {0:?}")] + Cost(#[from] CostError), +} + pub fn receipt_from_store_quotes(quotes: StoreQuote) -> Receipt { let mut receipt = Receipt::new(); @@ -77,4 +89,43 @@ impl Client { PaymentOption::Receipt(receipt) => Ok((receipt, 0)), } } + + /// Pay for the chunks and get the proof of payment. + pub(crate) async fn pay( + &self, + data_type: u32, + content_addrs: impl Iterator + Clone, + wallet: &EvmWallet, + ) -> Result<(Receipt, AlreadyPaidAddressesCount), PayError> { + let number_of_content_addrs = content_addrs.clone().count(); + let quotes = self.get_store_quotes(data_type, content_addrs).await?; + + // Make sure nobody else can use the wallet while we are paying + debug!("Waiting for wallet lock"); + let lock_guard = wallet.lock().await; + debug!("Locked wallet"); + + // TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying. 
+ // TODO: retry when it fails? + // Execute chunk payments + let _payments = wallet + .pay_for_quotes(quotes.payments()) + .await + .map_err(|err| PayError::from(err.0))?; + + // payment is done, unlock the wallet for other threads + drop(lock_guard); + debug!("Unlocked wallet"); + + let skipped_chunks = number_of_content_addrs - quotes.len(); + trace!( + "Chunk payments of {} chunks completed. {} chunks were free / already paid for", + quotes.len(), + skipped_chunks + ); + + let receipt = receipt_from_store_quotes(quotes); + + Ok((receipt, skipped_chunks)) + } } diff --git a/autonomi/src/client/quote.rs b/autonomi/src/client/quote.rs index a98f64d050..6a64c40c16 100644 --- a/autonomi/src/client/quote.rs +++ b/autonomi/src/client/quote.rs @@ -6,7 +6,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::{data::CostError, Client}; +use super::Client; use crate::client::rate_limiter::RateLimiter; use ant_evm::payment_vault::get_market_price; use ant_evm::{Amount, EvmNetwork, PaymentQuote, QuotePayment, QuotingMetrics}; @@ -52,6 +52,23 @@ impl StoreQuote { } } +/// Errors that can occur during the cost calculation. 
+#[derive(Debug, thiserror::Error)] +pub enum CostError { + #[error("Failed to self-encrypt data.")] + SelfEncryption(#[from] crate::self_encryption::Error), + #[error("Could not get store quote for: {0:?} after several retries")] + CouldNotGetStoreQuote(XorName), + #[error("Could not get store costs: {0:?}")] + CouldNotGetStoreCosts(NetworkError), + #[error("Not enough node quotes for {0:?}, got: {1:?} and need at least {2:?}")] + NotEnoughNodeQuotes(XorName, usize, usize), + #[error("Failed to serialize {0}")] + Serialization(String), + #[error("Market price error: {0:?}")] + MarketPriceError(#[from] ant_evm::payment_vault::error::Error), +} + impl Client { pub async fn get_store_quotes( &self, diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index 13567984c2..af123dca2a 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -6,202 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::client::payment::{receipt_from_store_quotes, Receipt}; -use ant_evm::{EvmWallet, ProofOfPayment}; -use ant_networking::{GetRecordCfg, PutRecordCfg, VerificationKind}; -use ant_protocol::{ - messages::ChunkProof, - storage::{try_serialize_record, Chunk, DataTypes, RecordKind, RetryStrategy}, -}; -use bytes::Bytes; use futures::stream::{FuturesUnordered, StreamExt}; -use libp2p::kad::{Quorum, Record}; -use rand::{thread_rng, Rng}; -use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk}; -use std::{future::Future, num::NonZero}; -use xor_name::XorName; - -use super::{ - data::{GetError, PayError, PutError, CHUNK_DOWNLOAD_BATCH_SIZE}, - Client, -}; -use crate::self_encryption::DataMapLevel; - -pub type AlreadyPaidAddressesCount = usize; - -impl Client { - /// Fetch and decrypt all chunks in the data map. 
- pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result { - debug!("Fetching encrypted data chunks from data map {data_map:?}"); - let mut download_tasks = vec![]; - for info in data_map.infos() { - download_tasks.push(async move { - match self - .chunk_get(info.dst_hash) - .await - .inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash)) - { - Ok(chunk) => Ok(EncryptedChunk { - index: info.index, - content: chunk.value, - }), - Err(err) => { - error!("Error fetching chunk {:?}: {err:?}", info.dst_hash); - Err(err) - } - } - }); - } - debug!("Successfully fetched all the encrypted chunks"); - let encrypted_chunks = - process_tasks_with_max_concurrency(download_tasks, *CHUNK_DOWNLOAD_BATCH_SIZE) - .await - .into_iter() - .collect::, GetError>>()?; - - let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| { - error!("Error decrypting encrypted_chunks: {e:?}"); - GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e)) - })?; - debug!("Successfully decrypted all the chunks"); - Ok(data) - } - - /// Unpack a wrapped data map and fetch all bytes using self-encryption. 
- pub(crate) async fn fetch_from_data_map_chunk( - &self, - data_map_bytes: &Bytes, - ) -> Result { - let mut data_map_level: DataMapLevel = rmp_serde::from_slice(data_map_bytes) - .map_err(GetError::InvalidDataMap) - .inspect_err(|err| error!("Error deserializing data map: {err:?}"))?; - - loop { - let data_map = match &data_map_level { - DataMapLevel::First(map) => map, - DataMapLevel::Additional(map) => map, - }; - - let data = self.fetch_from_data_map(data_map).await?; - - match &data_map_level { - DataMapLevel::First(_) => break Ok(data), - DataMapLevel::Additional(_) => { - data_map_level = rmp_serde::from_slice(&data).map_err(|err| { - error!("Error deserializing data map: {err:?}"); - GetError::InvalidDataMap(err) - })?; - continue; - } - }; - } - } - - pub(crate) async fn chunk_upload_with_payment( - &self, - chunk: &Chunk, - payment: ProofOfPayment, - ) -> Result<(), PutError> { - let storing_nodes = payment.payees(); - - if storing_nodes.is_empty() { - return Err(PutError::PayeesMissing); - } - - debug!("Storing chunk: {chunk:?} to {:?}", storing_nodes); - - let key = chunk.network_address().to_record_key(); - - let record_kind = RecordKind::DataWithPayment(DataTypes::Chunk); - let record = Record { - key: key.clone(), - value: try_serialize_record(&(payment, chunk.clone()), record_kind) - .map_err(|e| { - PutError::Serialization(format!( - "Failed to serialize chunk with payment: {e:?}" - )) - })? - .to_vec(), - publisher: None, - expires: None, - }; - - let verification = { - let verification_cfg = GetRecordCfg { - get_quorum: Quorum::N(NonZero::new(2).expect("2 is non-zero")), - retry_strategy: Some(RetryStrategy::Balanced), - target_record: None, - expected_holders: Default::default(), - }; - - let stored_on_node = - try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk)) - .map_err(|e| { - PutError::Serialization(format!("Failed to serialize chunk: {e:?}")) - })? 
- .to_vec(); - let random_nonce = thread_rng().gen::(); - let expected_proof = ChunkProof::new(&stored_on_node, random_nonce); - - Some(( - VerificationKind::ChunkProof { - expected_proof, - nonce: random_nonce, - }, - verification_cfg, - )) - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::One, - retry_strategy: Some(RetryStrategy::Balanced), - use_put_record_to: Some(storing_nodes.clone()), - verification, - }; - let payment_upload = Ok(self.network.put_record(record, &put_cfg).await?); - debug!("Successfully stored chunk: {chunk:?} to {storing_nodes:?}"); - payment_upload - } - - /// Pay for the chunks and get the proof of payment. - pub(crate) async fn pay( - &self, - data_type: u32, - content_addrs: impl Iterator + Clone, - wallet: &EvmWallet, - ) -> Result<(Receipt, AlreadyPaidAddressesCount), PayError> { - let number_of_content_addrs = content_addrs.clone().count(); - let quotes = self.get_store_quotes(data_type, content_addrs).await?; - - // Make sure nobody else can use the wallet while we are paying - debug!("Waiting for wallet lock"); - let lock_guard = wallet.lock().await; - debug!("Locked wallet"); - - // TODO: the error might contain some succeeded quote payments as well. These should be returned on err, so that they can be skipped when retrying. - // TODO: retry when it fails? - // Execute chunk payments - let _payments = wallet - .pay_for_quotes(quotes.payments()) - .await - .map_err(|err| PayError::from(err.0))?; - - // payment is done, unlock the wallet for other threads - drop(lock_guard); - debug!("Unlocked wallet"); - - let skipped_chunks = number_of_content_addrs - quotes.len(); - trace!( - "Chunk payments of {} chunks completed. 
{} chunks were free / already paid for", - quotes.len(), - skipped_chunks - ); - - let receipt = receipt_from_store_quotes(quotes); - - Ok((receipt, skipped_chunks)) - } -} +use std::future::Future; pub(crate) async fn process_tasks_with_max_concurrency(tasks: I, batch_size: usize) -> Vec where diff --git a/autonomi/src/client/vault.rs b/autonomi/src/client/vault.rs deleted file mode 100644 index 5c4c4c43b1..0000000000 --- a/autonomi/src/client/vault.rs +++ /dev/null @@ -1,306 +0,0 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. 
- -pub mod key; -pub mod user_data; - -pub use key::{derive_vault_key, VaultSecretKey}; -pub use user_data::UserData; - -use super::data::CostError; -use crate::client::data::PutError; -use crate::client::payment::PaymentOption; -use crate::client::Client; -use ant_evm::{Amount, AttoTokens}; -use ant_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg, VerificationKind}; -use ant_protocol::storage::{ - try_serialize_record, DataTypes, RecordKind, RetryStrategy, Scratchpad, ScratchpadAddress, -}; -use ant_protocol::Bytes; -use ant_protocol::{storage::try_deserialize_record, NetworkAddress}; -use libp2p::kad::{Quorum, Record}; -use std::collections::HashSet; -use std::hash::{DefaultHasher, Hash, Hasher}; -use tracing::info; - -#[derive(Debug, thiserror::Error)] -pub enum VaultError { - #[error("Could not generate Vault secret key from entropy: {0:?}")] - Bls(#[from] bls::Error), - #[error("Scratchpad found at {0:?} was not a valid record.")] - CouldNotDeserializeVaultScratchPad(ScratchpadAddress), - #[error("Protocol: {0}")] - Protocol(#[from] ant_protocol::Error), - #[error("Network: {0}")] - Network(#[from] NetworkError), - #[error("Vault not found")] - Missing, -} - -/// The content type of the vault data -/// The number is used to determine the type of the contents of the bytes contained in a vault -/// Custom apps can use this to store their own custom types of data in vaults -/// It is recommended to use the hash of the app name or an unique identifier as the content type using [`app_name_to_vault_content_type`] -/// The value 0 is reserved for tests -pub type VaultContentType = u64; - -/// For custom apps using Scratchpad, this function converts an app identifier or name to a [`VaultContentType`] -pub fn app_name_to_vault_content_type(s: T) -> VaultContentType { - let mut hasher = DefaultHasher::new(); - s.hash(&mut hasher); - hasher.finish() -} - -impl Client { - /// Retrieves and returns a decrypted vault if one exists. 
- /// Returns the content type of the bytes in the vault - pub async fn fetch_and_decrypt_vault( - &self, - secret_key: &VaultSecretKey, - ) -> Result<(Bytes, VaultContentType), VaultError> { - info!("Fetching and decrypting vault..."); - let pad = self.get_vault_from_network(secret_key).await?; - - let data = pad.decrypt_data(secret_key)?; - debug!("vault data is successfully fetched and decrypted"); - Ok((data, pad.data_encoding())) - } - - /// Gets the vault Scratchpad from a provided client public key - async fn get_vault_from_network( - &self, - secret_key: &VaultSecretKey, - ) -> Result { - let client_pk = secret_key.public_key(); - - let scratch_address = ScratchpadAddress::new(client_pk); - let network_address = NetworkAddress::from_scratchpad_address(scratch_address); - info!("Fetching vault from network at {network_address:?}",); - let scratch_key = network_address.to_record_key(); - - let get_cfg = GetRecordCfg { - get_quorum: Quorum::Majority, - retry_strategy: None, - target_record: None, - expected_holders: HashSet::new(), - }; - - let pad = match self - .network - .get_record_from_network(scratch_key.clone(), &get_cfg) - .await - { - Ok(record) => { - debug!("Got scratchpad for {scratch_key:?}"); - try_deserialize_record::(&record) - .map_err(|_| VaultError::CouldNotDeserializeVaultScratchPad(scratch_address))? 
- } - Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => { - debug!("Got multiple scratchpads for {scratch_key:?}"); - let mut pads = result_map - .values() - .map(|(record, _)| try_deserialize_record::(record)) - .collect::, _>>() - .map_err(|_| VaultError::CouldNotDeserializeVaultScratchPad(scratch_address))?; - - // take the latest versions - pads.sort_by_key(|s| s.count()); - let max_version = pads.last().map(|p| p.count()).unwrap_or_else(|| { - error!("Got empty scratchpad vector for {scratch_key:?}"); - u64::MAX - }); - let latest_pads: Vec<_> = pads - .into_iter() - .filter(|s| s.count() == max_version) - .collect(); - - // make sure we only have one of latest version - let pad = match &latest_pads[..] { - [one] => one, - [multi, ..] => { - error!("Got multiple conflicting scratchpads for {scratch_key:?} with the latest version, returning the first one"); - multi - } - [] => { - error!("Got empty scratchpad vector for {scratch_key:?}"); - return Err(VaultError::Missing); - } - }; - pad.to_owned() - } - Err(e) => { - warn!("Failed to fetch vault {network_address:?} from network: {e}"); - return Err(e)?; - } - }; - - Ok(pad) - } - - /// Get the cost of creating a new vault - pub async fn vault_cost(&self, owner: &VaultSecretKey) -> Result { - info!("Getting cost for vault"); - let client_pk = owner.public_key(); - let content_type = Default::default(); - let scratch = Scratchpad::new(client_pk, content_type); - let vault_xor = scratch.address().xorname(); - - // TODO: define default size of Scratchpad - let store_quote = self - .get_store_quotes( - DataTypes::Scratchpad.get_index(), - std::iter::once((vault_xor, 256)), - ) - .await?; - - let total_cost = AttoTokens::from_atto( - store_quote - .0 - .values() - .map(|quote| quote.price()) - .sum::(), - ); - - Ok(total_cost) - } - - /// Put data into the client's VaultPacket - /// - /// Pays for a new VaultPacket if none yet created for the client. 
- /// Provide the bytes to be written to the vault and the content type of those bytes. - /// It is recommended to use the hash of the app name or unique identifier as the content type. - pub async fn write_bytes_to_vault( - &self, - data: Bytes, - payment_option: PaymentOption, - secret_key: &VaultSecretKey, - content_type: VaultContentType, - ) -> Result { - let mut total_cost = AttoTokens::zero(); - - let (mut scratch, is_new) = self - .get_or_create_scratchpad(secret_key, content_type) - .await?; - - let _ = scratch.update_and_sign(data, secret_key); - debug_assert!(scratch.is_valid(), "Must be valid after being signed. This is a bug, please report it by opening an issue on our github"); - - let scratch_address = scratch.network_address(); - let scratch_key = scratch_address.to_record_key(); - - info!("Writing to vault at {scratch_address:?}",); - - let record = if is_new { - let (receipt, _skipped_payments) = self - .pay_for_content_addrs( - DataTypes::Scratchpad.get_index(), - std::iter::once((scratch.xorname(), scratch.payload_size())), - payment_option, - ) - .await - .inspect_err(|err| { - error!("Failed to pay for new vault at addr: {scratch_address:?} : {err}"); - })?; - - let (proof, price) = match receipt.values().next() { - Some(proof) => proof, - None => return Err(PutError::PaymentUnexpectedlyInvalid(scratch_address)), - }; - - total_cost = *price; - - Record { - key: scratch_key, - value: try_serialize_record( - &(proof, scratch), - RecordKind::DataWithPayment(DataTypes::Scratchpad), - ) - .map_err(|_| { - PutError::Serialization( - "Failed to serialize scratchpad with payment".to_string(), - ) - })? - .to_vec(), - publisher: None, - expires: None, - } - } else { - Record { - key: scratch_key, - value: try_serialize_record(&scratch, RecordKind::DataOnly(DataTypes::Scratchpad)) - .map_err(|_| { - PutError::Serialization("Failed to serialize scratchpad".to_string()) - })? 
- .to_vec(), - publisher: None, - expires: None, - } - }; - - let put_cfg = PutRecordCfg { - put_quorum: Quorum::Majority, - retry_strategy: Some(RetryStrategy::Balanced), - use_put_record_to: None, - verification: Some(( - VerificationKind::Crdt, - GetRecordCfg { - get_quorum: Quorum::Majority, - retry_strategy: None, - target_record: None, - expected_holders: HashSet::new(), - }, - )), - }; - - debug!("Put record - scratchpad at {scratch_address:?} to the network"); - self.network - .put_record(record, &put_cfg) - .await - .inspect_err(|err| { - error!( - "Failed to put scratchpad {scratch_address:?} to the network with err: {err:?}" - ) - })?; - - Ok(total_cost) - } - - /// Returns an existing scratchpad or creates a new one if it does not exist. - pub async fn get_or_create_scratchpad( - &self, - secret_key: &VaultSecretKey, - content_type: VaultContentType, - ) -> Result<(Scratchpad, bool), PutError> { - let client_pk = secret_key.public_key(); - - let pad_res = self.get_vault_from_network(secret_key).await; - let mut is_new = true; - - let scratch = if let Ok(existing_data) = pad_res { - info!("Scratchpad already exists, returning existing data"); - - info!( - "scratch already exists, is version {:?}", - existing_data.count() - ); - - is_new = false; - - if existing_data.owner() != &client_pk { - return Err(PutError::VaultBadOwner); - } - - existing_data - } else { - trace!("new scratchpad creation"); - Scratchpad::new(client_pk, content_type) - }; - - Ok((scratch, is_new)) - } -} diff --git a/autonomi/src/lib.rs b/autonomi/src/lib.rs index 247a2a55c2..68de9d1bcd 100644 --- a/autonomi/src/lib.rs +++ b/autonomi/src/lib.rs @@ -55,13 +55,24 @@ extern crate tracing; pub mod client; pub mod self_encryption; +// The Network data types +pub use client::data_types::chunk; +pub use client::data_types::graph; +pub use client::data_types::pointer; +pub use client::data_types::scratchpad; + +/// The high-level data types +pub use client::data; +pub use client::files; 
+pub use client::vault; + +// Re-exports of the evm types pub use ant_evm::utils::get_evm_network; pub use ant_evm::Amount; pub use ant_evm::EvmNetwork as Network; pub use ant_evm::EvmWallet as Wallet; pub use ant_evm::QuoteHash; pub use ant_evm::RewardsAddress; -pub use ant_protocol::storage::{Chunk, ChunkAddress}; #[doc(no_inline)] // Place this under 'Re-exports' in the docs. pub use bytes::Bytes; @@ -70,8 +81,9 @@ pub use libp2p::Multiaddr; #[doc(inline)] pub use client::{ - files::archive::Metadata, files::archive::PrivateArchive, files::archive_public::PublicArchive, - Client, ClientConfig, + data_types::chunk::Chunk, data_types::graph::GraphEntry, data_types::pointer::Pointer, + data_types::scratchpad::Scratchpad, files::archive_private::PrivateArchive, + files::archive_public::PublicArchive, files::Metadata, Client, ClientConfig, }; #[cfg(feature = "extension-module")] diff --git a/autonomi/src/python.rs b/autonomi/src/python.rs index 6f98801b12..0e7be1ec6e 100644 --- a/autonomi/src/python.rs +++ b/autonomi/src/python.rs @@ -2,8 +2,8 @@ #![allow(non_local_definitions)] use crate::client::{ - data::DataMapChunk, - files::{archive::PrivateArchiveAccess, archive_public::ArchiveAddr}, + chunk::DataMapChunk, + files::{archive_private::PrivateArchiveAccess, archive_public::ArchiveAddr}, payment::PaymentOption as RustPaymentOption, vault::{UserData, VaultSecretKey as RustVaultSecretKey}, Client as RustClient, @@ -190,11 +190,13 @@ impl Client { target: &PyPointerTarget, key: &PySecretKey, wallet: &Wallet, - ) -> PyResult<()> { + ) -> PyResult { let rt = tokio::runtime::Runtime::new().expect("Could not start tokio runtime"); let pointer = RustPointer::new(owner.inner, counter, target.inner.clone(), &key.inner); - rt.block_on(self.inner.pointer_put(pointer, &wallet.inner)) - .map_err(|e| PyValueError::new_err(format!("Failed to put pointer: {e}"))) + let addr = rt + .block_on(self.inner.pointer_put(pointer, &wallet.inner)) + .map_err(|e| 
PyValueError::new_err(format!("Failed to put pointer: {e}")))?; + Ok(PyPointerAddress { inner: addr }) } fn pointer_cost(&self, key: &PySecretKey) -> PyResult { diff --git a/autonomi/tests/external_signer.rs b/autonomi/tests/external_signer.rs index 59190c6c9d..eaba749c52 100644 --- a/autonomi/tests/external_signer.rs +++ b/autonomi/tests/external_signer.rs @@ -6,7 +6,7 @@ use ant_evm::{QuoteHash, TxHash}; use ant_logging::LogBuilder; use ant_protocol::storage::DataTypes; use autonomi::client::external_signer::encrypt_data; -use autonomi::client::files::archive::{Metadata, PrivateArchive}; +use autonomi::client::files::{archive_private::PrivateArchive, Metadata}; use autonomi::client::payment::{receipt_from_store_quotes, Receipt}; use autonomi::client::quote::StoreQuote; use autonomi::client::vault::user_data::USER_DATA_VAULT_CONTENT_IDENTIFIER; @@ -149,7 +149,7 @@ async fn external_signer_put() -> eyre::Result<()> { ); let (scratch, is_new) = client - .get_or_create_scratchpad(&vault_key, *USER_DATA_VAULT_CONTENT_IDENTIFIER) + .get_or_create_scratchpad(&vault_key.public_key(), *USER_DATA_VAULT_CONTENT_IDENTIFIER) .await?; assert!(is_new, "Scratchpad is not new");