Skip to content

Commit

Permalink
remove default value from isHighPriority param
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 13, 2024
1 parent 189ab50 commit 558eca1
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 31 deletions.
4 changes: 2 additions & 2 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "Forwared message to peers", peers = toSendPeers.len

f.updateMetrics(rpcMsg)
Expand Down Expand Up @@ -219,7 +219,7 @@ method publish*(f: FloodSub,
return 0

# Try to send to all peers that are known to be interested
f.broadcast(peers, RPCMsg(messages: @[msg]))
f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
Expand Down
14 changes: 7 additions & 7 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), true)
RPCMsg(control: some(respControl)), isHighPriority = true)

if messages.len > 0:
for smsg in messages:
Expand All @@ -314,7 +314,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(messages: messages), false)
RPCMsg(messages: messages), isHighPriority = false)

proc validateAndRelay(g: GossipSub,
msg: Message,
Expand Down Expand Up @@ -367,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
if msg.data.len > msgId.len * 10:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[msgId])]
))))
))), isHighPriority = true)

for peer in toSendPeers:
for heDontWant in peer.heDontWants:
Expand All @@ -381,7 +381,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
Expand Down Expand Up @@ -452,7 +452,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
Expand Down Expand Up @@ -562,7 +562,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.unsubscribeBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)
g.broadcast(mpeers, msg, isHighPriority = true)

for peer in mpeers:
g.pruned(peer, topic, backoff = some(g.parameters.unsubscribeBackoff))
Expand Down Expand Up @@ -666,7 +666,7 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]), true)
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
Expand Down
8 changes: 4 additions & 4 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,14 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
# Send changes to peers after table updates to avoid stale state
if grafts.len > 0:
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
g.broadcast(grafts, graft)
g.broadcast(grafts, graft, isHighPriority = true)
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)

proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
# drop peers that we haven't published to in
Expand Down Expand Up @@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
g.broadcast(prunes, prune, isHighPriority = true)

Check warning on line 672 in libp2p/protocols/pubsub/gossipsub/behavior.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub/behavior.nim#L672

Added line #L672 was not covered by tests

# pass by ptr in order to both signal we want to update metrics
# and as well update the struct for each topic during this iteration
Expand All @@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
else:
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
g.send(peer, RPCMsg(control: some(control)))
g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)

g.mcache.shift() # shift the cache

Expand Down
6 changes: 3 additions & 3 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
## Attempt to send `msg` to remote peer
##

Expand All @@ -149,7 +149,7 @@ proc broadcast*(
p: PubSub,
sendPeers: auto, # Iteratble[PubSubPeer]
msg: RPCMsg,
isHighPriority: bool = false) {.raises: [].} =
isHighPriority: bool) {.raises: [].} =
## Attempt to send `msg` to the given peers

let npeers = sendPeers.len.int64
Expand Down Expand Up @@ -208,7 +208,7 @@ proc sendSubs*(p: PubSub,
topics: openArray[string],
subscribe: bool) =
## send subscriptions to remote peer
p.send(peer, RPCMsg.withSubs(topics, subscribe), true)
p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)

for topic in topics:
if subscribe:
Expand Down
4 changes: 2 additions & 2 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =

await conn.close() # This will clean up the send connection

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
Expand Down Expand Up @@ -340,7 +340,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
Expand Down
8 changes: 4 additions & 4 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)]
))))
))), isHighPriority = false)

checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2
Expand All @@ -796,7 +796,7 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

await sleepAsync(300.milliseconds)
checkUntilTimeout: receivedMessages[].len == 0
Expand All @@ -813,7 +813,7 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

checkUntilTimeout: receivedMessages[] == sentMessages
check receivedMessages[].len == 2
Expand All @@ -831,7 +831,7 @@ suite "GossipSub internal":

gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage(
ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)]
))))
))), isHighPriority = false)

var smallestSet: HashSet[seq[byte]]
let seqs = toSeq(sentMessages)
Expand Down
27 changes: 18 additions & 9 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ suite "GossipSub":

gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
))))
))), isHighPriority = true)
checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)

tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
Expand Down Expand Up @@ -968,15 +968,21 @@ suite "GossipSub":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()

gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]),
isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
gossip0.broadcast(
gossip0.mesh["foobar"],
RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]),
isHighPriority = true)
await sleepAsync(300.millis)

check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
Expand All @@ -990,15 +996,15 @@ suite "GossipSub":
let (nodes, gossip0, gossip1) = await initializeGossipTest()

# Simulate sending an undecodable message
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true)

checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
Expand All @@ -1014,7 +1020,7 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg)
gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
Expand All @@ -1027,7 +1033,7 @@ suite "GossipSub":
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg2)
gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true)

checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
Expand All @@ -1049,15 +1055,18 @@ suite "GossipSub":

let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])

gossip0.broadcast(gossip0.mesh[topic], msg)
gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true)
await sleepAsync(300.millis)

check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true

# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
gossip0.broadcast(
gossip0.mesh[topic],
RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]),
isHighPriority = true)

checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
Expand Down

0 comments on commit 558eca1

Please sign in to comment.