Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idontwant with d=dlow-1 and iwant to pull messages! #969

Open
wants to merge 1 commit into
base: p2p-research
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ declareCounter(libp2p_gossipsub_saved_bytes, "bytes saved by gossipsub optimizat
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")

export libp2p_gossipsub_duplicate_during_validation, libp2p_gossipsub_idontwant_saved_messages,
libp2p_gossipsub_saved_bytes, libp2p_gossipsub_duplicate, libp2p_gossipsub_received

proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
explicit: true,
Expand Down Expand Up @@ -270,7 +273,14 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
g.handlePrune(peer, control.prune)

var respControl: ControlMessage
g.handleIDontWant(peer, control.idontwant)
var respCtrl: ControlMessage #for idontwant messages

let ineed = g.handleIDontWant(peer, control.idontwant)
if ineed.messageIds.len > 0:
respCtrl.iwant.add(ineed)
var mess: seq[Message] #g.handleIWant(peer, control.idontwant)
g.send(peer, RPCMsg(control: some(respCtrl), messages: mess))

let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIds.len > 0:
respControl.iwant.add(iwant)
Expand Down Expand Up @@ -349,13 +359,7 @@ proc validateAndRelay(g: GossipSub,
toSendPeers.excl(peer)
toSendPeers.excl(seenPeers)

# IDontWant is only worth it if the message is substantially
# bigger than the messageId
if msg.data.len > msgId.len * 10:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[msgId])]
))))

#we dont send idont want to peers from which we already received idontwant
for peer in toSendPeers:
for heDontWant in peer.heDontWants:
if msgId in heDontWant:
Expand All @@ -366,6 +370,21 @@ proc validateAndRelay(g: GossipSub,
toSendPeers.excl(seenPeers)


# IDontWant is only worth it if the message is substantially bigger than the messageId
# We randomly send large message to a maximum of dLow peers, others get idontwant
if msg.data.len > msgId.len * 10:
var dontwantPeers: HashSet[PubSubPeer]
var lrgMsgPeers = toSendPeers.toSeq()
g.rng.shuffle(lrgMsgPeers)
for dpeer in lrgMsgPeers:
if (lrgMsgPeers.len - dontwantPeers.len) < g.parameters.dLow: break
dontwantPeers.incl(dpeer)

g.broadcast(dontwantPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[msgId])]
))))
toSendPeers.excl(dontwantPeers)

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
Expand Down
9 changes: 8 additions & 1 deletion libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,19 @@ proc handleIHave*(g: GossipSub,

proc handleIDontWant*(g: GossipSub,
peer: PubSubPeer,
iDontWants: seq[ControlIWant]) =
iDontWants: seq[ControlIWant]): ControlIWant {.raises: [].}=
var res: ControlIWant
for dontWant in iDontWants:
for messageId in dontWant.messageIds:
if peer.heDontWants[^1].len > 1000: break
if messageId.len > 100: continue
peer.heDontWants[^1].incl(messageId)
#we can ask for unreceived message, learnt through idontwantmessage
if (not g.hasSeen(messageId)) and messageId notin g.outstandingIWANTs:
if peer.score < g.parameters.gossipThreshold: continue #dont request from low score peer
g.outstandingIWANTs[messageId] = IWANTRequest(messageId: messageId, peer: peer, timestamp: Moment.now())
res.messageIds.add(messageId)
return res

proc handleIWant*(g: GossipSub,
peer: PubSubPeer,
Expand Down