From 65e5c012c0742d1c0a1b14d767cd59f6ecd2f8c0 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 18 Jan 2024 18:46:13 +0100 Subject: [PATCH] use events to process the queue --- libp2p/protocols/pubsub/pubsubpeer.nim | 107 +++++++++++--------- libp2p/protocols/pubsub/rpcmessagequeue.nim | 54 ---------- 2 files changed, 57 insertions(+), 104 deletions(-) delete mode 100644 libp2p/protocols/pubsub/rpcmessagequeue.nim diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 4b0efd357d..d6b09277f1 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -54,6 +54,12 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + RpcMessageQueue* = ref object + priorityQueue: AsyncQueue[seq[byte]] + nonPriorityQueue: AsyncQueue[seq[byte]] + messageAvailableEvent: AsyncEvent + queueProcessingTask: Future[void] + PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection onEvent*: OnEvent # Connectivity updates for peer @@ -76,7 +82,6 @@ type overheadRateLimitOpt*: Opt[TokenBucket] rpcmessagequeue: RpcMessageQueue - queueProcessingTask: Future[void] RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -263,13 +268,14 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { return if isHighPriority: - await p.rpcmessagequeue.addPriorityMessage(msg) + p.rpcmessagequeue.priorityQueue.putNoWait(msg) when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.addNonPriorityMessage(msg) + p.rpcmessagequeue.nonPriorityQueue.putNoWait(msg) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) + p.rpcmessagequeue.messageAvailableEvent.fire() iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] = ## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances. @@ -339,62 +345,63 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] = - var m = rpcMessageQueue.getPriorityMessage() - if m.isSome(): - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId]) - return m - else: - m = rpcMessageQueue.getNonPriorityMessage() - if m.isSome(): - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - return m - else: - return Opt.none(seq[byte]) - proc processMessages(p: PubSubPeer) {.async.} = + proc sendMsg(msg: seq[byte]) {.async.} = + if p.sendConn == nil: + # Wait for a send conn to be setup. `connectOnce` will + # complete this even if the sendConn setup failed + await p.connectedFut + + var conn = p.sendConn + if conn == nil or conn.closed(): + debug "No send connection", msg = shortLog(msg) + return + + trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg) + + try: + await conn.writeLp(msg) + trace "sent pubsub message to remote", conn + except CatchableError as exc: # never cancelled + # Because we detach the send call from the currently executing task using + # asyncSpawn, no exceptions may leak out of it + trace "Unable to send to remote", conn, msg = exc.msg + # Next time sendConn is used, it will be have its close flag set and thus + # will be recycled + + await conn.close() # This will clean up the send connection while true: - let m = p.rpcmessagequeue.getMessage(p) - m.withValue(msg): - if p.sendConn == nil: - # Wait for a send conn to be setup. `connectOnce` will - # complete this even if the sendConn setup failed - await p.connectedFut - - var conn = p.sendConn - if conn == nil or conn.closed(): - debug "No send connection", msg = shortLog(msg) - return - - trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg) - - try: - await conn.writeLp(msg) - trace "sent pubsub message to remote", conn - except CatchableError as exc: # never cancelled - # Because we detach the send call from the currently executing task using - # asyncSpawn, no exceptions may leak out of it - trace "Unable to send to remote", conn, msg = exc.msg - # Next time sendConn is used, it will be have its close flag set and thus - # will be recycled - - await conn.close() # This will clean up the send connection - # Include a delay or a mechanism to prevent this loop from consuming too much CPU - await sleepAsync(10) # For example, a 10 ms sleep + await p.rpcmessagequeue.messageAvailableEvent.wait() # Wait for an event + p.rpcmessagequeue.messageAvailableEvent.clear() # Reset the event after handling + + if not p.rpcmessagequeue.priorityQueue.empty(): # Process messages from the priority queue first + let message = await p.rpcmessagequeue.priorityQueue.get() + await sendMsg(message) + elif not p.rpcmessagequeue.nonPriorityQueue.empty(): # Then process messages from the non-priority queue + let message = await p.rpcmessagequeue.nonPriorityQueue.get() + await sendMsg(message) + proc startProcessingMessages(p: PubSubPeer) = - if p.queueProcessingTask.isNil: - p.queueProcessingTask = p.processMessages() + if p.rpcmessagequeue.queueProcessingTask.isNil: + p.rpcmessagequeue.queueProcessingTask = p.processMessages() proc stopProcessingMessages*(p: PubSubPeer) = - if not p.queueProcessingTask.isNil: - p.queueProcessingTask.cancel() - p.rpcmessagequeue.clear() + if not p.rpcmessagequeue.queueProcessingTask.isNil: + p.rpcmessagequeue.queueProcessingTask.cancel() + p.rpcmessagequeue.queueProcessingTask = nil + p.rpcmessagequeue.priorityQueue.clear() + p.rpcmessagequeue.nonPriorityQueue.clear() libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) +proc new(T: typedesc[RpcMessageQueue]): T = + return T( + priorityQueue: newAsyncQueue[seq[byte]](), + nonPriorityQueue: newAsyncQueue[seq[byte]](), + messageAvailableEvent: newAsyncEvent(), + ) + proc new*( T: typedesc[PubSubPeer], peerId: PeerId, diff --git a/libp2p/protocols/pubsub/rpcmessagequeue.nim b/libp2p/protocols/pubsub/rpcmessagequeue.nim deleted file mode 100644 index f90f7e315c..0000000000 --- a/libp2p/protocols/pubsub/rpcmessagequeue.nim +++ /dev/null @@ -1,54 +0,0 @@ -# Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -import chronos, chronicles, stew/results -import ../../stream/connection - -type - RpcMessageQueue* = ref object - priorityQueue: AsyncQueue[seq[byte]] - nonPriorityQueue: AsyncQueue[seq[byte]] - -proc addPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} = - await aq.priorityQueue.put(msg) - -proc addNonPriorityMessage*(aq: RpcMessageQueue; msg: seq[byte]) {.async.} = - await aq.nonPriorityQueue.put(msg) - -proc new*(T: typedesc[RpcMessageQueue]): T = - return T( - priorityQueue: newAsyncQueue[seq[byte]](), - nonPriorityQueue: newAsyncQueue[seq[byte]]() - ) - -proc getPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] = - return - if not rpcMessageQueue.priorityQueue.empty(): - try: - Opt.some(rpcMessageQueue.priorityQueue.getNoWait()) - except AsyncQueueEmptyError: - Opt.none(seq[byte]) - else: - Opt.none(seq[byte]) - -proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] = - return - if not rpcMessageQueue.nonPriorityQueue.empty(): - try: - Opt.some(rpcMessageQueue.nonPriorityQueue.getNoWait()) - except AsyncQueueEmptyError: - Opt.none(seq[byte]) - else: - Opt.none(seq[byte]) - -proc clear*(rpcMessageQueue: RpcMessageQueue) = - rpcMessageQueue.priorityQueue.clear() - rpcMessageQueue.nonPriorityQueue.clear() \ No newline at end of file