From cbe70da1551bbbee2d99b6cc4b2545b34faccd50 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 15 Jan 2024 18:40:14 +0100 Subject: [PATCH 01/17] make relayed messages non priority (don't use an explicit queue for priority msgs) --- libp2p/errors.nim | 4 +- libp2p/protocols/pubsub/gossipsub.nim | 13 ++- libp2p/protocols/pubsub/pubsub.nim | 11 +-- libp2p/protocols/pubsub/pubsubpeer.nim | 113 +++++++++++++++++++------ tests/pubsub/testgossipsub.nim | 12 +-- 5 files changed, 110 insertions(+), 43 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index e7da7094da..4f52ce1c1e 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -45,8 +45,8 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped = debug "A future has failed, enable trace logging for details", error=exc.name trace "Exception details", msg=exc.msg -proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = - var futs: seq[Future[T]] +proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = + var futs: seq[F] for fut in args: futs &= fut proc call() {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5740f5d8d0..2d8edd1097 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = for topic, info in stats[].topicInfos.mpairs: info.firstMessageDeliveries = 0 + pubSubPeer.stopSendNonPriorityTask() + procCall FloodSub(g).unsubscribePeer(peer) proc handleSubscribe*(g: GossipSub, @@ -303,7 +305,10 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending control message", msg = shortLog(respControl), peer g.send( peer, - RPCMsg(control: some(respControl), messages: messages)) + RPCMsg(control: some(respControl)), true) + g.send( + peer, + RPCMsg(messages: messages), false) proc validateAndRelay(g: GossipSub, msg: Message, @@ -370,7 +375,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false) trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -441,7 +446,7 @@ method rpcHandler*(g: GossipSub, peer.recvObservers(rpcMsg) if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: - g.send(peer, RPCMsg(pong: rpcMsg.ping)) + g.send(peer, RPCMsg(pong: rpcMsg.ping), true) peer.pingBudget.dec for i in 0.. 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + func hash*(p: PubSubPeer): Hash = p.peerId.hash @@ -227,17 +248,7 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) -proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = - doAssert(not isNil(p), "pubsubpeer nil!") - - if msg.len <= 0: - debug "empty message, skipping", p, msg = shortLog(msg) - return - - if msg.len > p.maxMessageSize: - info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len - return - +proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will # complete this even if the sendConn setup failed @@ -262,6 +273,27 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection +proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} = + doAssert(not isNil(p), "pubsubpeer nil!") + + if msg.len <= 0: + debug "empty message, skipping", p, msg = shortLog(msg) + return + + if msg.len > p.maxMessageSize: + info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len + return + + if isHighPriority: + p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg)) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) + else: + await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) + trace "message queued", p, msg = shortLog(msg) + iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] = ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances. ## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message @@ -297,7 +329,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: else: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} = # When sending messages, we take care to re-encode them with the right # anonymization flag to ensure that we're not penalized for sending invalid # or malicious data on the wire - in particular, re-encoding protects against @@ -317,11 +349,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} = if encoded.len > p.maxMessageSize and msg.messages.len > 1: for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize): - asyncSpawn p.sendEncoded(encodedSplitMsg) + await p.sendEncoded(encodedSplitMsg, isHighPriority) else: # If the message size is within limits, send it as is trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) - asyncSpawn p.sendEncoded(encoded) + await p.sendEncoded(encoded, isHighPriority) proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = for sentIHave in p.sentIHaves.mitems(): @@ -330,6 +362,43 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false +proc clearSendPriorityQueue(p: PubSubPeer) = + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() + +proc sendNonPriorityTask(p: PubSubPeer) {.async.} = + while true: + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() + while p.rpcmessagequeue.sendPriorityQueue.len > 0: + await p.rpcmessagequeue.sendPriorityQueue[0] + p.clearSendPriorityQueue() + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) + await p.sendMsg(msg) + +proc startSendNonPriorityTask(p: PubSubPeer) = + debug "starting sendNonPriorityTask", p + if p.rpcmessagequeue.sendNonPriorityTask.isNil: + p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask() + +proc stopSendNonPriorityTask*(p: PubSubPeer) = + if not p.rpcmessagequeue.sendNonPriorityTask.isNil: + debug "stopping sendNonPriorityTask", p + p.rpcmessagequeue.sendNonPriorityTask.cancel() + p.rpcmessagequeue.sendNonPriorityTask = nil + p.rpcmessagequeue.sendPriorityQueue.clear() + p.rpcmessagequeue.nonPriorityQueue.clear() + libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + +proc new(T: typedesc[RpcMessageQueue]): T = + return T( + sendPriorityQueue: initDeque[Future[void]](), + nonPriorityQueue: newAsyncQueue[seq[byte]](), + ) + proc new*( T: typedesc[PubSubPeer], peerId: PeerId, @@ -346,17 +415,9 @@ proc new*( peerId: peerId, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, - overheadRateLimitOpt: overheadRateLimitOpt + overheadRateLimitOpt: overheadRateLimitOpt, + rpcmessagequeue: RpcMessageQueue.new(), ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) - -proc getAgent*(peer: PubSubPeer): string = - return - when defined(libp2p_agents_metrics): - if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" + result.startSendNonPriorityTask() diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 1081fe2552..403f85f246 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -509,12 +509,12 @@ suite "GossipSub": var gossip1: GossipSub = GossipSub(nodes[0]) var gossip2: GossipSub = GossipSub(nodes[1]) - check: - "foobar" in gossip1.gossipsub - "foobar" in gossip2.gossipsub - gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) - not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) - gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) + checkExpiring: + "foobar" in gossip1.gossipsub and + "foobar" in gossip2.gossipsub and + gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) and + not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) and + gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) and not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId) await allFuturesThrowing( From 362c94bf34329c1726501e71967185f9ff00b2a1 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 31 Jan 2024 12:11:19 +0100 Subject: [PATCH 02/17] use when defined(libp2p_expensive_metrics) --- libp2p/protocols/pubsub/pubsubpeer.nim | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 06717e59d5..2cde3ce308 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -32,8 +32,8 @@ when defined(libp2p_expensive_metrics): declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) -declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) -declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) + declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) + declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) type PeerRateLimitError* = object of CatchableError @@ -390,8 +390,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = p.rpcmessagequeue.sendNonPriorityTask = nil p.rpcmessagequeue.sendPriorityQueue.clear() p.rpcmessagequeue.nonPriorityQueue.clear() - libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) - libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) proc new(T: typedesc[RpcMessageQueue]): T = return T( From 4158849521385e2efc4624fc350cf43fc716b86c Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 31 Jan 2024 12:13:54 +0100 Subject: [PATCH 03/17] clean sendPriorityQueue even if there is no non-priority msg --- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 2cde3ce308..ab3d9070e8 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -370,10 +370,10 @@ proc clearSendPriorityQueue(p: PubSubPeer) = proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From 0317d589cefe72ae7bb136409a700783122986ae Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 6 Feb 2024 14:18:00 +0100 Subject: [PATCH 04/17] remove unnecessary change --- libp2p/errors.nim | 4 ++-- libp2p/protocols/pubsub/pubsubpeer.nim | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 4f52ce1c1e..e7da7094da 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -45,8 +45,8 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped = debug "A future has failed, enable trace logging for details", error=exc.name trace "Exception details", msg=exc.msg -proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = - var futs: seq[F] +proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = + var futs: seq[Future[T]] for fut in args: futs &= fut proc call() {.async.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ab3d9070e8..a0dcf925c9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -19,8 +19,7 @@ import rpc/[messages, message, protobuf], ../../stream/connection, ../../crypto/crypto, ../../protobuf/minprotobuf, - ../../utility, - ../../utils/future + ../../utility export peerid, connection, deques From 07cab432ba912d99d7d48075e62eaf08a918ff42 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 6 Feb 2024 15:02:40 +0100 Subject: [PATCH 05/17] add comments --- libp2p/protocols/pubsub/gossipsub.nim | 1 + libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2d8edd1097..735534f14c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -306,6 +306,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = g.send( peer, RPCMsg(control: some(respControl)), true) + # iwant replies have lower priority g.send( peer, RPCMsg(messages: messages), false) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a0dcf925c9..6c15a5286d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -53,8 +53,11 @@ type OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} RpcMessageQueue* = ref object + # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] + # Queue for lower-priority messages, like "IWANT" replies and relay messages. nonPriorityQueue: AsyncQueue[seq[byte]] + # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] PubSubPeer* = ref object of RootObj @@ -372,6 +375,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + # we send non-priority messages only if there are no pending priority messages let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) From 14d1787de8eb1545b1668aa2be71b0f9341efdfd Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 12:59:53 +0100 Subject: [PATCH 06/17] fix test --- tests/pubsub/testgossipsub.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 403f85f246..1081fe2552 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -509,12 +509,12 @@ suite "GossipSub": var gossip1: GossipSub = GossipSub(nodes[0]) var gossip2: GossipSub = GossipSub(nodes[1]) - checkExpiring: - "foobar" in gossip1.gossipsub and - "foobar" in gossip2.gossipsub and - gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) and - not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) and - gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) and + check: + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) + not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) + gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId) await allFuturesThrowing( From a1f3940c06a69b193e8583ad78722dca9396fcdc Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 14:25:18 +0100 Subject: [PATCH 07/17] handle empty msgs correctly --- libp2p/protocols/pubsub/gossipsub.nim | 39 +++++++++++++++------------ 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 735534f14c..0b9194f420 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -281,32 +281,37 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = respControl.prune.add(g.handleGraft(peer, control.graft)) let messages = g.handleIWant(peer, control.iwant) - if - respControl.prune.len > 0 or - respControl.iwant.len > 0 or - messages.len > 0: - # iwant and prunes from here, also messages + let + isPruneNotEmpty = respControl.prune.len > 0 + isIWantNotEmpty = respControl.iwant.len > 0 - for smsg in messages: - for topic in smsg.topicIds: - if g.knownTopics.contains(topic): - libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) - else: - libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + if isPruneNotEmpty or isIWantNotEmpty: - libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + if isIWantNotEmpty: + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) - for prune in respControl.prune: - if g.knownTopics.contains(prune.topicId): - libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId]) - else: - libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + if isPruneNotEmpty: + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicId): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) trace "sending control message", msg = shortLog(respControl), peer g.send( peer, RPCMsg(control: some(respControl)), true) + + if messages.len > 0: + for smsg in messages: + for topic in smsg.topicIds: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + # iwant replies have lower priority + trace "sending iwant reply messages", peer g.send( peer, RPCMsg(messages: messages), false) From 20a8e57262d7fcc17582c8f42e87c8249585c799 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 14:42:06 +0100 Subject: [PATCH 08/17] wait for non-priority msg first before checking the priority queue --- libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 6c15a5286d..23fc9fff11 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -372,11 +372,11 @@ proc clearSendPriorityQueue(p: PubSubPeer) = proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: + # we send non-priority messages only if there are no pending priority messages + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() - # we send non-priority messages only if there are no pending priority messages - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From 451637a644c45d26f279ba3573177a3558686ab4 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:11:43 +0100 Subject: [PATCH 09/17] await the last item --- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 23fc9fff11..ddc97d43cf 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -375,8 +375,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = # we send non-priority messages only if there are no pending priority messages let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: - await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + # this minimizes the number of times we have to wait for something (each wait = performance cost) + # we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed + # to be finished already (since sends are processed in order). + if p.rpcmessagequeue.sendPriorityQueue.len > 0: + await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From df0b98bfdde25d297650517a40519436db7c22b8 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:14:00 +0100 Subject: [PATCH 10/17] dont add finished future --- libp2p/protocols/pubsub/pubsubpeer.nim | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ddc97d43cf..bb4d4f6c91 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -287,9 +287,11 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: - p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg)) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) + let f = p.sendMsg(msg) + if not f.finished: + p.rpcmessagequeue.sendPriorityQueue.addLast(f) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) when defined(libp2p_expensive_metrics): From b16ec003276ddf250a18de54c00821111188c93d Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:15:39 +0100 Subject: [PATCH 11/17] clear prio queue before adding to it --- libp2p/protocols/pubsub/pubsubpeer.nim | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index bb4d4f6c91..687243e6f9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -250,6 +250,12 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) +proc clearSendPriorityQueue(p: PubSubPeer) = + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() + proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will @@ -287,6 +293,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: + p.clearSendPriorityQueue() let f = p.sendMsg(msg) if not f.finished: p.rpcmessagequeue.sendPriorityQueue.addLast(f) @@ -366,12 +373,6 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc clearSendPriorityQueue(p: PubSubPeer) = - while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) - discard p.rpcmessagequeue.sendPriorityQueue.popFirst() - proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: # we send non-priority messages only if there are no pending priority messages From a10b8af737577d8d0f4f8c2c5159215c678383c7 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 15 Jan 2024 18:40:14 +0100 Subject: [PATCH 12/17] drop old msgs to be relayed --- libp2p/protocols/pubsub/pubsubpeer.nim | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 687243e6f9..c2f23b2d96 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -52,6 +52,10 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + Ttlmessage* = object + msg*: seq[byte] + ttl*: Moment + RpcMessageQueue* = ref object # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] @@ -59,6 +63,8 @@ type nonPriorityQueue: AsyncQueue[seq[byte]] # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] + # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. + maxDurationInNonPriorityQueue: Duration PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection @@ -300,7 +306,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -386,7 +392,9 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - await p.sendMsg(msg) + if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + continue + await p.sendMsg(ttlMsg.msg) proc startSendNonPriorityTask(p: PubSubPeer) = debug "starting sendNonPriorityTask", p @@ -404,10 +412,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue]): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[seq[byte]](), + nonPriorityQueue: newAsyncQueue[Ttlmessage](), + maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, ) proc new*( From 0d0ef3519fa5fe0bd13ecd42b95f9a513d8311c9 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 31 Jan 2024 23:59:05 +0100 Subject: [PATCH 13/17] add metric --- libp2p/protocols/pubsub/pubsubpeer.nim | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index c2f23b2d96..40996ca3e1 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -34,6 +34,9 @@ when defined(libp2p_expensive_metrics): declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) + declareCounter(libp2p_gossipsub_non_priority_msgs_dropped, "the number of dropped messages in the non-priority queue", labels = ["id"]) + + type PeerRateLimitError* = object of CatchableError @@ -393,6 +396,8 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) continue await p.sendMsg(ttlMsg.msg) From f6775d284358200beea28f459e60bb201bd57bf7 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 1 Feb 2024 00:14:06 +0100 Subject: [PATCH 14/17] increase maxDurationInNonPriorityQueue to 1s --- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 40996ca3e1..27fd43ed93 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -417,7 +417,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), nonPriorityQueue: newAsyncQueue[Ttlmessage](), From 5d9478b0ecebb3096b02111953ee869470733164 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 1 Feb 2024 13:09:06 +0100 Subject: [PATCH 15/17] rename ttl property and message type --- libp2p/protocols/pubsub/pubsubpeer.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 27fd43ed93..362b1984d2 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -55,15 +55,15 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} - Ttlmessage* = object + QueuedMessage* = object msg*: seq[byte] - ttl*: Moment + addedAt*: Moment RpcMessageQueue* = ref object # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] # Queue for lower-priority messages, like "IWANT" replies and relay messages. - nonPriorityQueue: AsyncQueue[seq[byte]] + nonPriorityQueue: AsyncQueue[QueuedMessage] # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. @@ -309,7 +309,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) + await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -395,7 +395,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) continue @@ -420,7 +420,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[Ttlmessage](), + nonPriorityQueue: newAsyncQueue[QueuedMessage](), maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, ) From a04f8d2757367f1cbe3829d83c0c3ba9e5fb5293 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 13 Feb 2024 13:14:43 +0100 Subject: [PATCH 16/17] make maxDurationInNonPriorityQueue configurable and none by default --- libp2p/protocols/pubsub/gossipsub.nim | 4 +++- libp2p/protocols/pubsub/gossipsub/types.nim | 4 ++++ libp2p/protocols/pubsub/pubsubpeer.nim | 24 +++++++++++---------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0b9194f420..cc9ade6ca9 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]), - disconnectPeerAboveRateLimit: false + disconnectPeerAboveRateLimit: false, + maxDurationInNonPriorityQueue: Opt.none(Duration), ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -751,4 +752,5 @@ method getOrCreatePeer*( let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) g.parameters.overheadRateLimit.withValue(overheadRateLimit): peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval)) + peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue return peer diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 06fa55eb30..81db8f3793 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -147,6 +147,10 @@ type overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]] disconnectPeerAboveRateLimit*: bool + # The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded + # as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration. + maxDurationInNonPriorityQueue*: Opt[Duration] + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 362b1984d2..4a11344d0b 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -67,7 +67,7 @@ type # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. - maxDurationInNonPriorityQueue: Duration + maxDurationInNonPriorityQueue*: Opt[Duration] PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection @@ -90,7 +90,7 @@ type behaviourPenalty*: float64 # the eventual penalty score overheadRateLimitOpt*: Opt[TokenBucket] - rpcmessagequeue: RpcMessageQueue + rpcmessagequeue*: RpcMessageQueue RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -385,7 +385,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: # we send non-priority messages only if there are no pending priority messages - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() + let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: p.clearSendPriorityQueue() # this minimizes the number of times we have to wait for something (each wait = performance cost) @@ -395,11 +395,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) - continue - await p.sendMsg(ttlMsg.msg) + p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue): + if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) + continue + await p.sendMsg(queuedMsg.msg) proc startSendNonPriorityTask(p: PubSubPeer) = debug "starting sendNonPriorityTask", p @@ -417,7 +418,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T = return T( sendPriorityQueue: initDeque[Future[void]](), nonPriorityQueue: newAsyncQueue[QueuedMessage](), @@ -431,7 +432,8 @@ proc new*( onEvent: OnEvent, codec: string, maxMessageSize: int, - overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T = + overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket), + maxDurationInNonPriorityQueue = Opt.none(Duration)): T = result = T( getConn: getConn, @@ -441,7 +443,7 @@ proc new*( connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, overheadRateLimitOpt: overheadRateLimitOpt, - rpcmessagequeue: RpcMessageQueue.new(), + rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue), ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) From 6484d3fce4f47640832043a0fb934983d00fbbb3 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 13 Feb 2024 18:46:55 +0100 Subject: [PATCH 17/17] add test --- tests/pubsub/testgossipinternal.nim | 31 ++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 51ae65ff9a..be9a8836fe 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -9,7 +9,7 @@ {.used.} -import std/[options, deques, sequtils, enumerate, algorithm] +import std/[options, deques, sequtils, enumerate, algorithm, os] import stew/byteutils import ../../libp2p/builders import ../../libp2p/errors @@ -718,15 +718,19 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = + proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)): + Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = let nodes = generateNodes(2, gossip = true, verifySignature = false) discard await allFinished( nodes[0].switch.start(), nodes[1].switch.start() ) + var gossip0: GossipSub = GossipSub(nodes[0]) + var gossip1: GossipSub = GossipSub(nodes[1]) - await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs) + gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1 + await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs) var receivedMessages = new(HashSet[seq[byte]]) @@ -736,12 +740,10 @@ suite "GossipSub internal": proc handlerB(topic: string, data: seq[byte]) {.async.} = discard - nodes[0].subscribe("foobar", handlerA) - nodes[1].subscribe("foobar", handlerB) + gossip0.subscribe("foobar", handlerA) + gossip1.subscribe("foobar", handlerB) await waitSubGraph(nodes, "foobar") - var gossip0: GossipSub = GossipSub(nodes[0]) - var gossip1: GossipSub = GossipSub(nodes[1]) return (gossip0, gossip1, receivedMessages) @@ -844,3 +846,18 @@ suite "GossipSub internal": check receivedMessages[].len == 1 await teardownTest(gossip0, gossip1) + + asyncTest "e2e - drop msg if it is in the non-priority queue for too long": + # This test checks if two messages, both below the maxSize, are correctly processed and sent. + # Expected: Both messages should be received. + let maxDurationInNonPriorityQueueGossip1 = 100.millis + let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1)) + + let topic = "foobar" + gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false) + sleep(100) # pause all tasks to ensure that the message stay in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1 + gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false) + await sleepAsync(100.milliseconds) # wait for the messages to be processed + check: receivedMessages[].len == 1 # only the second message should be received + + await teardownTest(gossip0, gossip1) \ No newline at end of file