diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1ff4c5969f..83e4c48d18 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -46,6 +46,9 @@ declareCounter(libp2p_gossipsub_saved_bytes, "bytes saved by gossipsub optimizat declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received") declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)") +export libp2p_gossipsub_duplicate_during_validation, libp2p_gossipsub_idontwant_saved_messages, + libp2p_gossipsub_saved_bytes, libp2p_gossipsub_duplicate, libp2p_gossipsub_received + proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( explicit: true, @@ -270,7 +273,14 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = g.handlePrune(peer, control.prune) var respControl: ControlMessage - g.handleIDontWant(peer, control.idontwant) + var respCtrl: ControlMessage #for idontwant messages + + let ineed = g.handleIDontWant(peer, control.idontwant) + if ineed.messageIds.len > 0: + respCtrl.iwant.add(ineed) + var mess: seq[Message] #g.handleIWant(peer, control.idontwant) + g.send(peer, RPCMsg(control: some(respCtrl), messages: mess)) + let iwant = g.handleIHave(peer, control.ihave) if iwant.messageIds.len > 0: respControl.iwant.add(iwant) @@ -349,13 +359,7 @@ proc validateAndRelay(g: GossipSub, toSendPeers.excl(peer) toSendPeers.excl(seenPeers) - # IDontWant is only worth it if the message is substantially - # bigger than the messageId - if msg.data.len > msgId.len * 10: - g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( - idontwant: @[ControlIWant(messageIds: @[msgId])] - )))) - + #we dont send idont want to peers from which we already received idontwant for peer in toSendPeers: for heDontWant in peer.heDontWants: if msgId in heDontWant: @@ -366,6 +370,21 @@ proc validateAndRelay(g: GossipSub, toSendPeers.excl(seenPeers) + # IDontWant is only worth it if the message is substantially bigger than the messageId + # We randomly send large message to a maximum of dLow peers, others get idontwant + if msg.data.len > msgId.len * 10: + var dontwantPeers: HashSet[PubSubPeer] + var lrgMsgPeers = toSendPeers.toSeq() + g.rng.shuffle(lrgMsgPeers) + for dpeer in lrgMsgPeers: + if (lrgMsgPeers.len - dontwantPeers.len) < g.parameters.dLow: break + dontwantPeers.incl(dpeer) + + g.broadcast(dontwantPeers, RPCMsg(control: some(ControlMessage( + idontwant: @[ControlIWant(messageIds: @[msgId])] + )))) + toSendPeers.excl(dontwantPeers) + # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 008e197448..b0c75e9b55 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -266,12 +266,19 @@ proc handleIHave*(g: GossipSub, proc handleIDontWant*(g: GossipSub, peer: PubSubPeer, - iDontWants: seq[ControlIWant]) = + iDontWants: seq[ControlIWant]): ControlIWant {.raises: [].}= + var res: ControlIWant for dontWant in iDontWants: for messageId in dontWant.messageIds: if peer.heDontWants[^1].len > 1000: break if messageId.len > 100: continue peer.heDontWants[^1].incl(messageId) + #we can ask for unreceived message, learnt through idontwantmessage + if (not g.hasSeen(messageId)) and messageId notin g.outstandingIWANTs: + if peer.score < g.parameters.gossipThreshold: continue #dont request from low score peer + g.outstandingIWANTs[messageId] = IWANTRequest(messageId: messageId, peer: peer, timestamp: Moment.now()) + res.messageIds.add(messageId) + return res proc handleIWant*(g: GossipSub, peer: PubSubPeer,