diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs index 06861ffba8..ec941a933e 100644 --- a/sn_networking/src/event/kad.rs +++ b/sn_networking/src/event/kad.rs @@ -7,17 +7,21 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError, - Result, SwarmDriver, CLOSE_GROUP_SIZE, + driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record, + GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE, }; use itertools::Itertools; use libp2p::kad::{ self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult, QueryStats, Record, K_VALUE, }; -use sn_protocol::PrettyPrintRecordKey; +use sn_protocol::{ + storage::{try_serialize_record, RecordKind}, + PrettyPrintRecordKey, +}; +use sn_transfers::SignedSpend; use std::{ - collections::{hash_map::Entry, HashSet}, + collections::{hash_map::Entry, BTreeSet, HashSet}, time::Instant, }; use tokio::sync::oneshot; @@ -395,9 +399,40 @@ impl SwarmDriver { Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?; } else { debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record"); - sender - .send(Err(GetRecordError::SplitRecord { result_map })) - .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + let mut accumulated_spends = BTreeSet::new(); + for (record, _) in result_map.values() { + match get_raw_signed_spends_from_record(record) { + Ok(spends) => { + accumulated_spends.extend(spends); + } + Err(_) => { + continue; + } + } + } + if !accumulated_spends.is_empty() { + info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record"); + let accumulated_spends = accumulated_spends + .into_iter() + .take(2) + .collect::>(); + + let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?; + + let new_accumulated_record = Record { + key: peer_record.record.key, + value: bytes.to_vec(), + publisher: None, + expires: None, + }; + sender + .send(Ok(new_accumulated_record)) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } else { + sender + .send(Err(GetRecordError::SplitRecord { result_map })) + .map_err(|_| NetworkError::InternalMsgChannelDropped)?; + } } // Stop the query; possibly stops more nodes from being queried. diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index cd309595a3..931c827526 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -350,7 +350,7 @@ impl Node { // if we have no spends to verify, return early let unique_pubkey = match spends_for_key.as_slice() { [] => { - warn!("Found no valid spends to verify uppon validation for {pretty_key:?}"); + warn!("Found no valid spends to verify upon validation for {pretty_key:?}"); return Err(Error::InvalidRequest(format!( "No spends to verify when validating {pretty_key:?}" ))); @@ -633,12 +633,8 @@ impl Node { "Validating before storing spend at {spend_addr:?} with unique key: {unique_pubkey}" ); - // if we already have a double spend locally, no need to check the rest let local_spends = self.get_local_spends(spend_addr).await?; - if let [a, b, ..] = local_spends.as_slice() { - debug!("Got a double spend locally already, skipping check for: {unique_pubkey:?}"); - return Ok((a.to_owned(), Some(b.to_owned()))); - } + let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter()); // get spends from the network at the address for that unique pubkey let network_spends = match self.network.get_raw_spends(spend_addr).await { @@ -672,26 +668,15 @@ impl Node { } // collect spends until we have a double spend or until we have all the results - let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter()); while let Some(res) = tasks.join_next().await { match res { Ok((spend, Ok(()))) => { info!("Successfully verified {spend:?}"); let _inserted = all_verified_spends.insert(spend); - - // exit early if we have a double spend - if let [a, b, ..] = all_verified_spends - .iter() - .collect::>() - .as_slice() - { - debug!("Got a double spend for {unique_pubkey:?}"); - return Ok(((*a).clone(), Some((*b).clone()))); - } } Ok((spend, Err(e))) => { // an error here most probably means the received spend is invalid - warn!("Skipping spend {spend:?} as an error occured during validation: {e:?}"); + warn!("Skipping spend {spend:?} as an error occurred during validation: {e:?}"); } Err(e) => { let s = @@ -712,6 +697,10 @@ impl Node { debug!("Got a single valid spend for {unique_pubkey:?}"); Ok((a.to_owned(), None)) } + [a, b] => { + warn!("Got a double spend for {unique_pubkey:?}"); + Ok((a.to_owned(), Some(b.to_owned()))) + } _ => { debug!( "No valid spends found while validating Spend PUT. Who is sending us garbage?"