diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5a5dd621b4..cdb75d024c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -318,7 +318,7 @@ proc validateAndRelay(g: GossipSub, of ValidationResult.Reject: debug "Dropping message after validation, reason: reject", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg) + await g.punishInvalidMessage(peer, msg) return of ValidationResult.Ignore: debug "Dropping message after validation, reason: ignore", @@ -492,14 +492,14 @@ method rpcHandler*(g: GossipSub, # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg) + await g.punishInvalidMessage(peer, msg) continue if msg.seqno.len > 0 and msg.seqno.len != 8: # if we have seqno should be 8 bytes long debug "Dropping message due to invalid seqno length", msgId = shortLog(msgId), peer - g.punishInvalidMessage(peer, msg) + await g.punishInvalidMessage(peer, msg) continue # g.anonymize needs no evaluation when receiving messages diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 4fe3060723..ee4f34da00 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -240,15 +240,15 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} = trace "running scoring heartbeat", instance = cast[int](g) g.updateScores() -proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) = +proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} = let uselessAppBytesNum = msg.data.len peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(uselessAppBytesNum): debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - # discard g.disconnectPeer(peer) - # debug "Peer disconnected", peer, uselessAppBytesNum - # raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit") + if g.parameters.disconnectPeerAboveRateLimit: + await g.disconnectPeer(peer) + raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.") for tt in msg.topicIds: diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 25562b30e8..712829dcd6 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -952,6 +952,10 @@ suite "GossipSub": gossip1.subscribe("foobar", handle) await waitSubGraph(nodes, "foobar") + # Avoid being disconnected by failing signature verification + gossip0.verifySignature = false + gossip1.verifySignature = false + return (nodes, gossip0, gossip1) proc currentRateLimitHits(): float64 = @@ -964,8 +968,7 @@ suite "GossipSub": let rateLimitHits = currentRateLimitHits() let (nodes, gossip0, gossip1) = await initializeGossipTest() - let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)]) - gossip0.broadcast(gossip0.mesh["foobar"], msg) + gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))])) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits @@ -973,9 +976,10 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - gossip0.broadcast(gossip0.mesh["foobar"], msg) + gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))])) + await sleepAsync(300.millis) - checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true check currentRateLimitHits() == rateLimitHits await stopNodes(nodes) @@ -986,8 +990,7 @@ suite "GossipSub": let (nodes, gossip0, gossip1) = await initializeGossipTest() # Simulate sending an undecodable message - let msg = newSeqWith[byte](30, 1.byte) - await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg) + await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte)) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits + 1 @@ -995,7 +998,7 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg) + await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte)) checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false check currentRateLimitHits() == rateLimitHits + 2 @@ -1008,10 +1011,9 @@ suite "GossipSub": let msg = RPCMsg(control: some(ControlMessage(prune: @[ ControlPrune(topicID: "foobar", peers: @[ - PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30))) + PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33))) ], backoff: 123'u64) ]))) - gossip0.broadcast(gossip0.mesh["foobar"], msg) await sleepAsync(300.millis) @@ -1020,7 +1022,42 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - gossip0.broadcast(gossip0.mesh["foobar"], msg) + let msg2 = RPCMsg(control: some(ControlMessage(prune: @[ + ControlPrune(topicID: "foobar", peers: @[ + PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35))) + ], backoff: 123'u64) + ]))) + gossip0.broadcast(gossip0.mesh["foobar"], msg2) + + checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false + check currentRateLimitHits() == rateLimitHits + 2 + + await stopNodes(nodes) + + asyncTest "e2e - GossipSub should rate limit invalid messages above the size allowed": + let rateLimitHits = currentRateLimitHits() + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + let topic = "foobar" + proc execValidator(topic: string, message: messages.Message): Future[ValidationResult] {.raises: [].} = + let res = newFuture[ValidationResult]() + res.complete(ValidationResult.Reject) + res + + gossip0.addValidator(topic, execValidator) + gossip1.addValidator(topic, execValidator) + + let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))]) + + gossip0.broadcast(gossip0.mesh[topic], msg) + await sleepAsync(300.millis) + + check currentRateLimitHits() == rateLimitHits + 1 + check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + + # Disconnect peer when rate limiting is enabled + gossip1.parameters.disconnectPeerAboveRateLimit = true + gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))])) checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false check currentRateLimitHits() == rateLimitHits + 2