diff --git a/sn_networking/src/event/kad.rs b/sn_networking/src/event/kad.rs
index 06861ffba8..ec941a933e 100644
--- a/sn_networking/src/event/kad.rs
+++ b/sn_networking/src/event/kad.rs
@@ -7,17 +7,21 @@
 // permissions and limitations relating to use of the SAFE Network Software.
 
 use crate::{
-    driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError,
-    Result, SwarmDriver, CLOSE_GROUP_SIZE,
+    driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record,
+    GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
 };
 use itertools::Itertools;
 use libp2p::kad::{
     self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult,
     QueryStats, Record, K_VALUE,
 };
-use sn_protocol::PrettyPrintRecordKey;
+use sn_protocol::{
+    storage::{try_serialize_record, RecordKind},
+    PrettyPrintRecordKey,
+};
+use sn_transfers::SignedSpend;
 use std::{
-    collections::{hash_map::Entry, HashSet},
+    collections::{hash_map::Entry, BTreeSet, HashSet},
     time::Instant,
 };
 use tokio::sync::oneshot;
@@ -395,9 +399,40 @@ impl SwarmDriver {
                     Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?;
                 } else {
                     debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record");
-                    sender
-                        .send(Err(GetRecordError::SplitRecord { result_map }))
-                        .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
+                    let mut accumulated_spends = BTreeSet::new();
+                    for (record, _) in result_map.values() {
+                        match get_raw_signed_spends_from_record(record) {
+                            Ok(spends) => {
+                                accumulated_spends.extend(spends);
+                            }
+                            Err(_) => {
+                                continue;
+                            }
+                        }
+                    }
+                    if !accumulated_spends.is_empty() {
+                        info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record");
+                        let accumulated_spends = accumulated_spends
+                            .into_iter()
+                            .take(2)
+                            .collect::<Vec<SignedSpend>>();
+
+                        let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?;
+
+                        let new_accumulated_record = Record {
+                            key: peer_record.record.key,
+                            value: bytes.to_vec(),
+                            publisher: None,
+                            expires: None,
+                        };
+                        sender
+                            .send(Ok(new_accumulated_record))
+                            .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
+                    } else {
+                        sender
+                            .send(Err(GetRecordError::SplitRecord { result_map }))
+                            .map_err(|_| NetworkError::InternalMsgChannelDropped)?;
+                    }
                 }
 
                 // Stop the query; possibly stops more nodes from being queried.