From 38c0af4e78b906e839b86c0097f16595646f1f95 Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 1 Feb 2024 13:09:06 +0100 Subject: [PATCH] rename ttl property and message type --- libp2p/protocols/pubsub/pubsubpeer.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 9ca672ee4b..c735bc0a3f 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -55,15 +55,15 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} - Ttlmessage* = object + QueuedMessage* = object msg*: seq[byte] - ttl*: Moment + addedAt*: Moment RpcMessageQueue* = ref object # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] # Queue for lower-priority messages, like "IWANT" replies and relay messages. - nonPriorityQueue: AsyncQueue[seq[byte]] + nonPriorityQueue: AsyncQueue[QueuedMessage] # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. @@ -300,7 +300,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) + await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -388,7 +388,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) continue @@ -413,7 +413,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[Ttlmessage](), + nonPriorityQueue: newAsyncQueue[QueuedMessage](), maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, )