Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limit fixes #965

Merged
merged 3 commits into from
Nov 9, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
@@ -319,7 +319,7 @@ proc validateAndRelay(g: GossipSub,
of ValidationResult.Reject:
debug "Dropping message after validation, reason: reject",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
return
of ValidationResult.Ignore:
debug "Dropping message after validation, reason: ignore",
@@ -496,14 +496,14 @@ method rpcHandler*(g: GossipSub,
# always validate if signature is present or required
debug "Dropping message due to failed signature verification",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

if msg.seqno.len > 0 and msg.seqno.len != 8:
# if we have seqno should be 8 bytes long
debug "Dropping message due to invalid seqno length",
msgId = shortLog(msgId), peer
g.punishInvalidMessage(peer, msg)
await g.punishInvalidMessage(peer, msg)
continue

# g.anonymize needs no evaluation when receiving messages
8 changes: 4 additions & 4 deletions libp2p/protocols/pubsub/gossipsub/scoring.nim
Original file line number Diff line number Diff line change
@@ -240,15 +240,15 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
trace "running scoring heartbeat", instance = cast[int](g)
g.updateScores()

proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} =
let uselessAppBytesNum = msg.data.len
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
# discard g.disconnectPeer(peer)
# debug "Peer disconnected", peer, uselessAppBytesNum
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
if g.parameters.disconnectPeerAboveRateLimit:
await g.disconnectPeer(peer)
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")


for tt in msg.topicIds:
57 changes: 47 additions & 10 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
@@ -964,18 +964,22 @@ suite "GossipSub":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)])
gossip0.broadcast(gossip0.mesh["foobar"], msg)
# Avoid being disconnected by failing signature verification
gossip0.verifySignature = false
gossip1.verifySignature = false

gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
await sleepAsync(300.millis)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
check currentRateLimitHits() == rateLimitHits

await stopNodes(nodes)
@@ -986,16 +990,15 @@ suite "GossipSub":
let (nodes, gossip0, gossip1) = await initializeGossipTest()

# Simulate sending an undecodable message
let msg = newSeqWith[byte](30, 1.byte)
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg)
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg)
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
@@ -1008,10 +1011,9 @@ suite "GossipSub":

let msg = RPCMsg(control: some(ControlMessage(prune: @[
ControlPrune(topicID: "foobar", peers: @[
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30)))
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
], backoff: 123'u64)
])))

gossip0.broadcast(gossip0.mesh["foobar"], msg)
await sleepAsync(300.millis)

@@ -1020,7 +1022,42 @@ suite "GossipSub":

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
let msg2 = RPCMsg(control: some(ControlMessage(prune: @[
ControlPrune(topicID: "foobar", peers: @[
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg2)

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2

await stopNodes(nodes)

asyncTest "e2e - GossipSub should rate limit invalid messages above the size allowed":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

let topic = "foobar"
proc execValidator(topic: string, message: messages.Message): Future[ValidationResult] {.raises: [].} =
let res = newFuture[ValidationResult]()
res.complete(ValidationResult.Reject)
res

gossip0.addValidator(topic, execValidator)
gossip1.addValidator(topic, execValidator)

let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])

gossip0.broadcast(gossip0.mesh[topic], msg)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))

checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2