Skip to content

Commit

Permalink
use events to process the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jan 18, 2024
1 parent 8aa739a commit 65e5c01
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 104 deletions.
107 changes: 57 additions & 50 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

RpcMessageQueue* = ref object
priorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[seq[byte]]
messageAvailableEvent: AsyncEvent
queueProcessingTask: Future[void]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
Expand All @@ -76,7 +82,6 @@ type
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue: RpcMessageQueue
queueProcessingTask: Future[void]

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
Expand Down Expand Up @@ -263,13 +268,14 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
return

if isHighPriority:
await p.rpcmessagequeue.addPriorityMessage(msg)
p.rpcmessagequeue.priorityQueue.putNoWait(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.addNonPriorityMessage(msg)
p.rpcmessagequeue.nonPriorityQueue.putNoWait(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
p.rpcmessagequeue.messageAvailableEvent.fire()

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
Expand Down Expand Up @@ -339,62 +345,63 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false

proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] =
var m = rpcMessageQueue.getPriorityMessage()
if m.isSome():
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
return m
else:
m = rpcMessageQueue.getNonPriorityMessage()
if m.isSome():
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
return m
else:
return Opt.none(seq[byte])

proc processMessages(p: PubSubPeer) {.async.} =
proc sendMsg(msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut

var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", msg = shortLog(msg)
return

trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)

try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled

await conn.close() # This will clean up the send connection
while true:
let m = p.rpcmessagequeue.getMessage(p)
m.withValue(msg):
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut

var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", msg = shortLog(msg)
return

trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)

try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled

await conn.close() # This will clean up the send connection
# Include a delay or a mechanism to prevent this loop from consuming too much CPU
await sleepAsync(10) # For example, a 10 ms sleep
await p.rpcmessagequeue.messageAvailableEvent.wait() # Wait for an event
p.rpcmessagequeue.messageAvailableEvent.clear() # Reset the event after handling

if not p.rpcmessagequeue.priorityQueue.empty(): # Process messages from the priority queue first
let message = await p.rpcmessagequeue.priorityQueue.get()
await sendMsg(message)
elif not p.rpcmessagequeue.nonPriorityQueue.empty(): # Then process messages from the non-priority queue
let message = await p.rpcmessagequeue.nonPriorityQueue.get()
await sendMsg(message)


proc startProcessingMessages(p: PubSubPeer) =
if p.queueProcessingTask.isNil:
p.queueProcessingTask = p.processMessages()
if p.rpcmessagequeue.queueProcessingTask.isNil:
p.rpcmessagequeue.queueProcessingTask = p.processMessages()

proc stopProcessingMessages*(p: PubSubPeer) =
if not p.queueProcessingTask.isNil:
p.queueProcessingTask.cancel()
p.rpcmessagequeue.clear()
if not p.rpcmessagequeue.queueProcessingTask.isNil:
p.rpcmessagequeue.queueProcessingTask.cancel()
p.rpcmessagequeue.queueProcessingTask = nil
p.rpcmessagequeue.priorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
return T(
priorityQueue: newAsyncQueue[seq[byte]](),
nonPriorityQueue: newAsyncQueue[seq[byte]](),
messageAvailableEvent: newAsyncEvent(),
)

proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
Expand Down
54 changes: 0 additions & 54 deletions libp2p/protocols/pubsub/rpcmessagequeue.nim

This file was deleted.

0 comments on commit 65e5c01

Please sign in to comment.