Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): collect all spends and store the first two ordered spends #1904

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions sn_networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::PendingGetClosestType, get_quorum_value, GetRecordCfg, GetRecordError, NetworkError,
Result, SwarmDriver, CLOSE_GROUP_SIZE,
driver::PendingGetClosestType, get_quorum_value, get_raw_signed_spends_from_record,
GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver, CLOSE_GROUP_SIZE,
};
use itertools::Itertools;
use libp2p::kad::{
self, GetClosestPeersError, InboundRequest, PeerRecord, ProgressStep, QueryId, QueryResult,
QueryStats, Record, K_VALUE,
};
use sn_protocol::PrettyPrintRecordKey;
use sn_protocol::{
storage::{try_serialize_record, RecordKind},
PrettyPrintRecordKey,
};
use sn_transfers::SignedSpend;
use std::{
collections::{hash_map::Entry, HashSet},
collections::{hash_map::Entry, BTreeSet, HashSet},
time::Instant,
};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -395,9 +399,40 @@ impl SwarmDriver {
Self::send_record_after_checking_target(sender, peer_record.record, &cfg)?;
} else {
debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record");
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
let mut accumulated_spends = BTreeSet::new();
for (record, _) in result_map.values() {
match get_raw_signed_spends_from_record(record) {
Ok(spends) => {
accumulated_spends.extend(spends);
}
Err(_) => {
continue;
}
}
}
if !accumulated_spends.is_empty() {
info!("For record {pretty_key:?} task {query_id:?}, found split record for a spend, accumulated and sending them as a single record");
let accumulated_spends = accumulated_spends
.into_iter()
.take(2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 2 ? why not more?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep a random magic number here for now, say 50? If so, might have to do the same during PUT validation (we store 2 there).

The lower bound of a single signed spend is 825B. Can grow if the inputs/outputs increase.

.collect::<Vec<SignedSpend>>();

let bytes = try_serialize_record(&accumulated_spends, RecordKind::Spend)?;

let new_accumulated_record = Record {
key: peer_record.record.key,
value: bytes.to_vec(),
publisher: None,
expires: None,
};
sender
.send(Ok(new_accumulated_record))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
} else {
sender
.send(Err(GetRecordError::SplitRecord { result_map }))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if no entry within the accumulated_record,
it shall be considered as NoHolders, instead of SplitRecord ?

Copy link
Member Author

@RolandSherwin RolandSherwin Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have no idea if the record we're GET'ing is a spend here, we just try to read the header and collect them IF they're spends. If not, we assume it is a chunk/register and just return SplitRecordError

.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
}

// Stop the query; possibly stops more nodes from being queried.
Expand Down
25 changes: 7 additions & 18 deletions sn_node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl Node {
// if we have no spends to verify, return early
let unique_pubkey = match spends_for_key.as_slice() {
[] => {
warn!("Found no valid spends to verify uppon validation for {pretty_key:?}");
warn!("Found no valid spends to verify upon validation for {pretty_key:?}");
return Err(Error::InvalidRequest(format!(
"No spends to verify when validating {pretty_key:?}"
)));
Expand Down Expand Up @@ -633,12 +633,8 @@ impl Node {
"Validating before storing spend at {spend_addr:?} with unique key: {unique_pubkey}"
);

// if we already have a double spend locally, no need to check the rest
let local_spends = self.get_local_spends(spend_addr).await?;
if let [a, b, ..] = local_spends.as_slice() {
debug!("Got a double spend locally already, skipping check for: {unique_pubkey:?}");
return Ok((a.to_owned(), Some(b.to_owned())));
}
let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter());

// get spends from the network at the address for that unique pubkey
let network_spends = match self.network.get_raw_spends(spend_addr).await {
Expand Down Expand Up @@ -672,26 +668,15 @@ impl Node {
}

// collect spends until we have a double spend or until we have all the results
let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter());
while let Some(res) = tasks.join_next().await {
match res {
Ok((spend, Ok(()))) => {
info!("Successfully verified {spend:?}");
let _inserted = all_verified_spends.insert(spend);

// exit early if we have a double spend
if let [a, b, ..] = all_verified_spends
.iter()
.collect::<Vec<&SignedSpend>>()
.as_slice()
{
debug!("Got a double spend for {unique_pubkey:?}");
return Ok(((*a).clone(), Some((*b).clone())));
}
}
Ok((spend, Err(e))) => {
// an error here most probably means the received spend is invalid
warn!("Skipping spend {spend:?} as an error occured during validation: {e:?}");
warn!("Skipping spend {spend:?} as an error occurred during validation: {e:?}");
}
Err(e) => {
let s =
Expand All @@ -712,6 +697,10 @@ impl Node {
debug!("Got a single valid spend for {unique_pubkey:?}");
Ok((a.to_owned(), None))
}
[a, b] => {
warn!("Got a double spend for {unique_pubkey:?}");
Ok((a.to_owned(), Some(b.to_owned())))
}
_ => {
debug!(
"No valid spends found while validating Spend PUT. Who is sending us garbage?"
Expand Down
Loading