
pr1015-fix #1028

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion libp2p/muxers/mplex/lpchannel.nim
@@ -42,7 +42,7 @@ when defined(libp2p_network_protocols_metrics):
## EOF marker

const
MaxWrites = 1024 ##\
MaxWrites = 10024 ##\
## Maximum number of in-flight writes - after this, we disconnect the peer

LPChannelTrackerName* = "LPChannel"
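The MaxWrites bump above raises the cap on in-flight mplex channel writes before the peer gets disconnected. Below is a minimal sketch (not part of this diff) of how such a cap can be enforced; the type and proc names are illustrative, not lpchannel's actual fields:

const MaxWrites = 10024

type WriteBudget = object
  inFlight: int

proc tryReserveWrite(b: var WriteBudget): bool =
  ## Returns false once the cap is hit, signalling that the peer should be
  ## disconnected rather than queueing more writes.
  if b.inFlight >= MaxWrites:
    return false
  inc b.inFlight
  true

proc releaseWrite(b: var WriteBudget) =
  if b.inFlight > 0:
    dec b.inFlight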
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -183,6 +183,8 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubSubPeer, event: PubSubPeerEvent
peers.excl(peer)
for _, peers in p.fanout.mpairs():
peers.excl(peer)
of PubSubPeerEventKind.PermanentlyDisconnected:
p.unsubscribePeer(peer.peerId)

procCall FloodSub(p).onPubSubPeerEvent(peer, event)

4 changes: 3 additions & 1 deletion libp2p/protocols/pubsub/pubsub.nim
@@ -150,7 +150,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.rai
## priority messages have been sent.

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
peer.send(msg, p.anonymize, isHighPriority)

proc broadcast*(
p: PubSub,
@@ -294,6 +294,8 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubSubPeer, event: PubSubPeerEvent) {
p.sendSubs(peer, toSeq(p.topics.keys), true)
of PubSubPeerEventKind.Disconnected:
discard
of PubSubPeerEventKind.PermanentlyDisconnected:
discard

method getOrCreatePeer*(
p: PubSub,
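For context on the `asyncSpawn peer.send(...)` → `peer.send(...)` change above: in this PR `send` becomes a plain `{.raises: [].}` proc (see the pubsubpeer.nim diff below) that spawns its own futures, so callers no longer wrap it in asyncSpawn. A minimal sketch of that fire-and-forget pattern, assuming chronos and with illustrative proc names:

import chronos

proc sendEncodedSketch(data: seq[byte]): Future[void] {.async.} =
  discard # encode, queue or write the bytes here

proc sendSketch(data: seq[byte]) {.raises: [].} =
  # Returns immediately; failures surface through the spawned future.
  asyncSpawn sendEncodedSketch(data)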
79 changes: 45 additions & 34 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -31,6 +31,7 @@ when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

when defined(pubsubpeer_queue_metrics):
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

@@ -42,8 +43,9 @@ type
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [].}

PubSubPeerEventKind* {.pure.} = enum
Connected
Disconnected
Connected # Stream opened
Disconnected # Stream closed
PermanentlyDisconnected # Error when trying to open stream

PubSubPeerEvent* = object
kind*: PubSubPeerEventKind
@@ -95,6 +97,10 @@ when defined(libp2p_agents_metrics):
# so we have to read the parent's short agent
p.sendConn.getWrapped().shortAgent

proc emitEvent(p: PubSubPeer, event: PubSubPeerEventKind) =
if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: event))

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
@@ -147,6 +153,19 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
if not(isNil(obs)): # TODO: should never be nil, but...
obs.onSend(p, msg)

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
for f in p.rpcmessagequeue.sendPriorityQueue:
f.cancel()
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop",
conn, peer = p, closed = conn.closed
@@ -162,13 +181,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =

await p.handler(p, data)
data = newSeq[byte]() # Release memory
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg
except CatchableError as exc:
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, exc = exc.msg
finally:
await conn.close()
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while", conn, peer = p, error = exc.msg
except CancelledError:
# This is a top-level procedure which will work as a separate task, so it
# does not need to propagate CancelledError.
@@ -201,8 +217,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
p.sendConn = newConn
p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress)

if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Connected))
p.emitEvent(PubSubPeerEventKind.Connected)

await handle(p, newConn)
finally:
@@ -215,24 +230,24 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
p.connectedFut.complete()

try:
if p.onEvent != nil:
p.onEvent(p, PubSubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
p.emitEvent(PubSubPeerEventKind.Disconnected)
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", error = exc.msg

# don't cleanup p.address else we leak some gossip stat table

proc connectImpl(p: PubSubPeer) {.async.} =
proc connectImpl(peer: PubSubPeer) {.async.} =
try:
# Keep trying to establish a connection while it's possible to do so - the
# send connection might get disconnected due to a timeout or an unrelated
# issue so we try to get a new one
while true:
await connectOnce(p)
await connectOnce(peer)
except CatchableError as exc: # never cancelled
debug "Could not establish send connection", msg = exc.msg
debug "Could not establish send connection", peer, msg = exc.msg
peer.emitEvent(PubSubPeerEventKind.PermanentlyDisconnected)

proc connect*(p: PubSubPeer) =
if p.connected:
@@ -251,10 +266,17 @@ template sendMetrics(msg: RPCMsg): untyped =
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
# We can arrive here from sendNonPriorityTask in a state where the last future is finished, but the first isn't.
# Checking the first and the last is a workaround for that, but it needs investigation.
if p.rpcmessagequeue.sendPriorityQueue[0].finished:
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()
elif p.rpcmessagequeue.sendPriorityQueue[^1].finished:
discard p.rpcmessagequeue.sendPriorityQueue.popLast()
else:
break
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
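The clearSendPriorityQueue rewrite above drops completed futures from either end of the deque instead of only the front, working around the state where the last future has finished but the first has not. A standalone sketch of that cleanup (not part of this diff), applied to a plain Deque[Future[void]]:

import std/deques
import chronos

proc dropFinished(queue: var Deque[Future[void]]) =
  # Pop completed futures from the front or the back until an
  # unfinished one blocks further cleanup.
  while queue.len > 0:
    if queue[0].finished:
      discard queue.popFirst()
    elif queue[^1].finished:
      discard queue.popLast()
    else:
      break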
@@ -305,11 +327,11 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.}
let f = p.sendMsg(msg)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(libp2p_expensive_metrics):
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)

@@ -348,7 +370,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.raises: [].} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
##
## Parameters:
@@ -377,11 +399,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
await p.sendEncoded(encodedSplitMsg, isHighPriority)
asyncSpawn p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
await p.sendEncoded(encoded, isHighPriority)
asyncSpawn p.sendEncoded(encoded, isHighPriority)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -401,7 +423,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
# to be finished already (since sends are processed in order).
if p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg)
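The sendNonPriorityTask hunk above awaits the most recently queued priority future before writing a non-priority message; since priority sends complete in order, that single await guarantees all earlier ones are done. A simplified sketch of that ordering (not part of this diff; the queue type is an assumption loosely mirroring rpcmessagequeue):

import std/deques
import chronos

type MessageQueuesSketch = ref object
  priority: Deque[Future[void]]
  nonPriority: AsyncQueue[seq[byte]]

proc drainNonPriority(q: MessageQueuesSketch) {.async.} =
  while true:
    let msg = await q.nonPriority.popFirst()
    if q.priority.len > 0:
      # Priority sends finish in order, so awaiting the newest future
      # means every earlier one has completed as well.
      await q.priority[^1]
    # the actual write of `msg` would happen here
    discard msg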

@@ -410,17 +432,6 @@ proc startSendNonPriorityTask(p: PubSubPeer) =
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),