From 4a64c406d972b7fe266e2e102e1e447c217febe7 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 17 Dec 2024 18:55:16 -0400 Subject: [PATCH] feat: idontwant on publish --- libp2p/protocols/pubsub/gossipsub.nim | 62 ++++++++++++++------- libp2p/protocols/pubsub/gossipsub/types.nim | 3 + libp2p/utility.nim | 8 +++ tests/pubsub/testgossipsub.nim | 33 +++++++++++ 4 files changed, 85 insertions(+), 21 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index bed5636cb3..b65210e340 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -102,6 +102,7 @@ proc init*( overheadRateLimit = Opt.none(tuple[bytes: int, interval: Duration]), disconnectPeerAboveRateLimit = false, maxNumElementsInNonPriorityQueue = DefaultMaxNumElementsInNonPriorityQueue, + sendIDontWantOnPublish = false, ): GossipSubParams = GossipSubParams( explicit: true, @@ -139,6 +140,7 @@ proc init*( overheadRateLimit: overheadRateLimit, disconnectPeerAboveRateLimit: disconnectPeerAboveRateLimit, maxNumElementsInNonPriorityQueue: maxNumElementsInNonPriorityQueue, + sendIDontWantOnPublish: sendIDontWantOnPublish, ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -383,6 +385,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending iwant reply messages", peer g.send(peer, RPCMsg(messages: messages), isHighPriority = false) +proc sendIDontWant( + g: GossipSub, + msg: Message, + msgId: MessageId, + peersToSendIDontWant: HashSet[PubSubPeer], +) = + # If the message is "large enough", let the mesh know that we do not want + # any more copies of it, regardless if it is valid or not. + # + # In the case that it is not valid, this leads to some redundancy + # (since the other peer should not send us an invalid message regardless), + # but the expectation is that this is rare (due to such peers getting + # descored) and that the savings from honest peers are greater than the + # cost a dishonest peer can incur in short time (since the IDONTWANT is + # small). + + # IDONTWANT is only supported by >= GossipSubCodec_12 + let peers = peersToSendIDontWant.filterIt( + it.codec != GossipSubCodec_10 and it.codec != GossipSubCodec_11 + ) + + g.broadcast( + peers, + RPCMsg( + control: some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) + ), + isHighPriority = true, + ) + +const iDontWantMessageSizeThreshold* = 512 + +proc isLargeMessage(msg: Message, msgId: MessageId): bool = + msg.data.len > max(iDontWantMessageSizeThreshold, msgId.len * 10) + proc validateAndRelay( g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer ) {.async.} = @@ -399,29 +435,10 @@ proc validateAndRelay( toSendPeers.incl(peers[]) toSendPeers.excl(peer) - if msg.data.len > max(512, msgId.len * 10): - # If the message is "large enough", let the mesh know that we do not want - # any more copies of it, regardless if it is valid or not. - # - # In the case that it is not valid, this leads to some redundancy - # (since the other peer should not send us an invalid message regardless), - # but the expectation is that this is rare (due to such peers getting - # descored) and that the savings from honest peers are greater than the - # cost a dishonest peer can incur in short time (since the IDONTWANT is - # small). + if isLargeMessage(msg, msgId): var peersToSendIDontWant = HashSet[PubSubPeer]() addToSendPeers(peersToSendIDontWant) - peersToSendIDontWant.exclIfIt( - it.codec == GossipSubCodec_10 or it.codec == GossipSubCodec_11 - ) - g.broadcast( - peersToSendIDontWant, - RPCMsg( - control: - some(ControlMessage(idontwant: @[ControlIWant(messageIDs: @[msgId])])) - ), - isHighPriority = true, - ) + g.sendIDontWant(msg, msgId, peersToSendIDontWant) let validation = await g.validate(msg) @@ -782,6 +799,9 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy g.mcache.put(msgId, msg) + if g.parameters.sendIDontWantOnPublish and isLargeMessage(msg, msgId): + g.sendIDontWant(msg, msgId, peers) + g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) if g.knownTopics.contains(topic): diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index d50e098240..0044be9d50 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -154,6 +154,9 @@ type # Max number of elements allowed in the non-priority queue. When this limit has been reached, the peer will be disconnected. maxNumElementsInNonPriorityQueue*: int + # Broadcast an IDONTWANT message automatically when the message exceeds the IDONTWANT message size threshold + sendIDontWantOnPublish*: bool + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[SaltedId, HashSet[PubSubPeer]] diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 62fbe97bc6..cdfd19ca5d 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -149,3 +149,11 @@ template exclIfIt*[T](set: var HashSet[T], condition: untyped) = if condition: toExcl.incl(it) set.excl(toExcl) + +template filterIt*[T](set: HashSet[T], condition: untyped): HashSet[T] = + var filtered = HashSet[T]() + if set.len != 0: + for it {.inject.} in set: + if condition: + filtered.incl(it) + filtered diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 7277c36258..8ea780f90c 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -928,6 +928,39 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is broadcasted on publish": + func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] = + ok(newSeq[byte](10)) + let + nodes = generateNodes(2, gossip = true, msgIdProvider = dumbMsgIdProvider) + + nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + + await nodes[0].switch.connect( + nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs + ) + + proc handlerA(topic: string, data: seq[byte]) {.async.} = + discard + + proc handlerB(topic: string, data: seq[byte]) {.async.} = + discard + + nodes[0].subscribe("foobar", handlerA) + nodes[1].subscribe("foobar", handlerB) + await waitSubGraph(nodes, "foobar") + + var gossip2: GossipSub = GossipSub(nodes[1]) + + tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1 + + checkUntilTimeout: + gossip2.mesh.getOrDefault("foobar").anyIt(it.iDontWants[^1].len == 1) + + await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "e2e - iDontWant is sent only for 1.2": # 3 nodes: A <=> B <=> C # (A & C are NOT connected). We pre-emptively send a dontwant from C to B,