Skip to content

Commit

Permalink
Rate limit fixes (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos authored and romanzac committed Dec 13, 2023
1 parent c876904 commit b835100
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 17 deletions.
6 changes: 3 additions & 3 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ proc validateAndRelay(g: GossipSub,
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
return
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
Expand Down Expand Up @@ -492,14 +492,14 @@ method rpcHandler*(g: GossipSub,
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

if msg.seqno.len > 0 and msg.seqno.len != 8:
# if we have seqno should be 8 bytes long
debug "Dropping message due to invalid seqno length",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

# g.anonymize needs no evaluation when receiving messages
Expand Down
8 changes: 4 additions & 4 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,15 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()

proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} =
let uselessAppBytesNum = msg.data.len
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, uselessAppBytesNum
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
if g.parameters.disconnectPeerAboveRateLimit:
await g.disconnectPeer(peer)
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")


for tt in msg.topicIds:
Expand Down
57 changes: 47 additions & 10 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ suite "GossipSub":
gossip1.subscribe("foobar", handle)
await waitSubGraph(nodes, "foobar")

# Avoid being disconnected by failing signature verification
gossip0.verifySignature = false
gossip1.verifySignature = false

return (nodes, gossip0, gossip1)

proc currentRateLimitHits(): float64 =
Expand All @@ -964,18 +968,18 @@ suite "GossipSub":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)])
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
await sleepAsync(300.millis)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
check currentRateLimitHits() == rateLimitHits

await stopNodes(nodes)
Expand All @@ -986,16 +990,15 @@ suite "GossipSub":
let (nodes, gossip0, gossip1) = await initializeGossipTest()

# Simulate sending an undecodable message
let msg = newSeqWith[byte](30, 1.byte)
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg)
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg)
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
Expand All @@ -1008,10 +1011,9 @@ suite "GossipSub":

let msg = RPCMsg(control: some(ControlMessage(prune: @[
ControlPrune(topicID: "foobar", peers: @[
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30)))
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
], backoff: 123'u64)
])))

gossip0.broadcast(gossip0.mesh["foobar"], msg)
await sleepAsync(300.millis)

Expand All @@ -1020,7 +1022,42 @@ suite "GossipSub":

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
let msg2 = RPCMsg(control: some(ControlMessage(prune: @[
ControlPrune(topicID: "foobar", peers: @[
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg2)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)

asyncTest "e2e - GossipSub should rate limit invalid messages above the size allowed":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

let topic = "foobar"
proc execValidator(topic: string, message: messages.Message): Future[ValidationResult] {.raises: [].} =
let res = newFuture[ValidationResult]()
res.complete(ValidationResult.Reject)
res

gossip0.addValidator(topic, execValidator)
gossip1.addValidator(topic, execValidator)

let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])

gossip0.broadcast(gossip0.mesh[topic], msg)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
Expand Down

0 comments on commit b835100

Please sign in to comment.