diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 6ca23e046f..5410085eee 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -217,7 +217,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
     for topic, info in stats[].topicInfos.mpairs:
       info.firstMessageDeliveries = 0
 
-  pubSubPeer.stopProcessingMessages()
+  pubSubPeer.stopSendNonPriorityTask()
 
   procCall FloodSub(g).unsubscribePeer(peer)
 
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 9c951caf6c..dbb2001586 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -35,8 +35,6 @@ when defined(libp2p_expensive_metrics):
   declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
   declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
-  declareCounter(libp2p_gossipsub_priority_queue_messages_processed, "the number of messages processed in the priority queue", labels = ["id"])
-  declareCounter(libp2p_gossipsub_non_priority_queue_messages_processed, "the number of messages processed in the non-priority queue", labels = ["id"])
 
 type
   PeerRateLimitError* = object of CatchableError
 
@@ -57,10 +55,9 @@ type
   OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
 
   RpcMessageQueue* = ref object
-    priorityQueue: AsyncQueue[seq[byte]]
+    sendPriorityQueue: Deque[Future[void]]
     nonPriorityQueue: AsyncQueue[seq[byte]]
-    queueProcessingTask: Future[void]
-    isProcessing: bool # Flag to indicate if processing is underway
+    sendNonPriorityTask: Future[void]
 
   PubSubPeer* = ref object of RootObj
     getConn*: GetConn # callback to establish a new send connection
@@ -252,6 +249,31 @@ template sendMetrics(msg: RPCMsg): untyped =
         # metrics
         libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])
 
+proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
+  if p.sendConn == nil:
+    # Wait for a send conn to be setup. `connectOnce` will
+    # complete this even if the sendConn setup failed
+    await p.connectedFut
+
+  var conn = p.sendConn
+  if conn == nil or conn.closed():
+    debug "No send connection", p, msg = shortLog(msg)
+    return
+
+  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
+
+  try:
+    await conn.writeLp(msg)
+    trace "sent pubsub message to remote", conn
+  except CatchableError as exc: # never cancelled
+    # Because we detach the send call from the currently executing task using
+    # asyncSpawn, no exceptions may leak out of it
+    trace "Unable to send to remote", conn, msg = exc.msg
+    # Next time sendConn is used, it will have its close flag set and thus
+    # will be recycled
+
+    await conn.close() # This will clean up the send connection
+
 proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
   doAssert(not isNil(p), "pubsubpeer nil!")
 
@@ -264,11 +286,11 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
     return
 
   if isHighPriority:
-    await p.rpcmessagequeue.priorityQueue.put(msg)
+    p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg))
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
   else:
-    await p.rpcmessagequeue.nonPriorityQueue.put(msg)
+    await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
     when defined(libp2p_expensive_metrics):
       libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
   trace "message queued", p, msg = shortLog(msg)
@@ -341,69 +363,40 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
       return true
   return false
 
-proc processMessages(p: PubSubPeer) {.async.} =
-  proc sendMsg(msg: seq[byte]) {.async.} =
-    if p.sendConn == nil:
-      # Wait for a send conn to be setup. `connectOnce` will
-      # complete this even if the sendConn setup failed
-      await p.connectedFut
-
-    var conn = p.sendConn
-    if conn == nil or conn.closed():
-      debug "No send connection", msg = shortLog(msg)
-      return
-
-    trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
+proc clearSendPriorityQueue(p: PubSubPeer) =
+  while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
+    discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
 
-    try:
-      await conn.writeLp(msg)
-      trace "sent pubsub message to remote", conn
-    except CatchableError as exc: # never cancelled
-      # Because we detach the send call from the currently executing task using
-      # asyncSpawn, no exceptions may leak out of it
-      trace "Unable to send to remote", conn, msg = exc.msg
-      # Next time sendConn is used, it will be have its close flag set and thus
-      # will be recycled
-
-      await conn.close() # This will clean up the send connection
+proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
   while true:
-    var getFutures = @[p.rpcmessagequeue.priorityQueue.get(), p.rpcmessagequeue.nonPriorityQueue.get()]
-    while true:
-      let
-        finishedGet = await one(getFutures)
-        index = getFutures.find(finishedGet)
-      if index == 0:
-        getFutures[0] = p.rpcmessagequeue.priorityQueue.get()
-        when defined(libp2p_expensive_metrics):
-          libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
-        trace "message dequeued from priority queue", p
-      else:
-        getFutures[1] = p.rpcmessagequeue.nonPriorityQueue.get()
-        when defined(libp2p_expensive_metrics):
-          libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
-        trace "message dequeued from non-priority queue", p
-
-      let message = await finishedGet
-      await sendMsg(message)
-
-proc startProcessingMessages(p: PubSubPeer) =
-  debug "starting processing messages", p
-  if p.rpcmessagequeue.queueProcessingTask.isNil:
-    p.rpcmessagequeue.queueProcessingTask = p.processMessages()
-
-proc stopProcessingMessages*(p: PubSubPeer) =
-  if not p.rpcmessagequeue.queueProcessingTask.isNil:
-    debug "stopping processing messages", p
-    p.rpcmessagequeue.queueProcessingTask.cancel()
-    p.rpcmessagequeue.queueProcessingTask = nil
-    p.rpcmessagequeue.priorityQueue.clear()
+    let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
+    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
+      await p.rpcmessagequeue.sendPriorityQueue[0]
+      p.clearSendPriorityQueue()
+    when defined(libp2p_expensive_metrics):
+      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
+    await p.sendMsg(msg)
+
+proc startSendNonPriorityTask(p: PubSubPeer) =
+  debug "starting sendNonPriorityTask", p
+  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()
+
+proc stopSendNonPriorityTask*(p: PubSubPeer) =
+  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
+    debug "stopping sendNonPriorityTask", p
+    p.rpcmessagequeue.sendNonPriorityTask.cancel()
+    p.rpcmessagequeue.sendNonPriorityTask = nil
+    p.rpcmessagequeue.sendPriorityQueue.clear()
     p.rpcmessagequeue.nonPriorityQueue.clear()
     libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
     libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
 
 proc new(T: typedesc[RpcMessageQueue]): T =
   return T(
-    priorityQueue: newAsyncQueue[seq[byte]](),
+    sendPriorityQueue: initDeque[Future[void]](),
     nonPriorityQueue: newAsyncQueue[seq[byte]](),
   )
 
@@ -428,4 +421,4 @@ proc new*(
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
-  result.startProcessingMessages()
+  result.startSendNonPriorityTask()