Skip to content

Commit

Permalink
feat: make relayed messages non-priority (don't use an explicit queue…
Browse files Browse the repository at this point in the history
… for priority msgs)
  • Loading branch information
diegomrsantos committed Jan 29, 2024
1 parent 5e205e2 commit 8a2f8db
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 64 deletions.
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0

pubSubPeer.stopProcessingMessages()
pubSubPeer.stopSendNonPriorityTask()

procCall FloodSub(g).unsubscribePeer(peer)

Expand Down
119 changes: 56 additions & 63 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ when defined(libp2p_expensive_metrics):

declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])
declareCounter(libp2p_gossipsub_priority_queue_messages_processed, "the number of messages processed in the priority queue", labels = ["id"])
declareCounter(libp2p_gossipsub_non_priority_queue_messages_processed, "the number of messages processed in the non-priority queue", labels = ["id"])

type
PeerRateLimitError* = object of CatchableError
Expand All @@ -57,10 +55,9 @@ type
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

RpcMessageQueue* = ref object
priorityQueue: AsyncQueue[seq[byte]]
sendPriorityQueue: Deque[Future[void]]
nonPriorityQueue: AsyncQueue[seq[byte]]
queueProcessingTask: Future[void]
isProcessing: bool # Flag to indicate if processing is underway
sendNonPriorityTask: Future[void]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
Expand Down Expand Up @@ -252,6 +249,31 @@ template sendMetrics(msg: RPCMsg): untyped =
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
  ## Write one already-encoded RPC message to the peer's send connection.
  ## Waits for connection setup when none exists yet; silently drops the
  ## message when no usable connection can be obtained.
  if p.sendConn == nil:
    # `connectOnce` completes `connectedFut` even if the sendConn setup
    # failed, so this await always finishes.
    await p.connectedFut

  let conn = p.sendConn
  if conn == nil or conn.closed():
    debug "No send connection", p, msg = shortLog(msg)
    return

  trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)

  try:
    await conn.writeLp(msg)
    trace "sent pubsub message to remote", conn
  except CatchableError as exc: # never cancelled
    # Because we detach the send call from the currently executing task using
    # asyncSpawn, no exceptions may leak out of it.
    trace "Unable to send to remote", conn, msg = exc.msg
    # Close the connection so its close flag is set and the next use of
    # sendConn recycles it.
    await conn.close()

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

Expand All @@ -264,11 +286,11 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
return

if isHighPriority:
await p.rpcmessagequeue.priorityQueue.put(msg)
p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.put(msg)
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
Expand Down Expand Up @@ -341,69 +363,40 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
return true
return false

proc processMessages(p: PubSubPeer) {.async.} =
proc sendMsg(msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
await p.connectedFut

var conn = p.sendConn
if conn == nil or conn.closed():
debug "No send connection", msg = shortLog(msg)
return

trace "sending encoded msgs to peer", p, conn, encoded = shortLog(msg)
proc clearSendPriorityQueue(p: PubSubPeer) =
  ## Drop every already-finished send future from the front of the
  ## priority queue, decrementing the size gauge for each one when
  ## expensive metrics are enabled.
  template queue: untyped = p.rpcmessagequeue.sendPriorityQueue
  while queue.len > 0 and queue[0].finished:
    when defined(libp2p_expensive_metrics):
      libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
    discard queue.popFirst()

try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
except CatchableError as exc: # never cancelled
# Because we detach the send call from the currently executing task using
# asyncSpawn, no exceptions may leak out of it
trace "Unable to send to remote", conn, msg = exc.msg
# Next time sendConn is used, it will be have its close flag set and thus
# will be recycled

await conn.close() # This will clean up the send connection
proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
  ## Background loop that drains the non-priority queue. Each dequeued
  ## message is written only after every in-flight priority send has
  ## completed, so priority traffic always goes out first.
  ## NOTE(review): this region of the pasted diff interleaved the deleted
  ## `processMessages`/`startProcessingMessages`/`stopProcessingMessages`
  ## procs with the added lines; only the added proc is kept here.
  while true:
    let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
    while p.rpcmessagequeue.sendPriorityQueue.len > 0:
      # Wait for the oldest pending priority send, then clear all the
      # finished ones before re-checking the queue length.
      await p.rpcmessagequeue.sendPriorityQueue[0]
      p.clearSendPriorityQueue()
    when defined(libp2p_expensive_metrics):
      libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
    await p.sendMsg(msg)

proc startSendNonPriorityTask(p: PubSubPeer) =
  ## Spawn the non-priority drain loop for this peer; a no-op when the
  ## task is already running.
  debug "starting sendNonPriorityTask", p
  if p.rpcmessagequeue.sendNonPriorityTask.isNil:
    p.rpcmessagequeue.sendNonPriorityTask = sendNonPriorityTask(p)

proc stopSendNonPriorityTask*(p: PubSubPeer) =
  ## Cancel the peer's background send task, drop any queued messages and
  ## reset the queue-size gauges. Safe to call when no task is running.
  if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
    debug "stopping sendNonPriorityTask", p
    p.rpcmessagequeue.sendNonPriorityTask.cancel()
    p.rpcmessagequeue.sendNonPriorityTask = nil
    p.rpcmessagequeue.sendPriorityQueue.clear()
    p.rpcmessagequeue.nonPriorityQueue.clear()
    when defined(libp2p_expensive_metrics):
      # The gauges are declared under this flag (see the hunk context at the
      # top of the file); guard the reset the same way every inc/dec site
      # does, otherwise builds without the flag fail to compile.
      libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
  ## Construct the queue pair: a deque tracking in-flight priority send
  ## futures plus an async queue buffering non-priority messages.
  ## The stale `priorityQueue:` initializer (a deleted diff line) is
  ## dropped — the RpcMessageQueue type no longer declares that field.
  return T(
    sendPriorityQueue: initDeque[Future[void]](),
    nonPriorityQueue: newAsyncQueue[seq[byte]](),
  )

Expand All @@ -428,4 +421,4 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
result.startProcessingMessages()
result.startSendNonPriorityTask()

0 comments on commit 8a2f8db

Please sign in to comment.