From cbe70da1551bbbee2d99b6cc4b2545b34faccd50 Mon Sep 17 00:00:00 2001 From: Diego Date: Mon, 15 Jan 2024 18:40:14 +0100 Subject: [PATCH 01/14] make relayed messages non priority (don't use an explicit queue for priority msgs) --- libp2p/errors.nim | 4 +- libp2p/protocols/pubsub/gossipsub.nim | 13 ++- libp2p/protocols/pubsub/pubsub.nim | 11 +-- libp2p/protocols/pubsub/pubsubpeer.nim | 113 +++++++++++++++++++------ tests/pubsub/testgossipsub.nim | 12 +-- 5 files changed, 110 insertions(+), 43 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index e7da7094da..4f52ce1c1e 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -45,8 +45,8 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped = debug "A future has failed, enable trace logging for details", error=exc.name trace "Exception details", msg=exc.msg -proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = - var futs: seq[Future[T]] +proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = + var futs: seq[F] for fut in args: futs &= fut proc call() {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 5740f5d8d0..2d8edd1097 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) = for topic, info in stats[].topicInfos.mpairs: info.firstMessageDeliveries = 0 + pubSubPeer.stopSendNonPriorityTask() + procCall FloodSub(g).unsubscribePeer(peer) proc handleSubscribe*(g: GossipSub, @@ -303,7 +305,10 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending control message", msg = shortLog(respControl), peer g.send( peer, - RPCMsg(control: some(respControl), messages: messages)) + RPCMsg(control: some(respControl)), true) + g.send( + peer, + RPCMsg(messages: messages), false) proc validateAndRelay(g: GossipSub, msg: Message, @@ -370,7 +375,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false) trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -441,7 +446,7 @@ method rpcHandler*(g: GossipSub, peer.recvObservers(rpcMsg) if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: - g.send(peer, RPCMsg(pong: rpcMsg.ping)) + g.send(peer, RPCMsg(pong: rpcMsg.ping), true) peer.pingBudget.dec for i in 0.. 0: + peer.shortAgent + else: + "unknown" + else: + "unknown" + func hash*(p: PubSubPeer): Hash = p.peerId.hash @@ -227,17 +248,7 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) -proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = - doAssert(not isNil(p), "pubsubpeer nil!") - - if msg.len <= 0: - debug "empty message, skipping", p, msg = shortLog(msg) - return - - if msg.len > p.maxMessageSize: - info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len - return - +proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will # complete this even if the sendConn setup failed @@ -262,6 +273,27 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection +proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} = + doAssert(not isNil(p), "pubsubpeer nil!") + + if msg.len <= 0: + debug "empty message, skipping", p, msg = shortLog(msg) + return + + if msg.len > p.maxMessageSize: + info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len + return + + if isHighPriority: + p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg)) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) + else: + await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) + trace "message queued", p, msg = shortLog(msg) + iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] = ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances. ## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message @@ -297,7 +329,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: else: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} = # When sending messages, we take care to re-encode them with the right # anonymization flag to ensure that we're not penalized for sending invalid # or malicious data on the wire - in particular, re-encoding protects against @@ -317,11 +349,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} = if encoded.len > p.maxMessageSize and msg.messages.len > 1: for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize): - asyncSpawn p.sendEncoded(encodedSplitMsg) + await p.sendEncoded(encodedSplitMsg, isHighPriority) else: # If the message size is within limits, send it as is trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) - asyncSpawn p.sendEncoded(encoded) + await p.sendEncoded(encoded, isHighPriority) proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = for sentIHave in p.sentIHaves.mitems(): @@ -330,6 +362,43 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false +proc clearSendPriorityQueue(p: PubSubPeer) = + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() + +proc sendNonPriorityTask(p: PubSubPeer) {.async.} = + while true: + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() + while p.rpcmessagequeue.sendPriorityQueue.len > 0: + await p.rpcmessagequeue.sendPriorityQueue[0] + p.clearSendPriorityQueue() + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) + await p.sendMsg(msg) + +proc startSendNonPriorityTask(p: PubSubPeer) = + debug "starting sendNonPriorityTask", p + if p.rpcmessagequeue.sendNonPriorityTask.isNil: + p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask() + +proc stopSendNonPriorityTask*(p: PubSubPeer) = + if not p.rpcmessagequeue.sendNonPriorityTask.isNil: + debug "stopping sendNonPriorityTask", p + p.rpcmessagequeue.sendNonPriorityTask.cancel() + p.rpcmessagequeue.sendNonPriorityTask = nil + p.rpcmessagequeue.sendPriorityQueue.clear() + p.rpcmessagequeue.nonPriorityQueue.clear() + libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + +proc new(T: typedesc[RpcMessageQueue]): T = + return T( + sendPriorityQueue: initDeque[Future[void]](), + nonPriorityQueue: newAsyncQueue[seq[byte]](), + ) + proc new*( T: typedesc[PubSubPeer], peerId: PeerId, @@ -346,17 +415,9 @@ proc new*( peerId: peerId, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, - overheadRateLimitOpt: overheadRateLimitOpt + overheadRateLimitOpt: overheadRateLimitOpt, + rpcmessagequeue: RpcMessageQueue.new(), ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) - -proc getAgent*(peer: PubSubPeer): string = - return - when defined(libp2p_agents_metrics): - if peer.shortAgent.len > 0: - peer.shortAgent - else: - "unknown" - else: - "unknown" + result.startSendNonPriorityTask() diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 1081fe2552..403f85f246 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -509,12 +509,12 @@ suite "GossipSub": var gossip1: GossipSub = GossipSub(nodes[0]) var gossip2: GossipSub = GossipSub(nodes[1]) - check: - "foobar" in gossip1.gossipsub - "foobar" in gossip2.gossipsub - gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) - not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) - gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) + checkExpiring: + "foobar" in gossip1.gossipsub and + "foobar" in gossip2.gossipsub and + gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) and + not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) and + gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) and not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId) await allFuturesThrowing( From 362c94bf34329c1726501e71967185f9ff00b2a1 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 31 Jan 2024 12:11:19 +0100 Subject: [PATCH 02/14] use when defined(libp2p_expensive_metrics) --- libp2p/protocols/pubsub/pubsubpeer.nim | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 06717e59d5..2cde3ce308 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -32,8 +32,8 @@ when defined(libp2p_expensive_metrics): declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"]) declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) -declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) -declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) + declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"]) + declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"]) type PeerRateLimitError* = object of CatchableError @@ -390,8 +390,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = p.rpcmessagequeue.sendNonPriorityTask = nil p.rpcmessagequeue.sendPriorityQueue.clear() p.rpcmessagequeue.nonPriorityQueue.clear() - libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) - libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) proc new(T: typedesc[RpcMessageQueue]): T = return T( From 4158849521385e2efc4624fc350cf43fc716b86c Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 31 Jan 2024 12:13:54 +0100 Subject: [PATCH 03/14] clean sendPriorityQueue even if there is no non-priority msg --- libp2p/protocols/pubsub/pubsubpeer.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 2cde3ce308..ab3d9070e8 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -370,10 +370,10 @@ proc clearSendPriorityQueue(p: PubSubPeer) = proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From 0317d589cefe72ae7bb136409a700783122986ae Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 6 Feb 2024 14:18:00 +0100 Subject: [PATCH 04/14] remove unnecessary change --- libp2p/errors.nim | 4 ++-- libp2p/protocols/pubsub/pubsubpeer.nim | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 4f52ce1c1e..e7da7094da 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -45,8 +45,8 @@ macro checkFutures*[F](futs: seq[F], exclude: untyped = []): untyped = debug "A future has failed, enable trace logging for details", error=exc.name trace "Exception details", msg=exc.msg -proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] = - var futs: seq[F] +proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = + var futs: seq[Future[T]] for fut in args: futs &= fut proc call() {.async.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ab3d9070e8..a0dcf925c9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -19,8 +19,7 @@ import rpc/[messages, message, protobuf], ../../stream/connection, ../../crypto/crypto, ../../protobuf/minprotobuf, - ../../utility, - ../../utils/future + ../../utility export peerid, connection, deques From 07cab432ba912d99d7d48075e62eaf08a918ff42 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 6 Feb 2024 15:02:40 +0100 Subject: [PATCH 05/14] add comments --- libp2p/protocols/pubsub/gossipsub.nim | 1 + libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2d8edd1097..735534f14c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -306,6 +306,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = g.send( peer, RPCMsg(control: some(respControl)), true) + # iwant replies have lower priority g.send( peer, RPCMsg(messages: messages), false) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a0dcf925c9..6c15a5286d 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -53,8 +53,11 @@ type OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} RpcMessageQueue* = ref object + # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] + # Queue for lower-priority messages, like "IWANT" replies and relay messages. nonPriorityQueue: AsyncQueue[seq[byte]] + # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] PubSubPeer* = ref object of RootObj @@ -372,6 +375,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + # we send non-priority messages only if there are no pending priority messages let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) From 14d1787de8eb1545b1668aa2be71b0f9341efdfd Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 12:59:53 +0100 Subject: [PATCH 06/14] fix test --- tests/pubsub/testgossipsub.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 403f85f246..1081fe2552 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -509,12 +509,12 @@ suite "GossipSub": var gossip1: GossipSub = GossipSub(nodes[0]) var gossip2: GossipSub = GossipSub(nodes[1]) - checkExpiring: - "foobar" in gossip1.gossipsub and - "foobar" in gossip2.gossipsub and - gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) and - not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) and - gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) and + check: + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + gossip1.mesh.hasPeerId("foobar", gossip2.peerInfo.peerId) + not gossip1.fanout.hasPeerId("foobar", gossip2.peerInfo.peerId) + gossip2.mesh.hasPeerId("foobar", gossip1.peerInfo.peerId) not gossip2.fanout.hasPeerId("foobar", gossip1.peerInfo.peerId) await allFuturesThrowing( From a1f3940c06a69b193e8583ad78722dca9396fcdc Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 14:25:18 +0100 Subject: [PATCH 07/14] handle empty msgs correctly --- libp2p/protocols/pubsub/gossipsub.nim | 39 +++++++++++++++------------ 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 735534f14c..0b9194f420 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -281,32 +281,37 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = respControl.prune.add(g.handleGraft(peer, control.graft)) let messages = g.handleIWant(peer, control.iwant) - if - respControl.prune.len > 0 or - respControl.iwant.len > 0 or - messages.len > 0: - # iwant and prunes from here, also messages + let + isPruneNotEmpty = respControl.prune.len > 0 + isIWantNotEmpty = respControl.iwant.len > 0 - for smsg in messages: - for topic in smsg.topicIds: - if g.knownTopics.contains(topic): - libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) - else: - libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + if isPruneNotEmpty or isIWantNotEmpty: - libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + if isIWantNotEmpty: + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) - for prune in respControl.prune: - if g.knownTopics.contains(prune.topicId): - libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId]) - else: - libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + if isPruneNotEmpty: + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicId): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) trace "sending control message", msg = shortLog(respControl), peer g.send( peer, RPCMsg(control: some(respControl)), true) + + if messages.len > 0: + for smsg in messages: + for topic in smsg.topicIds: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + # iwant replies have lower priority + trace "sending iwant reply messages", peer g.send( peer, RPCMsg(messages: messages), false) From 20a8e57262d7fcc17582c8f42e87c8249585c799 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 14:42:06 +0100 Subject: [PATCH 08/14] wait for non-priority msg first before checking the priority queue --- libp2p/protocols/pubsub/pubsubpeer.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 6c15a5286d..23fc9fff11 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -372,11 +372,11 @@ proc clearSendPriorityQueue(p: PubSubPeer) = proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: + # we send non-priority messages only if there are no pending priority messages + let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() - # we send non-priority messages only if there are no pending priority messages - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From 451637a644c45d26f279ba3573177a3558686ab4 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:11:43 +0100 Subject: [PATCH 09/14] await the last item --- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 23fc9fff11..ddc97d43cf 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -375,8 +375,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = # we send non-priority messages only if there are no pending priority messages let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() while p.rpcmessagequeue.sendPriorityQueue.len > 0: - await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() + # this minimizes the number of times we have to wait for something (each wait = performance cost) + # we will never wait for a finished future and by waiting for the last one, all that come before it are guaranteed + # to be finished already (since sends are processed in order). + if p.rpcmessagequeue.sendPriorityQueue.len > 0: + await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) await p.sendMsg(msg) From df0b98bfdde25d297650517a40519436db7c22b8 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:14:00 +0100 Subject: [PATCH 10/14] dont add finished future --- libp2p/protocols/pubsub/pubsubpeer.nim | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ddc97d43cf..bb4d4f6c91 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -287,9 +287,11 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: - p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg)) - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) + let f = p.sendMsg(msg) + if not f.finished: + p.rpcmessagequeue.sendPriorityQueue.addLast(f) + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) when defined(libp2p_expensive_metrics): From b16ec003276ddf250a18de54c00821111188c93d Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 Feb 2024 16:15:39 +0100 Subject: [PATCH 11/14] clear prio queue before adding to it --- libp2p/protocols/pubsub/pubsubpeer.nim | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index bb4d4f6c91..687243e6f9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -250,6 +250,12 @@ template sendMetrics(msg: RPCMsg): untyped = # metrics libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) +proc clearSendPriorityQueue(p: PubSubPeer) = + while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) + discard p.rpcmessagequeue.sendPriorityQueue.popFirst() + proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will @@ -287,6 +293,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: + p.clearSendPriorityQueue() let f = p.sendMsg(msg) if not f.finished: p.rpcmessagequeue.sendPriorityQueue.addLast(f) @@ -366,12 +373,6 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc clearSendPriorityQueue(p: PubSubPeer) = - while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) - discard p.rpcmessagequeue.sendPriorityQueue.popFirst() - proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while true: # we send non-priority messages only if there are no pending priority messages From 189ab5085e34f951a96f9f2d2bd1bf3a9cd48fad Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 13 Feb 2024 13:55:05 +0100 Subject: [PATCH 12/14] make subscriptions msgs high priority --- libp2p/protocols/pubsub/pubsub.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 818b21f5aa..ca6fdf3218 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -208,7 +208,7 @@ proc sendSubs*(p: PubSub, topics: openArray[string], subscribe: bool) = ## send subscriptions to remote peer - p.send(peer, RPCMsg.withSubs(topics, subscribe)) + p.send(peer, RPCMsg.withSubs(topics, subscribe), true) for topic in topics: if subscribe: From 558eca1833d87140b92b499c68e775371754feb8 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 13 Feb 2024 16:09:55 +0100 Subject: [PATCH 13/14] remove default value from isHighPriority param --- libp2p/protocols/pubsub/floodsub.nim | 4 +-- libp2p/protocols/pubsub/gossipsub.nim | 14 +++++----- .../protocols/pubsub/gossipsub/behavior.nim | 8 +++--- libp2p/protocols/pubsub/pubsub.nim | 6 ++--- libp2p/protocols/pubsub/pubsubpeer.nim | 4 +-- tests/pubsub/testgossipinternal.nim | 8 +++--- tests/pubsub/testgossipsub.nim | 27 ++++++++++++------- 7 files changed, 40 insertions(+), 31 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 819161c0c7..8809d6fe71 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - f.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) trace "Forwared message to peers", peers = toSendPeers.len f.updateMetrics(rpcMsg) @@ -219,7 +219,7 @@ method publish*(f: FloodSub, return 0 # Try to send to all peers that are known to be interested - f.broadcast(peers, RPCMsg(messages: @[msg])) + f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true) when defined(libp2p_expensive_metrics): libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0b9194f420..59ad4376da 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -300,7 +300,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending control message", msg = shortLog(respControl), peer g.send( peer, - RPCMsg(control: some(respControl)), true) + RPCMsg(control: some(respControl)), isHighPriority = true) if messages.len > 0: for smsg in messages: @@ -314,7 +314,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = trace "sending iwant reply messages", peer g.send( peer, - RPCMsg(messages: messages), false) + RPCMsg(messages: messages), isHighPriority = false) proc validateAndRelay(g: GossipSub, msg: Message, @@ -367,7 +367,7 @@ proc validateAndRelay(g: GossipSub, if msg.data.len > msgId.len * 10: g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage( idontwant: @[ControlIWant(messageIds: @[msgId])] - )))) + ))), isHighPriority = true) for peer in toSendPeers: for heDontWant in peer.heDontWants: @@ -381,7 +381,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false) + g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false) trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue @@ -452,7 +452,7 @@ method rpcHandler*(g: GossipSub, peer.recvObservers(rpcMsg) if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0: - g.send(peer, RPCMsg(pong: rpcMsg.ping), true) + g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true) peer.pingBudget.dec for i in 0.. 0: let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) - g.broadcast(grafts, graft) + g.broadcast(grafts, graft, isHighPriority = true) if prunes.len > 0: let prune = RPCMsg(control: some(ControlMessage( prune: @[ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - g.broadcast(prunes, prune) + g.broadcast(prunes, prune, isHighPriority = true) proc dropFanoutPeers*(g: GossipSub) {.raises: [].} = # drop peers that we haven't published to in @@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} = topicID: t, peers: g.peerExchangeList(t), backoff: g.parameters.pruneBackoff.seconds.uint64)]))) - g.broadcast(prunes, prune) + g.broadcast(prunes, prune, isHighPriority = true) # pass by ptr in order to both signal we want to update metrics # and as well update the struct for each topic during this iteration @@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} = libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId]) else: libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"]) - g.send(peer, RPCMsg(control: some(control))) + g.send(peer, RPCMsg(control: some(control)), isHighPriority = true) g.mcache.shift() # shift the cache diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index ca6fdf3218..14b99ecbff 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -138,7 +138,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = libp2p_pubsub_peers.set(p.peers.len.int64) -proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} = +proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} = ## Attempt to send `msg` to remote peer ## @@ -149,7 +149,7 @@ proc broadcast*( p: PubSub, sendPeers: auto, # Iteratble[PubSubPeer] msg: RPCMsg, - isHighPriority: bool = false) {.raises: [].} = + isHighPriority: bool) {.raises: [].} = ## Attempt to send `msg` to the given peers let npeers = sendPeers.len.int64 @@ -208,7 +208,7 @@ proc sendSubs*(p: PubSub, topics: openArray[string], subscribe: bool) = ## send subscriptions to remote peer - p.send(peer, RPCMsg.withSubs(topics, subscribe), true) + p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true) for topic in topics: if subscribe: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 687243e6f9..d810fd2813 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -281,7 +281,7 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection -proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} = +proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} = doAssert(not isNil(p), "pubsubpeer nil!") if msg.len <= 0: @@ -340,7 +340,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: else: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) -proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} = +proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} = # When sending messages, we take care to re-encode them with the right # anonymization flag to ensure that we're not penalized for sending invalid # or malicious data on the wire - in particular, re-encoding protects against diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 51ae65ff9a..b5f6fd97f4 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -779,7 +779,7 @@ suite "GossipSub internal": gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage( ihave: @[ControlIHave(topicId: "foobar", messageIds: iwantMessageIds)] - )))) + ))), isHighPriority = false) checkUntilTimeout: receivedMessages[] == sentMessages check receivedMessages[].len == 2 @@ -796,7 +796,7 @@ suite "GossipSub internal": gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage( ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)] - )))) + ))), isHighPriority = false) await sleepAsync(300.milliseconds) checkUntilTimeout: receivedMessages[].len == 0 @@ -813,7 +813,7 @@ suite "GossipSub internal": gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage( ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)] - )))) + ))), isHighPriority = false) checkUntilTimeout: receivedMessages[] == sentMessages check receivedMessages[].len == 2 @@ -831,7 +831,7 @@ suite "GossipSub internal": gossip1.broadcast(gossip1.mesh["foobar"], RPCMsg(control: some(ControlMessage( ihave: @[ControlIHave(topicId: "foobar", messageIds: bigIWantMessageIds)] - )))) + ))), isHighPriority = false) var smallestSet: HashSet[seq[byte]] let seqs = toSeq(sentMessages) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 1081fe2552..a843ed51d5 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -912,7 +912,7 @@ suite "GossipSub": gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage( idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])] - )))) + ))), isHighPriority = true) checkUntilTimeout: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1) tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1 @@ -968,7 +968,10 @@ suite "GossipSub": let rateLimitHits = currentRateLimitHits() let (nodes, gossip0, gossip1) = await initializeGossipTest() - gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))])) + gossip0.broadcast( + gossip0.mesh["foobar"], + RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]), + isHighPriority = true) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits @@ -976,7 +979,10 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))])) + gossip0.broadcast( + gossip0.mesh["foobar"], + RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]), + isHighPriority = true) await sleepAsync(300.millis) check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true @@ -990,7 +996,7 @@ suite "GossipSub": let (nodes, gossip0, gossip1) = await initializeGossipTest() # Simulate sending an undecodable message - await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte)) + await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte), isHighPriority = true) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits + 1 @@ -998,7 +1004,7 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte)) + await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte), isHighPriority = true) checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false check currentRateLimitHits() == rateLimitHits + 2 @@ -1014,7 +1020,7 @@ suite "GossipSub": PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33))) ], backoff: 123'u64) ]))) - gossip0.broadcast(gossip0.mesh["foobar"], msg) + gossip0.broadcast(gossip0.mesh["foobar"], msg, isHighPriority = true) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits + 1 @@ -1027,7 +1033,7 @@ suite "GossipSub": PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35))) ], backoff: 123'u64) ]))) - gossip0.broadcast(gossip0.mesh["foobar"], msg2) + gossip0.broadcast(gossip0.mesh["foobar"], msg2, isHighPriority = true) checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false check currentRateLimitHits() == rateLimitHits + 2 @@ -1049,7 +1055,7 @@ suite "GossipSub": let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))]) - gossip0.broadcast(gossip0.mesh[topic], msg) + gossip0.broadcast(gossip0.mesh[topic], msg, isHighPriority = true) await sleepAsync(300.millis) check currentRateLimitHits() == rateLimitHits + 1 @@ -1057,7 +1063,10 @@ suite "GossipSub": # Disconnect peer when rate limiting is enabled gossip1.parameters.disconnectPeerAboveRateLimit = true - gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))])) + gossip0.broadcast( + gossip0.mesh[topic], + RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), + isHighPriority = true) checkUntilTimeout gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false check currentRateLimitHits() == rateLimitHits + 2 From ace2c68407d70534ad98bd51a09614fc318d6689 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 14 Feb 2024 13:00:06 +0100 Subject: [PATCH 14/14] add docs --- libp2p/protocols/pubsub/pubsub.nim | 19 +++++++++++++++++-- libp2p/protocols/pubsub/pubsubpeer.nim | 17 +++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 14b99ecbff..0cf736797e 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -139,8 +139,15 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} = libp2p_pubsub_peers.set(p.peers.len.int64) proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} = - ## Attempt to send `msg` to remote peer + ## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network. ## + ## Parameters: + ## - `p`: The `PubSub` instance. + ## - `peer`: An instance of `PubSubPeer` representing the peer to whom the message should be sent. + ## - `msg`: The `RPCMsg` instance that contains the message to be sent. + ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. trace "sending pubsub message to peer", peer, msg = shortLog(msg) asyncSpawn peer.send(msg, p.anonymize, isHighPriority) @@ -150,7 +157,15 @@ proc broadcast*( sendPeers: auto, # Iteratble[PubSubPeer] msg: RPCMsg, isHighPriority: bool) {.raises: [].} = - ## Attempt to send `msg` to the given peers + ## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network. + ## + ## Parameters: + ## - `p`: The `PubSub` instance. + ## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent. + ## - `msg`: The `RPCMsg` instance that contains the message to be broadcast. + ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. let npeers = sendPeers.len.int64 for sub in msg.subscriptions: diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index d810fd2813..d74ef133f2 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -282,6 +282,14 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} = await conn.close() # This will clean up the send connection proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} = + ## Asynchronously sends an encoded message to a specified `PubSubPeer`. + ## + ## Parameters: + ## - `p`: The `PubSubPeer` instance to which the message is to be sent. + ## - `msg`: The message to be sent, encoded as a sequence of bytes (`seq[byte]`). + ## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. doAssert(not isNil(p), "pubsubpeer nil!") if msg.len <= 0: @@ -341,6 +349,15 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg) proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} = + ## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization. + ## + ## Parameters: + ## - `p`: The `PubSubPeer` instance to which the message is to be sent. + ## - `msg`: The `RPCMsg` instance representing the message to be sent. + ## - `anonymize`: A boolean flag indicating whether the message should be sent with anonymization. + ## - `isHighPriority`: A boolean flag indicating whether the message should be treated as high priority. + ## High priority messages are sent immediately, while low priority messages are queued and sent only after all high + ## priority messages have been sent. # When sending messages, we take care to re-encode them with the right # anonymization flag to ensure that we're not penalized for sending invalid # or malicious data on the wire - in particular, re-encoding protects against