From 64402a25e8fe7e7e3c7d0ab4d1984c83b82e6eb2 Mon Sep 17 00:00:00 2001 From: grumbach <anselmega@gmail.com> Date: Tue, 5 Mar 2024 14:26:34 +0900 Subject: [PATCH] chore: clean swarm commands errs and spend errors --- Cargo.lock | 1 + sn_cli/src/subcommands/gossipsub.rs | 6 +- sn_cli/src/subcommands/wallet/wo_wallet.rs | 2 +- sn_client/src/api.rs | 23 +++-- sn_client/src/wallet.rs | 60 ++++++++----- sn_networking/src/error.rs | 3 +- sn_networking/src/lib.rs | 99 ++++++++++++---------- sn_node/Cargo.toml | 1 + sn_node/src/bin/safenode/rpc_service.rs | 35 ++------ sn_node/src/lib.rs | 20 ++--- sn_node/src/node.rs | 25 ++---- sn_node/src/put_validation.rs | 12 ++- sn_node/src/replication.rs | 6 +- sn_node/tests/double_spend.rs | 9 +- sn_node/tests/nodes_rewards.rs | 2 +- sn_transfers/src/cashnotes/unique_keys.rs | 6 ++ sn_transfers/src/wallet/wallet_file.rs | 8 +- 17 files changed, 158 insertions(+), 160 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40d65f5ea1..0b68eaf820 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5079,6 +5079,7 @@ name = "sn_node" version = "0.104.35" dependencies = [ "assert_fs", + "assert_matches", "async-trait", "blsttc", "bytes", diff --git a/sn_cli/src/subcommands/gossipsub.rs b/sn_cli/src/subcommands/gossipsub.rs index c7371bc63d..8e93f03121 100644 --- a/sn_cli/src/subcommands/gossipsub.rs +++ b/sn_cli/src/subcommands/gossipsub.rs @@ -39,7 +39,7 @@ pub enum GossipsubCmds { pub(crate) async fn gossipsub_cmds(cmds: GossipsubCmds, client: &Client) -> Result<()> { match cmds { GossipsubCmds::Subscribe { topic } => { - client.subscribe_to_topic(topic.clone())?; + client.subscribe_to_topic(topic.clone()); println!("Subscribed to topic '{topic}'. Listening for messages published on it..."); let mut events_channel = client.events_channel(); while let Ok(event) = events_channel.recv().await { @@ -50,11 +50,11 @@ pub(crate) async fn gossipsub_cmds(cmds: GossipsubCmds, client: &Client) -> Resu } } GossipsubCmds::Unsubscribe { topic } => { - client.unsubscribe_from_topic(topic.clone())?; + client.unsubscribe_from_topic(topic.clone()); println!("Unsubscribed from topic '{topic}'."); } GossipsubCmds::Publish { topic, msg } => { - client.publish_on_topic(topic.clone(), msg.into())?; + client.publish_on_topic(topic.clone(), msg.into()); println!("Message published on topic '{topic}'."); } } diff --git a/sn_cli/src/subcommands/wallet/wo_wallet.rs b/sn_cli/src/subcommands/wallet/wo_wallet.rs index 338ac52bce..3ca311edd7 100644 --- a/sn_cli/src/subcommands/wallet/wo_wallet.rs +++ b/sn_cli/src/subcommands/wallet/wo_wallet.rs @@ -365,7 +365,7 @@ async fn listen_notifs_and_deposit(root_dir: &Path, client: &Client, pk_hex: Str let main_pk = wallet.address(); let pk = main_pk.public_key(); - client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?; + client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); let mut events_receiver = client.events_channel(); println!("Current balance in local wallet: {}", wallet.balance()); diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index f006139695..631a8888a5 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -961,15 +961,14 @@ impl Client { /// # async fn main() -> Result<(),Error>{ /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; /// // Subscribing to the gossipsub topic "Royalty Transfer Notification" - /// client.subscribe_to_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION"))?; + /// client.subscribe_to_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION")); /// # Ok(()) /// # } /// ``` - pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { + pub fn subscribe_to_topic(&self, topic_id: String) { info!("Subscribing to topic id: {topic_id}"); - self.network.subscribe_to_topic(topic_id)?; - self.network.start_handle_gossip()?; - Ok(()) + self.network.subscribe_to_topic(topic_id); + self.network.start_handle_gossip(); } /// Unsubscribe from given gossipsub topic @@ -985,14 +984,13 @@ impl Client { /// # async fn main() -> Result<(),Error>{ /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; /// // Unsubscribing to the gossipsub topic "Royalty Transfer Notification" - /// client.unsubscribe_from_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION"))?; + /// client.unsubscribe_from_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION")); /// # Ok(()) /// # } /// ``` - pub fn unsubscribe_from_topic(&self, topic_id: String) -> Result<()> { + pub fn unsubscribe_from_topic(&self, topic_id: String) { info!("Unsubscribing from topic id: {topic_id}"); - self.network.unsubscribe_from_topic(topic_id)?; - Ok(()) + self.network.unsubscribe_from_topic(topic_id); } /// Publish message on given topic @@ -1010,14 +1008,13 @@ impl Client { /// let client = Client::new(SecretKey::random(), None, false, None, None).await?; /// let msg = String::from("Transfer Successful."); /// // Note the use of .into() to set the argument as bytes - /// client.publish_on_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION"), msg.into())?; + /// client.publish_on_topic(String::from("ROYALTY_TRANSFER_NOTIFICATION"), msg.into()); /// # Ok(()) /// # } /// ``` - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) -> Result<()> { + pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { info!("Publishing msg on topic id: {topic_id}"); - self.network.publish_on_topic(topic_id, msg)?; - Ok(()) + self.network.publish_on_topic(topic_id, msg); } /// This function is used to receive a Vector of CashNoteRedemptions and turn them back into spendable CashNotes. diff --git a/sn_client/src/wallet.rs b/sn_client/src/wallet.rs index 3a521be441..c2d56aea3b 100644 --- a/sn_client/src/wallet.rs +++ b/sn_client/src/wallet.rs @@ -722,6 +722,7 @@ impl Client { ) -> WalletResult<()> { let mut tasks = Vec::new(); + // send spends to the network in parralel for spend_request in spend_requests { debug!( "sending spend request to the network: {:?}: {spend_request:#?}", @@ -739,29 +740,50 @@ impl Client { tasks.push(the_task); } - let mut spent_cash_notes = BTreeSet::default(); - for (cash_note_key, spend_attempt_result) in join_all(tasks).await { - // This is a record mismatch on spend, we need to clean up and remove the spent CashNote from the wallet - // This only happens if we're verifying the store - if let Err(Error::Network(sn_networking::Error::GetRecordError( - GetRecordError::RecordDoesNotMatch(record_key), - ))) = spend_attempt_result - { - warn!("Record mismatch on spend, removing CashNote from wallet: {record_key:?}"); - spent_cash_notes.insert(*cash_note_key); - } else { - return spend_attempt_result - .map_err(|err| WalletError::CouldNotSendMoney(err.to_string())); + // wait for all the tasks to complete and gather the errors + let mut errors = Vec::new(); + let mut double_spent_keys = BTreeSet::new(); + for (spend_key, spend_attempt_result) in join_all(tasks).await { + match spend_attempt_result { + Err(Error::Network(sn_networking::Error::GetRecordError( + GetRecordError::RecordDoesNotMatch(_), + ))) + | Err(Error::Network(sn_networking::Error::GetRecordError( + GetRecordError::SplitRecord { .. }, + ))) => { + warn!( + "Double spend detected while trying to spend: {:?}", + spend_key + ); + double_spent_keys.insert(*spend_key); + } + Err(e) => { + warn!("Spend request errored out when sent to the network {spend_key:?}: {e}"); + errors.push((spend_key, e)); + } + Ok(()) => { + trace!("Spend request was successfully sent to the network: {spend_key:?}"); + } } } - if spent_cash_notes.is_empty() { - Ok(()) - } else { - Err(WalletError::DoubleSpendAttemptedForCashNotes( - spent_cash_notes, - )) + // report errors accordingly + // double spend errors in priority as they should be dealt with by the wallet + if !double_spent_keys.is_empty() { + return Err(WalletError::DoubleSpendAttemptedForCashNotes( + double_spent_keys, + )); } + if !errors.is_empty() { + let mut err_report = "Failed to send spend requests to the network:".to_string(); + for (spend_key, e) in &errors { + warn!("Failed to send spend request to the network: {spend_key:?}: {e}"); + err_report.push_str(&format!("{spend_key:?}: {e}")); + } + return Err(WalletError::CouldNotSendMoney(err_report)); + } + + Ok(()) } /// Receive a Transfer, verify and redeem CashNotes from the Network. diff --git a/sn_networking/src/error.rs b/sn_networking/src/error.rs index 6a856acee2..c04cc54313 100644 --- a/sn_networking/src/error.rs +++ b/sn_networking/src/error.rs @@ -114,6 +114,8 @@ pub enum Error { // GetRecord query errors #[error("GetRecord Query Error {0:?}")] GetRecordError(#[from] GetRecordError), + #[error("Record not stored by nodes, it could be invalid, else you should retry: {0:?}")] + RecordNotStoredByNodes(NetworkAddress), // The RecordKind that was obtained did not match with the expected one #[error("The RecordKind obtained from the Record did not match with the expected kind: {0}")] @@ -135,7 +137,6 @@ pub enum Error { // ---------- Spend Errors #[error("Spend not found: {0:?}")] NoSpendFoundInsideRecord(SpendAddress), - #[error("A double spend was detected. Two diverging signed spends: {0:?}, {1:?}")] DoubleSpendAttempt(Box<SignedSpend>, Box<SignedSpend>), diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index bdeaf516ff..fe21a6d608 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -172,7 +172,7 @@ impl Network { /// This function will only be called for the bootstrap nodes. pub async fn dial(&self, addr: Multiaddr) -> Result<()> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::Dial { addr, sender })?; + self.send_swarm_cmd(SwarmCmd::Dial { addr, sender }); receiver.await? } @@ -194,7 +194,7 @@ impl Network { /// Does not include self pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender })?; + self.send_swarm_cmd(SwarmCmd::GetKBuckets { sender }); receiver .await .map_err(|_e| Error::InternalMsgChannelDropped) @@ -208,7 +208,7 @@ impl Network { self.send_swarm_cmd(SwarmCmd::GetCloseGroupLocalPeers { key: key.clone(), sender, - })?; + }); match receiver.await { Ok(close_peers) => { @@ -243,7 +243,7 @@ impl Network { /// Also contains our own PeerId. pub async fn get_all_local_peers(&self) -> Result<Vec<PeerId>> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetAllLocalPeers { sender })?; + self.send_swarm_cmd(SwarmCmd::GetAllLocalPeers { sender }); receiver .await @@ -254,7 +254,7 @@ impl Network { /// Also contains our own PeerId. pub async fn get_closest_k_value_local_peers(&self) -> Result<Vec<PeerId>> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetClosestKLocalPeers { sender })?; + self.send_swarm_cmd(SwarmCmd::GetClosestKLocalPeers { sender }); receiver .await @@ -396,21 +396,18 @@ impl Network { } /// Subscribe to given gossipsub topic - pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { - self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id))?; - Ok(()) + pub fn subscribe_to_topic(&self, topic_id: String) { + self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id)); } /// Unsubscribe from given gossipsub topic - pub fn unsubscribe_from_topic(&self, topic_id: String) -> Result<()> { - self.send_swarm_cmd(SwarmCmd::GossipsubUnsubscribe(topic_id))?; - Ok(()) + pub fn unsubscribe_from_topic(&self, topic_id: String) { + self.send_swarm_cmd(SwarmCmd::GossipsubUnsubscribe(topic_id)); } /// Publish a msg on a given topic - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) -> Result<()> { - self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg })?; - Ok(()) + pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { + self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg }); } /// Get a record from the network @@ -428,7 +425,7 @@ impl Network { key: key.clone(), sender, cfg: cfg.clone(), - })?; + }); let result = receiver.await.map_err(|e| { error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}"); Error::InternalMsgChannelDropped @@ -464,11 +461,7 @@ impl Network { key: key.clone(), sender, cfg: cfg.clone(), - }) - .map_err(|err| BackoffError::Transient { - err, - retry_after: None, - })?; + }); let result = receiver.await.map_err(|e| { error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}"); Error::InternalMsgChannelDropped @@ -520,7 +513,7 @@ impl Network { /// Get the cost of storing the next record from the network pub async fn get_local_storecost(&self, key: RecordKey) -> Result<NanoTokens> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { key, sender })?; + self.send_swarm_cmd(SwarmCmd::GetLocalStoreCost { key, sender }); receiver .await @@ -528,9 +521,8 @@ impl Network { } /// Notify the node receicced a payment. - pub fn notify_payment_received(&self) -> Result<()> { - self.send_swarm_cmd(SwarmCmd::PaymentReceived)?; - Ok(()) + pub fn notify_payment_received(&self) { + self.send_swarm_cmd(SwarmCmd::PaymentReceived); } /// Get `Record` from the local RecordStore @@ -539,7 +531,7 @@ impl Network { self.send_swarm_cmd(SwarmCmd::GetLocalRecord { key: key.clone(), sender, - })?; + }); receiver .await @@ -549,7 +541,7 @@ impl Network { /// Whether the target peer is considered as `in trouble` by self pub async fn is_peer_bad(&self, target: NetworkAddress) -> Result<bool> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::IsPeerInTrouble { target, sender })?; + self.send_swarm_cmd(SwarmCmd::IsPeerInTrouble { target, sender }); receiver .await @@ -605,13 +597,13 @@ impl Network { record: record.clone(), sender, quorum: cfg.put_quorum, - })?; + }); } else { self.send_swarm_cmd(SwarmCmd::PutRecord { record: record.clone(), sender, quorum: cfg.put_quorum, - })?; + }); } let response = receiver.await?; @@ -640,8 +632,26 @@ impl Network { ) .await?; } else { - self.get_record_from_network(record.key.clone(), get_cfg) - .await?; + match self + .get_record_from_network(record.key.clone(), get_cfg) + .await + { + Ok(_) => { + debug!("Record {pretty_key:?} verified to be stored."); + } + Err(Error::GetRecordError(GetRecordError::RecordNotFound)) => { + warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked"); + return Err(Error::RecordNotStoredByNodes( + NetworkAddress::from_record_key(&record_key), + )); + } + Err(e) => { + debug!( + "Failed to verify record {pretty_key:?} to be stored with error: {e:?}" + ); + return Err(e); + } + } } } response @@ -649,7 +659,7 @@ impl Network { /// Put `Record` to the local RecordStore /// Must be called after the validations are performed on the Record - pub fn put_local_record(&self, record: Record) -> Result<()> { + pub fn put_local_record(&self, record: Record) { trace!( "Writing Record locally, for {:?} - length {:?}", PrettyPrintRecordKey::from(&record.key), @@ -664,7 +674,7 @@ impl Network { self.send_swarm_cmd(SwarmCmd::RecordStoreHasKey { key: key.clone(), sender, - })?; + }); receiver .await @@ -676,7 +686,7 @@ impl Network { &self, ) -> Result<HashMap<NetworkAddress, RecordType>> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender })?; + self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender }); receiver .await @@ -693,13 +703,13 @@ impl Network { req, peer, sender: Some(sender), - })?; + }); receiver.await? } /// Send `Request` to the given `PeerId` and do _not_ await a response here. /// Instead the Response will be handled by the common `response_handler` - pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) -> Result<()> { + pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) { let swarm_cmd = SwarmCmd::SendRequest { req, peer, @@ -709,40 +719,37 @@ impl Network { } /// Send a `Response` through the channel opened by the requester. - pub fn send_response(&self, resp: Response, channel: MsgResponder) -> Result<()> { + pub fn send_response(&self, resp: Response, channel: MsgResponder) { self.send_swarm_cmd(SwarmCmd::SendResponse { resp, channel }) } /// Return a `SwarmLocalState` with some information obtained from swarm's local state. pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> { let (sender, receiver) = oneshot::channel(); - self.send_swarm_cmd(SwarmCmd::GetSwarmLocalState(sender))?; + self.send_swarm_cmd(SwarmCmd::GetSwarmLocalState(sender)); let state = receiver.await?; Ok(state) } - pub fn start_handle_gossip(&self) -> Result<()> { + pub fn start_handle_gossip(&self) { self.send_swarm_cmd(SwarmCmd::GossipHandler) } - pub fn trigger_interval_replication(&self) -> Result<()> { + pub fn trigger_interval_replication(&self) { self.send_swarm_cmd(SwarmCmd::TriggerIntervalReplication) } pub fn notify_node_status(&self, peer_id: PeerId, addrs: HashSet<Multiaddr>, is_bad: bool) { - if let Err(error) = self.send_swarm_cmd(SwarmCmd::SendNodeStatus { + self.send_swarm_cmd(SwarmCmd::SendNodeStatus { peer_id, addrs, is_bad, - }) { - error!("Error while notifying node status: {error:?}"); - } + }); } // Helper to send SwarmCmd - fn send_swarm_cmd(&self, cmd: SwarmCmd) -> Result<()> { + fn send_swarm_cmd(&self, cmd: SwarmCmd) { send_swarm_cmd(self.swarm_cmd_sender.clone(), cmd); - Ok(()) } /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. @@ -757,7 +764,7 @@ impl Network { self.send_swarm_cmd(SwarmCmd::GetClosestPeersToAddressFromNetwork { key: key.clone(), sender, - })?; + }); let k_bucket_peers = receiver.await?; // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 53f9276177..dd141a725b 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -72,6 +72,7 @@ strum = { version = "0.25.0", features = ["derive"] } color-eyre = "0.6.2" [dev-dependencies] +assert_matches = "1.5.0" tempfile = "3.6.0" reqwest = { version="0.11.18", default-features=false, features = ["rustls"] } sn_protocol = { path = "../sn_protocol", version = "0.15.0", features = ["test-utils", "rpc"]} diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs index 8e991e8231..9fdf9eff7b 100644 --- a/sn_node/src/bin/safenode/rpc_service.rs +++ b/sn_node/src/bin/safenode/rpc_service.rs @@ -237,20 +237,9 @@ impl SafeNode for SafeNodeRpcService { // Assuming the rpc subscription request also force the node to handle the gossip. // So far, this is only used during test to allow counting the gossip msgs received by node. - if let Err(err) = self.running_node.start_handle_gossip() { - return Err(Status::new( - Code::Internal, - format!("Failed to start handle gossip: {err}"), - )); - } - - match self.running_node.subscribe_to_topic(topic.clone()) { - Ok(()) => Ok(Response::new(GossipsubSubscribeResponse {})), - Err(err) => Err(Status::new( - Code::Internal, - format!("Failed to subscribe to topic '{topic}': {err}"), - )), - } + self.running_node.start_handle_gossip(); + self.running_node.subscribe_to_topic(topic.clone()); + Ok(Response::new(GossipsubSubscribeResponse {})) } async fn unsubscribe_from_topic( @@ -265,13 +254,8 @@ impl SafeNode for SafeNodeRpcService { let topic = &request.get_ref().topic; - match self.running_node.unsubscribe_from_topic(topic.clone()) { - Ok(()) => Ok(Response::new(GossipsubUnsubscribeResponse {})), - Err(err) => Err(Status::new( - Code::Internal, - format!("Failed to unsubscribe from topic '{topic}': {err}"), - )), - } + self.running_node.unsubscribe_from_topic(topic.clone()); + Ok(Response::new(GossipsubUnsubscribeResponse {})) } async fn publish_on_topic( @@ -288,13 +272,8 @@ impl SafeNode for SafeNodeRpcService { // Convert the message from Vec<u8> to Bytes let msg = Bytes::from(request.get_ref().msg.clone()); - match self.running_node.publish_on_topic(topic.clone(), msg) { - Ok(()) => Ok(Response::new(GossipsubPublishResponse {})), - Err(err) => Err(Status::new( - Code::Internal, - format!("Failed to publish on topic '{topic}': {err}"), - )), - } + self.running_node.publish_on_topic(topic.clone(), msg); + Ok(Response::new(GossipsubPublishResponse {})) } async fn stop(&self, request: Request<StopRequest>) -> Result<Response<StopResponse>, Status> { diff --git a/sn_node/src/lib.rs b/sn_node/src/lib.rs index 8d53d99086..606bcb210f 100644 --- a/sn_node/src/lib.rs +++ b/sn_node/src/lib.rs @@ -135,27 +135,23 @@ impl RunningNode { } /// Subscribe to given gossipsub topic - pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { - self.network.subscribe_to_topic(topic_id)?; - Ok(()) + pub fn subscribe_to_topic(&self, topic_id: String) { + self.network.subscribe_to_topic(topic_id); } /// Starts handling gossipsub topics - pub fn start_handle_gossip(&self) -> Result<()> { - self.network.start_handle_gossip()?; - Ok(()) + pub fn start_handle_gossip(&self) { + self.network.start_handle_gossip(); } /// Unsubscribe from given gossipsub topic - pub fn unsubscribe_from_topic(&self, topic_id: String) -> Result<()> { - self.network.unsubscribe_from_topic(topic_id)?; - Ok(()) + pub fn unsubscribe_from_topic(&self, topic_id: String) { + self.network.unsubscribe_from_topic(topic_id); } /// Publish a message on a given gossipsub topic - pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) -> Result<()> { - self.network.publish_on_topic(topic_id, msg)?; - Ok(()) + pub fn publish_on_topic(&self, topic_id: String, msg: Bytes) { + self.network.publish_on_topic(topic_id, msg); } /// Set a PublicKey to start decoding and accepting Transfer notifications received over gossipsub. diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 1a9fadeba8..ff63c05c01 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -170,9 +170,8 @@ impl NodeBuilder { info!("Picked as a forwarding node to subscribe to the {ROYALTY_TRANSFER_NOTIF_TOPIC} topic"); // Forwarder only needs to forward topic msgs on libp2p level, // i.e. no need to handle topic msgs, hence not a `listener`. - running_node - .subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()) - .map(|()| info!("Node has been subscribed to gossipsub topic '{ROYALTY_TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."))?; + running_node.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); + info!("Node has been subscribed to gossipsub topic '{ROYALTY_TRANSFER_NOTIF_TOPIC}' to receive network royalties payments notifications."); } } @@ -258,11 +257,7 @@ impl Node { self.record_metrics(Marker::IntervalReplicationTriggered); let _handle = spawn(async move { - if let Err(err) = Self::try_interval_replication(network) - { - error!("Error while triggering replication {err:?}"); - } - + Self::try_interval_replication(network); trace!("Periodic replication took {:?}", start.elapsed()); }); } @@ -270,7 +265,7 @@ impl Node { match node_cmd { Ok(NodeCmd::TransferNotifsFilter(filter)) => { self.transfer_notifs_filter = filter; - let _ = self.network.start_handle_gossip(); + self.network.start_handle_gossip(); } Err(err) => error!("When trying to read from the NodeCmds channel/receiver: {err:?}") } @@ -314,9 +309,7 @@ impl Node { let net_clone = self.network.clone(); self.record_metrics(Marker::IntervalReplicationTriggered); let _handle = spawn(async move { - if let Err(err) = Self::try_interval_replication(net_clone) { - error!("Error while triggering replication {err:?}"); - } + Self::try_interval_replication(net_clone); }); } NetworkEvent::PeerRemoved(peer_id, connected_peers) => { @@ -327,9 +320,7 @@ impl Node { let net = self.network.clone(); self.record_metrics(Marker::IntervalReplicationTriggered); let _handle = spawn(async move { - if let Err(e) = Self::try_interval_replication(net) { - error!("Error while triggering replication {e:?}"); - } + Self::try_interval_replication(net); }); } NetworkEvent::NewListenAddr(_) => { @@ -371,9 +362,7 @@ impl Node { let res = Self::handle_query(&network, query, payment_address).await; trace!("Sending response {res:?}"); - if let Err(error) = network.send_response(res, channel) { - error!("Error while sending response form query req: {error:?}"); - } + network.send_response(res, channel); }); } NetworkEvent::UnverifiedRecord(record) => { diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs index 5113c6913c..7253095342 100644 --- a/sn_node/src/put_validation.rs +++ b/sn_node/src/put_validation.rs @@ -285,7 +285,7 @@ impl Node { // finally store the Record directly into the local storage debug!("Storing chunk {chunk_name:?} as Record locally"); - self.network.put_local_record(record)?; + self.network.put_local_record(record); self.record_metrics(Marker::ValidChunkRecordPutFromNetwork(&pretty_key)); @@ -327,7 +327,7 @@ impl Node { let content_hash = XorName::from_content(&record.value); debug!("Storing register {reg_addr:?} as Record locally"); - self.network.put_local_record(record)?; + self.network.put_local_record(record); self.record_metrics(Marker::ValidRegisterRecordPutFromNetwork(&pretty_key)); @@ -389,7 +389,7 @@ impl Node { publisher: None, expires: None, }; - self.network.put_local_record(record)?; + self.network.put_local_record(record); // Notify the sender of any double spend if validated_spends.len() > 1 { @@ -503,7 +503,7 @@ impl Node { trace!("Received payment of {received_fee:?} for {pretty_key}"); // Notify `record_store` that the node received a payment. - let _ = self.network.notify_payment_received(); + self.network.notify_payment_received(); // deposit the CashNotes in our wallet wallet.deposit_and_store_to_disk(&cash_notes)?; @@ -540,9 +540,7 @@ impl Node { match royalties_cash_notes_r.serialize(&mut serialiser) { Ok(()) => { let msg = msg.into_inner().freeze(); - if let Err(err) = self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg) { - debug!("Failed to publish a network royalties payment notification over gossipsub for record {pretty_key} and beneficiary {royalties_pk:?}: {err:?}"); - } + self.network.publish_on_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string(), msg); } Err(err) => warn!("Failed to serialise network royalties payment data to publish a notification over gossipsub for record {pretty_key}: {err:?}"), } diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs index 271fa915ca..561d7d1995 100644 --- a/sn_node/src/replication.rs +++ b/sn_node/src/replication.rs @@ -21,8 +21,8 @@ use tokio::task::{spawn, JoinHandle}; impl Node { /// Sends _all_ record keys every interval to all peers within the REPLICATE_RANGE. - pub(crate) fn try_interval_replication(network: Network) -> Result<()> { - Ok(network.trigger_interval_replication()?) + pub(crate) fn try_interval_replication(network: Network) { + network.trigger_interval_replication() } /// Get the Record from a peer or from the network without waiting. @@ -173,7 +173,7 @@ impl Node { keys: keys.clone(), }); - let _ = network.send_req_ignore_reply(request, *peer_id); + network.send_req_ignore_reply(request, *peer_id); } trace!( "Completed replicate fresh record {pretty_key:?} on store, in {:?}", diff --git a/sn_node/tests/double_spend.rs b/sn_node/tests/double_spend.rs index f8444c0be3..c5242017f8 100644 --- a/sn_node/tests/double_spend.rs +++ b/sn_node/tests/double_spend.rs @@ -9,11 +9,12 @@ mod common; use assert_fs::TempDir; +use assert_matches::assert_matches; use common::client::{get_gossip_client_and_funded_wallet, get_wallet}; use eyre::Result; use sn_logging::LogBuilder; use sn_transfers::{ - rng, DerivationIndex, Hash, HotWallet, MainSecretKey, NanoTokens, OfflineTransfer, + rng, DerivationIndex, Hash, HotWallet, MainSecretKey, NanoTokens, OfflineTransfer, WalletError, GENESIS_CASHNOTE, GENESIS_CASHNOTE_SK, }; use tracing::info; @@ -116,13 +117,13 @@ async fn genesis_double_spend_fail() -> Result<()> { let reason_hash = Hash::default(); let transfer = OfflineTransfer::new(genesis_cashnote, vec![recipient], change_addr, reason_hash)?; - std::mem::drop(exclusive_access); // send the transfer to the network which will mark genesis as a double spent // making its direct descendants unspendable let res = client .send_spends(transfer.all_spend_requests.iter(), false) .await; + std::mem::drop(exclusive_access); assert!(res.is_ok()); // put the bad cashnote in the first wallet @@ -148,13 +149,13 @@ async fn genesis_double_spend_fail() -> Result<()> { change_addr, reason_hash, )?; - std::mem::drop(exclusive_access); // send the transfer to the network which should reject it let res = client .send_spends(transfer2.all_spend_requests.iter(), false) .await; - assert!(res.is_err()); + std::mem::drop(exclusive_access); + assert_matches!(res, Err(WalletError::CouldNotSendMoney(_))); Ok(()) } diff --git a/sn_node/tests/nodes_rewards.rs b/sn_node/tests/nodes_rewards.rs index 81af185886..5187bea21a 100644 --- a/sn_node/tests/nodes_rewards.rs +++ b/sn_node/tests/nodes_rewards.rs @@ -370,7 +370,7 @@ mod tests { let sk = SecretKey::from_hex(sn_transfers::GENESIS_CASHNOTE_SK)?; let mut wallet = HotWallet::load_from_path(&temp_dir, Some(MainSecretKey::new(sk)))?; let royalties_pk = NETWORK_ROYALTIES_PK.public_key(); - client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string())?; + client.subscribe_to_topic(ROYALTY_TRANSFER_NOTIF_TOPIC.to_string()); let mut events_receiver = client.events_channel(); diff --git a/sn_transfers/src/cashnotes/unique_keys.rs b/sn_transfers/src/cashnotes/unique_keys.rs index d6d3f0bf0d..3d1a40c224 100644 --- a/sn_transfers/src/cashnotes/unique_keys.rs +++ b/sn_transfers/src/cashnotes/unique_keys.rs @@ -146,6 +146,12 @@ impl std::fmt::Debug for UniquePubkey { } } +impl std::fmt::Display for UniquePubkey { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.to_hex()) + } +} + /// This is the key that unlocks the value of a CashNote. /// Holding this key gives you access to the tokens of the /// CashNote with the corresponding UniquePubkey. diff --git a/sn_transfers/src/wallet/wallet_file.rs b/sn_transfers/src/wallet/wallet_file.rs index 4ba84e7db0..08f09d3497 100644 --- a/sn_transfers/src/wallet/wallet_file.rs +++ b/sn_transfers/src/wallet/wallet_file.rs @@ -22,7 +22,7 @@ use std::{ const WALLET_FILE_NAME: &str = "wallet"; const WALLET_LOCK_FILE_NAME: &str = "wallet.lock"; const CASHNOTES_DIR_NAME: &str = "cash_notes"; -const UNCONFRIMED_TX_NAME: &str = "unconfirmed_spend_requests"; +const UNCONFIRMED_TX_NAME: &str = "unconfirmed_spend_requests"; /// Writes the `KeyLessWallet` to the specified path. pub(super) fn store_wallet(wallet_dir: &Path, wallet: &KeyLessWallet) -> Result<()> { @@ -48,7 +48,7 @@ pub(super) fn store_unconfirmed_spend_requests( wallet_dir: &Path, unconfirmed_spend_requests: &BTreeSet<SignedSpend>, ) -> Result<()> { - let unconfirmed_spend_requests_path = wallet_dir.join(UNCONFRIMED_TX_NAME); + let unconfirmed_spend_requests_path = wallet_dir.join(UNCONFIRMED_TX_NAME); let mut file = fs::File::create(unconfirmed_spend_requests_path)?; let mut serialiser = rmp_serde::encode::Serializer::new(&mut file); @@ -61,7 +61,7 @@ pub(super) fn remove_unconfirmed_spend_requests( wallet_dir: &Path, _unconfirmed_spend_requests: &BTreeSet<SignedSpend>, ) -> Result<()> { - let unconfirmed_spend_requests_path = wallet_dir.join(UNCONFRIMED_TX_NAME); + let unconfirmed_spend_requests_path = wallet_dir.join(UNCONFIRMED_TX_NAME); debug!("Removing unconfirmed_spend_requests from {unconfirmed_spend_requests_path:?}"); fs::remove_file(unconfirmed_spend_requests_path)?; @@ -72,7 +72,7 @@ pub(super) fn remove_unconfirmed_spend_requests( pub(super) fn get_unconfirmed_spend_requests( wallet_dir: &Path, ) -> Result<Option<BTreeSet<SignedSpend>>> { - let path = wallet_dir.join(UNCONFRIMED_TX_NAME); + let path = wallet_dir.join(UNCONFIRMED_TX_NAME); if !path.is_file() { return Ok(None); }