Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Jan 13, 2025
1 parent e1b48e5 commit 97f4bdc
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 46 deletions.
7 changes: 2 additions & 5 deletions examples/tutorial_4_gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,8 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} =
node.gossip.subscribe(
"metrics",
proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
let m = MetricList.decode(data)
if m.isErr:
raiseAssert "failed to decode metric"
else:
echo m
let m = MetricList.decode(data).expect("metric can be decoded")
echo m
,
)
else:
Expand Down
2 changes: 0 additions & 2 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ method init*(f: FloodSub) =
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError.
trace "Unexpected cancellation in floodsub handler", conn
except CatchableError as exc:
trace "FloodSub handler leaks an error", description = exc.msg, conn

f.handler = handler
f.codec = FloodSubCodec
Expand Down
6 changes: 3 additions & 3 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ method init*(g: GossipSub) =
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in gossipsub handler", conn
except CatchableError as exc:
trace "GossipSub handler leaks an error", description = exc.msg, conn

g.handler = handler
g.codecs &= GossipSubCodec_12
Expand Down Expand Up @@ -490,7 +488,9 @@ proc validateAndRelay(
)

await handleData(g, topic, msg.data)
except CatchableError as exc:
except CancelledError as exc:
info "validateAndRelay failed", description = exc.msg
except PeerRateLimitError as exc:
info "validateAndRelay failed", description = exc.msg

proc dataAndTopicsIdSize(msgs: seq[Message]): int =
Expand Down
7 changes: 2 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,9 @@ method validate*(
topic = topic, registered = toSeq(p.validators.keys)
if topic in p.validators:
trace "running validators for topic", topic = topic
try:
for validator in p.validators[topic]:
p.validators.withValue(topic, validators):
for validator in validators[]:
pending.add(validator(topic, message))
except KeyError:
raiseAssert "topic checked before"

var valResult = ValidationResult.Accept
let futs = await allFinished(pending)
for fut in futs:
Expand Down
17 changes: 7 additions & 10 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ type
PubSubPeerEvent* = object
kind*: PubSubPeerEventKind

GetConn* = proc(): Future[Connection] {.
gcsafe, async: (raises: [GetConnDialError]), raises: []
.}
GetConn* = proc(): Future[Connection] {.gcsafe, async: (raises: [GetConnDialError]).}
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].}
# have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
Expand Down Expand Up @@ -211,7 +209,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} =
except PeerRateLimitError as exc:
debug "Peer rate limit exceeded, exiting read while",
conn, peer = p, description = exc.msg
except CatchableError as exc:
except LPStreamError as exc:
debug "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
Expand All @@ -220,9 +218,6 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} =
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError.
trace "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle",
conn, peer = p, closed = conn.closed, description = exc.msg
finally:
debug "exiting pubsub read loop", conn, peer = p, closed = conn.closed

Expand All @@ -242,8 +237,6 @@ proc closeSendConn(
p.onEvent(p, PubSubPeerEvent(kind: event))
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", description = exc.msg
# don't cleanup p.address else we leak some gossip stat table

proc connectOnce(
Expand Down Expand Up @@ -294,7 +287,11 @@ proc connectImpl(p: PubSubPeer) {.async: (raises: []).} =
p.connectedFut.complete()
return
await connectOnce(p)
except CatchableError as exc: # never cancelled
except CancelledError as exc:
debug "Could not establish send connection", description = exc.msg
except LPError as exc:
debug "Could not establish send connection", description = exc.msg
except GetConnDialError as exc:
debug "Could not establish send connection", description = exc.msg

proc connect*(p: PubSubPeer) =
Expand Down
7 changes: 1 addition & 6 deletions tests/pubsub/testgossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -727,12 +727,7 @@ suite "GossipSub":
handler = proc(
topic: string, data: seq[byte]
) {.async: (raises: []), closure.} =
try:
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
except KeyError:
raiseAssert "seen checked before"
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
Expand Down
18 changes: 3 additions & 15 deletions tests/pubsub/testgossipsub2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,8 @@ suite "GossipSub":
var handler: TopicHandler
closureScope:
var peerName = $dialer.peerInfo.peerId
handler = proc(
topic: string, data: seq[byte]
) {.async: (raises: []), closure.} =
try:
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
except KeyError:
raiseAssert "seen checked before"
handler = proc(topic: string, data: seq[byte]) {.async: (raises: []).} =
seen.mgetOrPut(peerName, 0).inc()
info "seen up", count = seen.len
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
Expand Down Expand Up @@ -282,12 +275,7 @@ suite "GossipSub":
handler = proc(
topic: string, data: seq[byte]
) {.async: (raises: []), closure.} =
try:
if peerName notin seen:
seen[peerName] = 0
seen[peerName].inc
except KeyError:
raiseAssert "seen checked before"
seen.mgetOrPut(peerName, 0).inc()
check topic == "foobar"
if not seenFut.finished() and seen.len >= runs:
seenFut.complete()
Expand Down

0 comments on commit 97f4bdc

Please sign in to comment.