diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 5740f5d8d0..cc9ade6ca9 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
     enablePX: false,
     bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
     overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
-    disconnectPeerAboveRateLimit: false
+    disconnectPeerAboveRateLimit: false,
+    maxDurationInNonPriorityQueue: Opt.none(Duration),
   )
 
 proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -220,6 +221,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
     for topic, info in stats[].topicInfos.mpairs:
       info.firstMessageDeliveries = 0
 
+  pubSubPeer.stopSendNonPriorityTask()
+
   procCall FloodSub(g).unsubscribePeer(peer)
 
 proc handleSubscribe*(g: GossipSub,
@@ -279,12 +282,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
       respControl.prune.add(g.handleGraft(peer, control.graft))
       let messages = g.handleIWant(peer, control.iwant)
 
-      if
-        respControl.prune.len > 0 or
-        respControl.iwant.len > 0 or
-        messages.len > 0:
-        # iwant and prunes from here, also messages
+      let
+        isPruneNotEmpty = respControl.prune.len > 0
+        isIWantNotEmpty = respControl.iwant.len > 0
+
+      if isPruneNotEmpty or isIWantNotEmpty:
+
+        if isIWantNotEmpty:
+          libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+
+        if isPruneNotEmpty:
+          for prune in respControl.prune:
+            if g.knownTopics.contains(prune.topicId):
+              libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
+            else:
+              libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+
+        trace "sending control message", msg = shortLog(respControl), peer
+        g.send(
+          peer,
+          RPCMsg(control: some(respControl)), true)
+
+      if messages.len > 0:
         for smsg in messages:
           for topic in smsg.topicIds:
             if g.knownTopics.contains(topic):
@@ -292,18 +311,11 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
             else:
               libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
 
-        libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
-
-        for prune in respControl.prune:
-          if g.knownTopics.contains(prune.topicId):
-            libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
-          else:
-            libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
-
-        trace "sending control message", msg = shortLog(respControl), peer
+        # iwant replies have lower priority
+        trace "sending iwant reply messages", peer
         g.send(
           peer,
-          RPCMsg(control: some(respControl), messages: messages))
+          RPCMsg(messages: messages), false)
 
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
@@ -370,7 +382,7 @@ proc validateAndRelay(g: GossipSub,
 
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
     trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
     for topic in msg.topicIds:
       if topic notin g.topics: continue
@@ -441,7 +453,7 @@ method rpcHandler*(g: GossipSub,
   peer.recvObservers(rpcMsg)
 
   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
-    g.send(peer, RPCMsg(pong: rpcMsg.ping))
+    g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
     peer.pingBudget.dec
   for i in 0..
+      if peer.shortAgent.len > 0:
+        peer.shortAgent
+      else:
+        "unknown"
+    else:
+      "unknown"
+
 func hash*(p: PubSubPeer): Hash = p.peerId.hash
 
@@ -227,17 +259,13 @@ template sendMetrics(msg: RPCMsg): untyped =
         # metrics
         libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
-proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
-  doAssert(not isNil(p), "pubsubpeer nil!")
-
-  if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
-    return
-
-  if msg.len > p.maxMessageSize:
-    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
-    return
+proc clearSendPriorityQueue(p: PubSubPeer) =
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
 
+proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
   if p.sendConn == nil:
     # Wait for a send conn to be setup. `connectOnce` will
     # complete this even if the sendConn setup failed
@@ -262,6 +290,30 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
 
     await conn.close() # This will clean up the send connection
 
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
+  doAssert(not isNil(p), "pubsubpeer nil!")
+
+  if msg.len <= 0:
+    debug "empty message, skipping", p, msg = shortLog(msg)
+    return
+
+  if msg.len > p.maxMessageSize:
+    info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
+    return
+
+  if isHighPriority:
+    p.clearSendPriorityQueue()
+    let f = p.sendMsg(msg)
+    if not f.finished:
+      p.rpcmessagequeue.sendPriorityQueue.addLast(f)
+      when defined(libp2p_expensive_metrics):
+        libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
+  else:
+    await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now()))
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
+  trace "message queued", p, msg = shortLog(msg)
+
 iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
   ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
   ## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -297,7 +349,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
   else:
     trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
   # When sending messages, we take care to re-encode them with the right
   # anonymization flag to ensure that we're not penalized for sending invalid
   # or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +369,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
 
   if encoded.len > p.maxMessageSize and msg.messages.len > 1:
     for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-      asyncSpawn p.sendEncoded(encodedSplitMsg)
+      await p.sendEncoded(encodedSplitMsg, isHighPriority)
   else:
     # If the message size is within limits, send it as is
     trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    asyncSpawn p.sendEncoded(encoded)
+    await p.sendEncoded(encoded, isHighPriority)
 
 proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
   for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +382,49 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
+  while true:
+    # we send non-priority messages only if there are no pending priority messages
+    let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      p.clearSendPriorityQueue()
+      # this minimizes the number of times we have to wait for something (each wait = performance cost)
+      # we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed
+      # to be finished already (since sends are processed in order).
+      if p.rpcmessagequeue.sendPriorityQueue.len > 0:
+        await p.rpcmessagequeue.sendPriorityQueue[^1]
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
+      if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
+        when defined(libp2p_expensive_metrics):
+          libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
+        continue
+    await p.sendMsg(queuedMsg.msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancel()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
+proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
+  return T(
+    sendPriorityQueue: initDeque[Future[void]](),
+    nonPriorityQueue: newAsyncQueue[QueuedMessage](),
+    maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
+  )
+
 proc new*(
   T: typedesc[PubSubPeer],
   peerId: PeerId,
@@ -337,7 +432,8 @@ proc new*(
   onEvent: OnEvent,
   codec: string,
   maxMessageSize: int,
-  overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
+  overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
+  maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
 
   result = T(
     getConn: getConn,
@@ -346,17 +442,9 @@ proc new*(
     peerId: peerId,
     connectedFut: newFuture[void](),
     maxMessageSize: maxMessageSize,
-    overheadRateLimitOpt: overheadRateLimitOpt
+    overheadRateLimitOpt: overheadRateLimitOpt,
+    rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-
-proc getAgent*(peer: PubSubPeer): string =
-  return
-    when defined(libp2p_agents_metrics):
-      if peer.shortAgent.len > 0:
-        peer.shortAgent
-      else:
-        "unknown"
-    else:
-      "unknown"
+  result.startSendNonPriorityTask()
diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim
index 51ae65ff9a..be9a8836fe 100644
--- a/tests/pubsub/testgossipinternal.nim
+++ b/tests/pubsub/testgossipinternal.nim
@@ -9,7 +9,7 @@
 
 {.used.}
 
-import std/[options, deques, sequtils, enumerate, algorithm]
+import std/[options, deques, sequtils, enumerate, algorithm, os]
 import stew/byteutils
 import ../../libp2p/builders
 import ../../libp2p/errors
@@ -718,15 +718,19 @@ suite "GossipSub internal":
     await allFuturesThrowing(conns.mapIt(it.close()))
     await gossipSub.switch.stop()
 
-  proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
+  proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)):
+    Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
     let
      nodes = generateNodes(2, gossip = true, verifySignature = false)
 
    discard await allFinished(
      nodes[0].switch.start(),
      nodes[1].switch.start()
    )
+    var gossip0: GossipSub = GossipSub(nodes[0])
+    var gossip1: GossipSub = GossipSub(nodes[1])
 
-    await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
+    gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1
+    await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs)
 
     var receivedMessages = new(HashSet[seq[byte]])
@@ -736,12 +740,10 @@ suite "GossipSub internal":
     proc handlerB(topic: string, data: seq[byte]) {.async.} =
       discard
 
-    nodes[0].subscribe("foobar", handlerA)
-    nodes[1].subscribe("foobar", handlerB)
+    gossip0.subscribe("foobar", handlerA)
+    gossip1.subscribe("foobar", handlerB)
     await waitSubGraph(nodes, "foobar")
 
-    var gossip0: GossipSub = GossipSub(nodes[0])
-    var gossip1: GossipSub = GossipSub(nodes[1])
 
     return (gossip0, gossip1, receivedMessages)
 
@@ -844,3 +846,18 @@ suite "GossipSub internal":
     check receivedMessages[].len == 1
 
     await teardownTest(gossip0, gossip1)
+
+  asyncTest "e2e - drop msg if it is in the non-priority queue for too long":
+    # This test checks that a message sitting in the non-priority queue longer than maxDurationInNonPriorityQueue is dropped.
+    # Expected: only the second message should be received.
+    let maxDurationInNonPriorityQueueGossip1 = 100.millis
+    let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1))
+
+    let topic = "foobar"
+    gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false)
+    sleep(100) # pause all tasks to ensure that the message stays in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1
+    gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false)
+    await sleepAsync(100.milliseconds) # wait for the messages to be processed
+    check: receivedMessages[].len == 1 # only the second message should be received
+
+    await teardownTest(gossip0, gossip1)
\ No newline at end of file
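
The patch splits sending into two paths: control RPCs and pong replies are sent with isHighPriority = true and go to the wire immediately while their in-flight futures are tracked, whereas forwarded messages and iwant replies are queued with isHighPriority = false, drained only when no priority send is pending, and dropped once they have waited longer than maxDurationInNonPriorityQueue. The sketch below models that strategy in isolation with chronos; FakePeer, QueuedMsg, fakeTransportSend, clearFinished, drainNonPriority and maxQueueTime are illustrative stand-ins, not APIs from this patch.

# Minimal sketch of the two-queue send strategy (assumed names, not part of the patch).
import std/deques
import chronos

type
  QueuedMsg = object
    msg: seq[byte]
    addedAt: Moment

  FakePeer = ref object
    priorityInFlight: Deque[Future[void]] # futures of high-priority sends still running
    nonPriority: AsyncQueue[QueuedMsg]    # messages waiting for a quiet moment
    maxQueueTime: Duration

proc fakeTransportSend(msg: seq[byte]) {.async.} =
  await sleepAsync(10.milliseconds)       # stand-in for the actual connection write

proc clearFinished(p: FakePeer) =
  # drop completed futures from the front so the deque only holds pending sends
  while p.priorityInFlight.len > 0 and p.priorityInFlight[0].finished:
    discard p.priorityInFlight.popFirst()

proc sendEncoded(p: FakePeer, msg: seq[byte], isHighPriority: bool) {.async.} =
  if isHighPriority:
    p.clearFinished()
    let f = fakeTransportSend(msg)        # start the send right away
    if not f.finished:
      p.priorityInFlight.addLast(f)       # remember it so the drain task can wait on it
  else:
    await p.nonPriority.addLast(QueuedMsg(msg: msg, addedAt: Moment.now()))

proc drainNonPriority(p: FakePeer) {.async.} =
  while true:
    let queued = await p.nonPriority.popFirst()
    # Wait until no high-priority send is pending; awaiting the last pending
    # future is enough because sends complete in order.
    while p.priorityInFlight.len > 0:
      p.clearFinished()
      if p.priorityInFlight.len > 0:
        await p.priorityInFlight[^1]
    if Moment.now() - queued.addedAt >= p.maxQueueTime:
      continue                            # waited too long: drop instead of sending stale gossip
    await fakeTransportSend(queued.msg)

when isMainModule:
  let peer = FakePeer(
    priorityInFlight: initDeque[Future[void]](),
    nonPriority: newAsyncQueue[QueuedMsg](),
    maxQueueTime: 500.milliseconds)
  asyncSpawn peer.drainNonPriority()
  waitFor peer.sendEncoded(@[1'u8, 2, 3], isHighPriority = true)   # written immediately
  waitFor peer.sendEncoded(@[4'u8, 5, 6], isHighPriority = false)  # queued, sent by the drain task
  waitFor sleepAsync(100.milliseconds)    # give the drain task time to flush the queue

In the patch itself these roles are played by sendPriorityQueue, nonPriorityQueue, clearSendPriorityQueue and sendNonPriorityTask on PubSubPeer's rpcmessagequeue.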