diff --git a/examples/tutorial_4_gossipsub.nim b/examples/tutorial_4_gossipsub.nim index d51cdfdf89..fa451cedb0 100644 --- a/examples/tutorial_4_gossipsub.nim +++ b/examples/tutorial_4_gossipsub.nim @@ -75,7 +75,9 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} = # This procedure will handle one of the node of the network node.gossip.addValidator( ["metrics"], - proc(topic: string, message: Message): Future[ValidationResult] {.async.} = + proc( + topic: string, message: Message + ): Future[ValidationResult] {.async: (raises: []).} = let decoded = MetricList.decode(message.data) if decoded.isErr: return ValidationResult.Reject @@ -92,8 +94,12 @@ proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} = if node.hostname == "John": node.gossip.subscribe( "metrics", - proc(topic: string, data: seq[byte]) {.async.} = - echo MetricList.decode(data).tryGet() + proc(topic: string, data: seq[byte]) {.async: (raises: []).} = + let m = MetricList.decode(data) + if m.isErr: + raiseAssert "failed to decode metric" + else: + echo m , ) else: diff --git a/examples/tutorial_6_game.nim b/examples/tutorial_6_game.nim index 996494b266..9260ab74c8 100644 --- a/examples/tutorial_6_game.nim +++ b/examples/tutorial_6_game.nim @@ -194,7 +194,7 @@ proc networking(g: Game) {.async.} = gossip.subscribe( "/tron/matchmaking", - proc(topic: string, data: seq[byte]) {.async.} = + proc(topic: string, data: seq[byte]) {.async: (raises: []).} = # If we are still looking for an opponent, # try to match anyone broadcasting its address if g.peerFound.finished or g.hasCandidate: diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index ac50b6d4fa..bd93efabe5 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -162,7 +162,7 @@ type .} P2PPubSubCallback* = proc( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.gcsafe, raises: [CatchableError].} + ): Future[bool] {.gcsafe, async: (raises: [CatchableError]).} DaemonError* = object of LPError DaemonRemoteError* = object of DaemonError @@ -1296,7 +1296,7 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = proc pubsubSubscribe*( api: DaemonAPI, topic: string, handler: P2PPubSubCallback -): Future[PubsubTicket] {.async.} = +): Future[PubsubTicket] {.async: (raises: [CatchableError]).} = ## Subscribe to topic ``topic``. var transp = await api.newConnection() try: diff --git a/libp2p/protobuf/minprotobuf.nim b/libp2p/protobuf/minprotobuf.nim index af8485b4e7..3b246455aa 100644 --- a/libp2p/protobuf/minprotobuf.nim +++ b/libp2p/protobuf/minprotobuf.nim @@ -16,8 +16,6 @@ export results, utility {.push public.} -const MaxMessageSize = 1'u shl 22 - type ProtoFieldKind* = enum ## Protobuf's field types enum @@ -39,7 +37,6 @@ type buffer*: seq[byte] offset*: int length*: int - maxSize*: uint ProtoHeader* = object wire*: ProtoFieldKind @@ -63,7 +60,6 @@ type VarintDecode MessageIncomplete BufferOverflow - MessageTooBig BadWireType IncorrectBlob RequiredFieldMissing @@ -99,11 +95,14 @@ template getProtoHeader*(field: ProtoField): uint64 = template toOpenArray*(pb: ProtoBuffer): untyped = toOpenArray(pb.buffer, pb.offset, len(pb.buffer) - 1) +template lenu64*(x: untyped): untyped = + uint64(len(x)) + template isEmpty*(pb: ProtoBuffer): bool = len(pb.buffer) - pb.offset <= 0 -template isEnough*(pb: ProtoBuffer, length: int): bool = - len(pb.buffer) - pb.offset - length >= 0 +template isEnough*(pb: ProtoBuffer, length: uint64): bool = + pb.offset <= len(pb.buffer) and length <= uint64(len(pb.buffer) - pb.offset) template getPtr*(pb: ProtoBuffer): pointer = cast[pointer](unsafeAddr pb.buffer[pb.offset]) @@ -127,33 +126,25 @@ proc vsizeof*(field: ProtoField): int {.inline.} = 0 proc initProtoBuffer*( - data: seq[byte], offset = 0, options: set[ProtoFlags] = {}, maxSize = MaxMessageSize + data: seq[byte], offset = 0, options: set[ProtoFlags] = {} ): ProtoBuffer = ## Initialize ProtoBuffer with shallow copy of ``data``. result.buffer = data result.offset = offset result.options = options - result.maxSize = maxSize proc initProtoBuffer*( - data: openArray[byte], - offset = 0, - options: set[ProtoFlags] = {}, - maxSize = MaxMessageSize, + data: openArray[byte], offset = 0, options: set[ProtoFlags] = {} ): ProtoBuffer = ## Initialize ProtoBuffer with copy of ``data``. result.buffer = @data result.offset = offset result.options = options - result.maxSize = maxSize -proc initProtoBuffer*( - options: set[ProtoFlags] = {}, maxSize = MaxMessageSize -): ProtoBuffer = +proc initProtoBuffer*(options: set[ProtoFlags] = {}): ProtoBuffer = ## Initialize ProtoBuffer with new sequence of capacity ``cap``. result.buffer = newSeq[byte]() result.options = options - result.maxSize = maxSize if WithVarintLength in options: # Our buffer will start from position 10, so we can store length of buffer # in [0, 9]. @@ -194,12 +185,12 @@ proc write*[T: ProtoScalar](pb: var ProtoBuffer, field: int, value: T) = doAssert(vres.isOk()) pb.offset += length elif T is float32: - doAssert(pb.isEnough(sizeof(T))) + doAssert(pb.isEnough(uint64(sizeof(T)))) let u32 = cast[uint32](value) pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u32.toBytesLE() pb.offset += sizeof(T) elif T is float64: - doAssert(pb.isEnough(sizeof(T))) + doAssert(pb.isEnough(uint64(sizeof(T)))) let u64 = cast[uint64](value) pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u64.toBytesLE() pb.offset += sizeof(T) @@ -242,12 +233,12 @@ proc writePacked*[T: ProtoScalar]( doAssert(vres.isOk()) pb.offset += length elif T is float32: - doAssert(pb.isEnough(sizeof(T))) + doAssert(pb.isEnough(uint64(sizeof(T)))) let u32 = cast[uint32](item) pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u32.toBytesLE() pb.offset += sizeof(T) elif T is float64: - doAssert(pb.isEnough(sizeof(T))) + doAssert(pb.isEnough(uint64(sizeof(T)))) let u64 = cast[uint64](item) pb.buffer[pb.offset ..< pb.offset + sizeof(T)] = u64.toBytesLE() pb.offset += sizeof(T) @@ -268,7 +259,7 @@ proc write*[T: byte | char](pb: var ProtoBuffer, field: int, value: openArray[T] doAssert(lres.isOk()) pb.offset += length if len(value) > 0: - doAssert(pb.isEnough(len(value))) + doAssert(pb.isEnough(value.lenu64)) copyMem(addr pb.buffer[pb.offset], unsafeAddr value[0], len(value)) pb.offset += len(value) @@ -327,13 +318,13 @@ proc skipValue(data: var ProtoBuffer, header: ProtoHeader): ProtoResult[void] = else: err(ProtoError.VarintDecode) of ProtoFieldKind.Fixed32: - if data.isEnough(sizeof(uint32)): + if data.isEnough(uint64(sizeof(uint32))): data.offset += sizeof(uint32) ok() else: err(ProtoError.VarintDecode) of ProtoFieldKind.Fixed64: - if data.isEnough(sizeof(uint64)): + if data.isEnough(uint64(sizeof(uint64))): data.offset += sizeof(uint64) ok() else: @@ -343,14 +334,11 @@ proc skipValue(data: var ProtoBuffer, header: ProtoHeader): ProtoResult[void] = var bsize = 0'u64 if PB.getUVarint(data.toOpenArray(), length, bsize).isOk(): data.offset += length - if bsize <= uint64(data.maxSize): - if data.isEnough(int(bsize)): - data.offset += int(bsize) - ok() - else: - err(ProtoError.MessageIncomplete) + if data.isEnough(bsize): + data.offset += int(bsize) + ok() else: - err(ProtoError.MessageTooBig) + err(ProtoError.MessageIncomplete) else: err(ProtoError.VarintDecode) of ProtoFieldKind.StartGroup, ProtoFieldKind.EndGroup: @@ -382,7 +370,7 @@ proc getValue[T: ProtoScalar]( err(ProtoError.VarintDecode) elif T is float32: doAssert(header.wire == ProtoFieldKind.Fixed32) - if data.isEnough(sizeof(float32)): + if data.isEnough(uint64(sizeof(float32))): outval = cast[float32](fromBytesLE(uint32, data.toOpenArray())) data.offset += sizeof(float32) ok() @@ -390,7 +378,7 @@ proc getValue[T: ProtoScalar]( err(ProtoError.MessageIncomplete) elif T is float64: doAssert(header.wire == ProtoFieldKind.Fixed64) - if data.isEnough(sizeof(float64)): + if data.isEnough(uint64(sizeof(float64))): outval = cast[float64](fromBytesLE(uint64, data.toOpenArray())) data.offset += sizeof(float64) ok() @@ -410,22 +398,19 @@ proc getValue[T: byte | char]( outLength = 0 if PB.getUVarint(data.toOpenArray(), length, bsize).isOk(): data.offset += length - if bsize <= uint64(data.maxSize): - if data.isEnough(int(bsize)): - outLength = int(bsize) - if len(outBytes) >= int(bsize): - if bsize > 0'u64: - copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize)) - data.offset += int(bsize) - ok() - else: - # Buffer overflow should not be critical failure - data.offset += int(bsize) - err(ProtoError.BufferOverflow) + if data.isEnough(bsize): + outLength = int(bsize) + if len(outBytes) >= int(bsize): + if bsize > 0'u64: + copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize)) + data.offset += int(bsize) + ok() else: - err(ProtoError.MessageIncomplete) + # Buffer overflow should not be critical failure + data.offset += int(bsize) + err(ProtoError.BufferOverflow) else: - err(ProtoError.MessageTooBig) + err(ProtoError.MessageIncomplete) else: err(ProtoError.VarintDecode) @@ -439,17 +424,14 @@ proc getValue[T: seq[byte] | string]( if PB.getUVarint(data.toOpenArray(), length, bsize).isOk(): data.offset += length - if bsize <= uint64(data.maxSize): - if data.isEnough(int(bsize)): - outBytes.setLen(bsize) - if bsize > 0'u64: - copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize)) - data.offset += int(bsize) - ok() - else: - err(ProtoError.MessageIncomplete) + if data.isEnough(bsize): + outBytes.setLen(bsize) + if bsize > 0'u64: + copyMem(addr outBytes[0], addr data.buffer[data.offset], int(bsize)) + data.offset += int(bsize) + ok() else: - err(ProtoError.MessageTooBig) + err(ProtoError.MessageIncomplete) else: err(ProtoError.VarintDecode) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index b852538b9b..2ae5669035 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -101,10 +101,12 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) = procCall PubSub(f).unsubscribePeer(peer) -method rpcHandler*(f: FloodSub, peer: PubSubPeer, data: seq[byte]) {.async.} = +method rpcHandler*( + f: FloodSub, peer: PubSubPeer, data: seq[byte] +) {.async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError]).} = var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error - raise newException(CatchableError, "Peer msg couldn't be decoded") + raise newException(PeerMessageDecodeError, "Peer msg couldn't be decoded") trace "decoded msg from peer", peer, payload = rpcMsg.shortLog # trigger hooks @@ -192,7 +194,9 @@ method init*(f: FloodSub) = f.handler = handler f.codec = FloodSubCodec -method publish*(f: FloodSub, topic: string, data: seq[byte]): Future[int] {.async.} = +method publish*( + f: FloodSub, topic: string, data: seq[byte] +): Future[int] {.async: (raises: [LPError]).} = # base returns always 0 discard await procCall PubSub(f).publish(topic, data) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index bed5636cb3..f730cebac6 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -385,7 +385,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = proc validateAndRelay( g: GossipSub, msg: Message, msgId: MessageId, saltedId: SaltedId, peer: PubSubPeer -) {.async.} = +) {.async: (raises: []).} = try: template topic(): string = msg.topic @@ -511,7 +511,9 @@ proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int = msgSize - payloadSize - controlSize -proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} = +proc rateLimit*( + g: GossipSub, peer: PubSubPeer, overhead: int +) {.async: (raises: [PeerRateLimitError]).} = peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(overhead): libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) @@ -524,7 +526,9 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} = PeerRateLimitError, "Peer disconnected because it's above rate limit." ) -method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = +method rpcHandler*( + g: GossipSub, peer: PubSubPeer, data: seq[byte] +) {.async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError]).} = let msgSize = data.len var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error @@ -534,7 +538,7 @@ method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = # TODO evaluate behaviour penalty values peer.behaviourPenalty += 0.1 - raise newException(CatchableError, "Peer msg couldn't be decoded") + raise newException(PeerMessageDecodeError, "Peer msg couldn't be decoded") when defined(libp2p_expensive_metrics): for m in rpcMsg.messages: @@ -682,7 +686,9 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) = # Send unsubscribe (in reverse order to sub/graft) procCall PubSub(g).onTopicSubscription(topic, subscribed) -method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.async.} = +method publish*( + g: GossipSub, topic: string, data: seq[byte] +): Future[int] {.async: (raises: [LPError]).} = logScope: topic @@ -792,7 +798,9 @@ method publish*(g: GossipSub, topic: string, data: seq[byte]): Future[int] {.asy trace "Published message to peers", peers = peers.len return peers.len -proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = +proc maintainDirectPeer( + g: GossipSub, id: PeerId, addrs: seq[MultiAddress] +) {.async: (raises: [CancelledError]).} = if id notin g.peers: trace "Attempting to dial a direct peer", peer = id if g.switch.isConnected(id): @@ -808,11 +816,13 @@ proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.as except CatchableError as exc: debug "Direct peer error dialing", description = exc.msg -proc addDirectPeer*(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} = +proc addDirectPeer*( + g: GossipSub, id: PeerId, addrs: seq[MultiAddress] +) {.async: (raises: [CancelledError]).} = g.parameters.directPeers[id] = addrs await g.maintainDirectPeer(id, addrs) -proc maintainDirectPeers(g: GossipSub) {.async.} = +proc maintainDirectPeers(g: GossipSub) {.async: (raises: [CancelledError]).} = heartbeat "GossipSub DirectPeers", 1.minutes: for id, addrs in g.parameters.directPeers: await g.addDirectPeer(id, addrs) @@ -846,9 +856,9 @@ method stop*(g: GossipSub): Future[void] {.async: (raises: [], raw: true).} = return fut # stop heartbeat interval - g.directPeersLoop.cancel() - g.scoringHeartbeatFut.cancel() - g.heartbeatFut.cancel() + g.directPeersLoop.cancelSoon() + g.scoringHeartbeatFut.cancelSoon() + g.heartbeatFut.cancelSoon() g.heartbeatFut = nil fut diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index bbcfa0c075..b70762b7fa 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -770,7 +770,7 @@ proc onHeartbeat(g: GossipSub) = g.mcache.shift() # shift the cache -proc heartbeat*(g: GossipSub) {.async.} = +proc heartbeat*(g: GossipSub) {.async: (raises: [CancelledError]).} = heartbeat "GossipSub", g.parameters.heartbeatInterval: trace "running heartbeat", instance = cast[int](g) g.onHeartbeat() diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index e489d3a01a..c5a5f29891 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -131,7 +131,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = else: 0.0 -proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} = +proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async: (raises: []).} = try: await g.switch.disconnect(peer.peerId) except CatchableError as exc: # Never cancelled @@ -313,12 +313,14 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated scores", peers = g.peers.len -proc scoringHeartbeat*(g: GossipSub) {.async.} = +proc scoringHeartbeat*(g: GossipSub) {.async: (raises: [CancelledError]).} = heartbeat "Gossipsub scoring", g.parameters.decayInterval: trace "running scoring heartbeat", instance = cast[int](g) g.updateScores() -proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} = +proc punishInvalidMessage*( + g: GossipSub, peer: PubSubPeer, msg: Message +) {.async: (raises: [PeerRateLimitError]).} = let uselessAppBytesNum = msg.data.len peer.overheadRateLimitOpt.withValue(overheadRateLimit): if not overheadRateLimit.tryConsume(uselessAppBytesNum): diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a0ad400855..d3cc962c7c 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -125,12 +125,14 @@ declarePublicCounter( type InitializationError* = object of LPError + PeerMessageDecodeError* = object of CatchableError + TopicHandler* {.public.} = - proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} + proc(topic: string, data: seq[byte]): Future[void] {.gcsafe, async: (raises: []).} ValidatorHandler* {.public.} = proc( topic: string, message: Message - ): Future[ValidationResult] {.gcsafe, raises: [].} + ): Future[ValidationResult] {.gcsafe, async: (raises: []).} TopicPair* = tuple[topic: string, handler: TopicHandler] @@ -327,7 +329,9 @@ proc updateMetrics*(p: PubSub, rpcMsg: RPCMsg) = method rpcHandler*( p: PubSub, peer: PubSubPeer, data: seq[byte] -): Future[void] {.base, async.} = +): Future[void] {. + base, async: (raises: [CancelledError, PeerMessageDecodeError, PeerRateLimitError]) +.} = ## Handler that must be overridden by concrete implementation raiseAssert "Unimplemented" @@ -355,8 +359,11 @@ method getOrCreatePeer*( peer[].codec = protoNegotiated return peer[] - proc getConn(): Future[Connection] {.async.} = - return await p.switch.dial(peerId, protosToDial) + proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} = + try: + return await p.switch.dial(peerId, protosToDial) + except CatchableError as e: + raise (ref GetConnDialError)(parent: e) proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = p.onPubSubPeerEvent(peer, event) @@ -376,7 +383,9 @@ method getOrCreatePeer*( return pubSubPeer -proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = +proc handleData*( + p: PubSub, topic: string, data: seq[byte] +): Future[void] {.async: (raises: [], raw: true).} = # Start work on all data handlers without copying data into closure like # happens on {.async.} transformation p.topics.withValue(topic, handlers): @@ -389,7 +398,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = futs.add(fut) if futs.len() > 0: - proc waiter(): Future[void] {.async.} = + proc waiter(): Future[void] {.async: (raises: []).} = # slow path - we have to wait for the handlers to complete try: futs = await allFinished(futs) @@ -397,12 +406,12 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = # propagate cancellation for fut in futs: if not (fut.finished): - fut.cancel() + fut.cancelSoon() # check for errors in futures for fut in futs: if fut.failed: - let err = fut.readError() + let err = fut.error() warn "Error in topic handler", description = err.msg return waiter() @@ -412,7 +421,9 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] = res.complete() return res -method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = +method handleConn*( + p: PubSub, conn: Connection, proto: string +) {.base, async: (raises: [CancelledError]).} = ## handle incoming connections ## ## this proc will: @@ -424,7 +435,9 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = ## that we're interested in ## - proc handler(peer: PubSubPeer, data: seq[byte]): Future[void] = + proc handler( + peer: PubSubPeer, data: seq[byte] + ): Future[void] {.async: (raises: []).} = # call pubsub rpc handler p.rpcHandler(peer, data) @@ -436,7 +449,7 @@ method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = trace "pubsub peer handler ended", conn except CancelledError as exc: raise exc - except CatchableError as exc: + except PeerMessageDecodeError as exc: trace "exception ocurred in pubsub handle", description = exc.msg, conn finally: await conn.closeWithEOF() @@ -542,7 +555,7 @@ proc subscribe*(p: PubSub, topic: string, handler: TopicHandler) {.public.} = method publish*( p: PubSub, topic: string, data: seq[byte] -): Future[int] {.base, async, public.} = +): Future[int] {.base, async: (raises: [LPError]), public.} = ## publish to a ``topic`` ## ## The return value is the number of neighbours that we attempted to send the @@ -581,7 +594,7 @@ method removeValidator*( method validate*( p: PubSub, message: Message -): Future[ValidationResult] {.async, base.} = +): Future[ValidationResult] {.async: (raises: [CancelledError]), base.} = var pending: seq[Future[ValidationResult]] trace "about to validate message" let topic = message.topic @@ -589,22 +602,30 @@ method validate*( topic = topic, registered = toSeq(p.validators.keys) if topic in p.validators: trace "running validators for topic", topic = topic - for validator in p.validators[topic]: - pending.add(validator(topic, message)) + try: + for validator in p.validators[topic]: + pending.add(validator(topic, message)) + except KeyError: + raiseAssert "topic checked before" - result = ValidationResult.Accept + var valResult = ValidationResult.Accept let futs = await allFinished(pending) for fut in futs: if fut.failed: - result = ValidationResult.Reject + valResult = ValidationResult.Reject break - let res = fut.read() - if res != ValidationResult.Accept: - result = res - if res == ValidationResult.Reject: - break - - case result + try: + let res = fut.read() + if res != ValidationResult.Accept: + valResult = res + if res == ValidationResult.Reject: + break + except CatchableError as e: + trace "validator for message could not be executed, ignoring", + topic = topic, err = e.msg + valResult = ValidationResult.Ignore + + case valResult of ValidationResult.Accept: libp2p_pubsub_validation_success.inc() of ValidationResult.Reject: @@ -612,6 +633,8 @@ method validate*( of ValidationResult.Ignore: libp2p_pubsub_validation_ignore.inc() + valResult + proc init*[PubParams: object | bool]( P: typedesc[PubSub], switch: Switch, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ca17a41b23..1c03865cda 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -64,6 +64,8 @@ const DefaultMaxNumElementsInNonPriorityQueue* = 1024 type PeerRateLimitError* = object of CatchableError + GetConnDialError* = object of CatchableError + PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} @@ -79,7 +81,9 @@ type PubSubPeerEvent* = object kind*: PubSubPeerEventKind - GetConn* = proc(): Future[Connection] {.gcsafe, raises: [].} + GetConn* = proc(): Future[Connection] {. + gcsafe, async: (raises: [GetConnDialError]), raises: [] + .} DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} @@ -121,8 +125,9 @@ type # The max number of elements allowed in the non-priority queue. disconnected: bool - RPCHandler* = - proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} + RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {. + gcsafe, async: (raises: []) + .} when defined(libp2p_agents_metrics): func shortAgent*(p: PubSubPeer): string = @@ -190,7 +195,7 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = if not (isNil(obs.onSend)): obs.onSend(p, msg) -proc handle*(p: PubSubPeer, conn: Connection) {.async.} = +proc handle*(p: PubSubPeer, conn: Connection) {.async: (raises: []).} = debug "starting pubsub read loop", conn, peer = p, closed = conn.closed try: try: @@ -221,7 +226,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = finally: debug "exiting pubsub read loop", conn, peer = p, closed = conn.closed -proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} = +proc closeSendConn( + p: PubSubPeer, event: PubSubPeerEventKind +) {.async: (raises: [CancelledError]).} = if p.sendConn != nil: trace "Removing send connection", p, conn = p.sendConn await p.sendConn.close() @@ -239,13 +246,18 @@ proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} = debug "Errors during diconnection events", description = exc.msg # don't cleanup p.address else we leak some gossip stat table -proc connectOnce(p: PubSubPeer): Future[void] {.async.} = +proc connectOnce( + p: PubSubPeer +): Future[void] {.async: (raises: [CancelledError, GetConnDialError, LPError]).} = try: if p.connectedFut.finished: p.connectedFut = newFuture[void]() - let newConn = await p.getConn().wait(5.seconds) - if newConn.isNil: - raise (ref LPError)(msg: "Cannot establish send connection") + let newConn = + try: + await p.getConn().wait(5.seconds) + except AsyncTimeoutError as error: + trace "getConn timed out", description = error.msg + raise (ref LPError)(msg: "Cannot establish send connection") # When the send channel goes up, subscriptions need to be sent to the # remote peer - if we had multiple channels up and one goes down, all @@ -271,7 +283,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} = finally: await p.closeSendConn(PubSubPeerEventKind.StreamClosed) -proc connectImpl(p: PubSubPeer) {.async.} = +proc connectImpl(p: PubSubPeer) {.async: (raises: []).} = try: # Keep trying to establish a connection while it's possible to do so - the # send connection might get disconnected due to a timeout or an unrelated @@ -317,7 +329,7 @@ proc clearSendPriorityQueue(p: PubSubPeer) = value = p.rpcmessagequeue.sendPriorityQueue.len.int64, labelValues = [$p.peerId] ) -proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} = +proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async: (raises: []).} = # Continuation for a pending `sendMsg` future from below try: await msgFut @@ -331,7 +343,7 @@ proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} = await conn.close() # This will clean up the send connection -proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} = +proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async: (raises: [CancelledError]).} = # Slow path of `sendMsg` where msg is held in memory while send connection is # being set up if p.sendConn == nil: @@ -347,7 +359,7 @@ proc sendMsgSlow(p: PubSubPeer, msg: seq[byte]) {.async.} = trace "sending encoded msg to peer", conn, encoded = shortLog(msg) await sendMsgContinue(conn, conn.writeLp(msg)) -proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] = +proc sendMsg(p: PubSubPeer, msg: seq[byte]): Future[void] {.async: (raises: []).} = if p.sendConn != nil and not p.sendConn.closed(): # Fast path that avoids copying msg (which happens for {.async.}) let conn = p.sendConn @@ -493,7 +505,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool = return true return false -proc sendNonPriorityTask(p: PubSubPeer) {.async.} = +proc sendNonPriorityTask(p: PubSubPeer) {.async: (raises: [CancelledError]).} = while true: # we send non-priority messages only if there are no pending priority messages let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index ff9e672b7b..bfafa6218c 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -303,15 +303,14 @@ proc decodeMessages*(pb: ProtoBuffer): ProtoResult[seq[Message]] {.inline.} = if ?pb.getRepeatedField(2, msgpbs): trace "decodeMessages: read messages", count = len(msgpbs) for item in msgpbs: - # size is constrained at the network level - msgs.add(?decodeMessage(initProtoBuffer(item, maxSize = uint.high))) + msgs.add(?decodeMessage(initProtoBuffer(item))) else: trace "decodeMessages: no messages found" ok(msgs) proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = trace "encodeRpcMsg: encoding message", payload = msg.shortLog() - var pb = initProtoBuffer(maxSize = uint.high) + var pb = initProtoBuffer() for item in msg.subscriptions: pb.write(1, item) for item in msg.messages: @@ -330,7 +329,7 @@ proc encodeRpcMsg*(msg: RPCMsg, anonymize: bool): seq[byte] = proc decodeRpcMsg*(msg: seq[byte]): ProtoResult[RPCMsg] {.inline.} = trace "decodeRpcMsg: decoding message", payload = msg.shortLog() - var pb = initProtoBuffer(msg, maxSize = uint.high) + var pb = initProtoBuffer(msg) var rpcMsg = RPCMsg() assign(rpcMsg.messages, ?pb.decodeMessages()) assign(rpcMsg.subscriptions, ?pb.decodeSubscriptions()) diff --git a/tests/commoninterop.nim b/tests/commoninterop.nim index c9232db39b..eb3a1d9c9d 100644 --- a/tests/commoninterop.nim +++ b/tests/commoninterop.nim @@ -69,7 +69,7 @@ proc testPubSubDaemonPublish( var finished = false var times = 0 - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = + proc nativeHandler(topic: string, data: seq[byte]) {.async: (raises: []).} = let smsg = string.fromBytes(data) check smsg == pubsubData times.inc() @@ -83,7 +83,7 @@ proc testPubSubDaemonPublish( proc pubsubHandler( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = + ): Future[bool] {.async: (raises: [CatchableError]).} = result = true # don't cancel subscription asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) @@ -137,7 +137,7 @@ proc testPubSubNodePublish( var finished = false proc pubsubHandler( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = + ): Future[bool] {.async: (raises: [CatchableError]).} = let smsg = string.fromBytes(message.data) check smsg == pubsubData times.inc() @@ -146,7 +146,7 @@ proc testPubSubNodePublish( result = true # don't cancel subscription discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = + proc nativeHandler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard pubsub.subscribe(testTopic, nativeHandler) diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 899c558f0b..215a9b4612 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -45,7 +45,7 @@ suite "FloodSub": asyncTest "FloodSub basic publish/subscribe A -> B": var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" completionFut.complete(true) @@ -77,7 +77,7 @@ suite "FloodSub": asyncTest "FloodSub basic publish/subscribe B -> A": var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" completionFut.complete(true) @@ -102,7 +102,7 @@ suite "FloodSub": asyncTest "FloodSub validation should succeed": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete(true) @@ -120,7 +120,7 @@ suite "FloodSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = check topic == "foobar" validatorFut.complete(true) result = ValidationResult.Accept @@ -135,7 +135,7 @@ suite "FloodSub": await allFuturesThrowing(nodesFut) asyncTest "FloodSub validation should fail": - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check false # if we get here, it should fail let @@ -151,7 +151,7 @@ suite "FloodSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = validatorFut.complete(true) result = ValidationResult.Reject @@ -165,7 +165,7 @@ suite "FloodSub": asyncTest "FloodSub validation one fails and one succeeds": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foo" handlerFut.complete(true) @@ -183,7 +183,7 @@ suite "FloodSub": proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = if topic == "foo": result = ValidationResult.Accept else: @@ -210,7 +210,7 @@ suite "FloodSub": futs[i] = ( fut, ( - proc(topic: string, data: seq[byte]) {.async.} = + proc(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" inc counter[] if counter[] == runs - 1: @@ -257,7 +257,7 @@ suite "FloodSub": futs[i] = ( fut, ( - proc(topic: string, data: seq[byte]) {.async.} = + proc(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" inc counter[] if counter[] == runs - 1: @@ -305,7 +305,7 @@ suite "FloodSub": asyncTest "FloodSub message size validation": var messageReceived = 0 - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check data.len < 50 inc(messageReceived) @@ -343,7 +343,7 @@ suite "FloodSub": asyncTest "FloodSub message size validation 2": var messageReceived = 0 - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = inc(messageReceived) let diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 2352fc6f78..b47a44be53 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -36,7 +36,9 @@ suite "GossipSub internal": asyncTest "subscribe/unsubscribeAll": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} = + proc handler( + topic: string, data: seq[byte] + ): Future[void] {.gcsafe, async: (raises: []).} = discard let topic = "foobar" @@ -162,7 +164,7 @@ suite "GossipSub internal": asyncTest "`replenishFanout` Degree Lo": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -189,7 +191,7 @@ suite "GossipSub internal": asyncTest "`dropFanoutPeers` drop expired fanout topics": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -219,7 +221,7 @@ suite "GossipSub internal": asyncTest "`dropFanoutPeers` leave unexpired fanout topics": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic1 = "foobar1" @@ -256,7 +258,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -317,7 +319,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in mesh": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -357,7 +359,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in fanout": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -398,7 +400,7 @@ suite "GossipSub internal": asyncTest "`getGossipPeers` - should not crash on missing topics in gossip": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -439,7 +441,7 @@ suite "GossipSub internal": asyncTest "Drop messages of topics without subscription": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = check false let topic = "foobar" @@ -473,7 +475,7 @@ suite "GossipSub internal": let gossipSub = TestGossipSub.init(newStandardSwitch()) gossipSub.parameters.disconnectBadPeers = true gossipSub.parameters.appSpecificWeight = 1.0 - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = check false let topic = "foobar" @@ -665,10 +667,10 @@ suite "GossipSub internal": asyncTest "handleIHave/Iwant tests": let gossipSub = TestGossipSub.init(newStandardSwitch()) - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = + proc handler(peer: PubSubPeer, data: seq[byte]) {.async: (raises: []).} = check false - proc handler2(topic: string, data: seq[byte]) {.async.} = + proc handler2(topic: string, data: seq[byte]) {.async: (raises: []).} = discard let topic = "foobar" @@ -769,10 +771,10 @@ suite "GossipSub internal": var receivedMessages = new(HashSet[seq[byte]]) - proc handlerA(topic: string, data: seq[byte]) {.async.} = + proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} = receivedMessages[].incl(data) - proc handlerB(topic: string, data: seq[byte]) {.async.} = + proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} = discard nodes[0].subscribe("foobar", handlerA) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 7277c36258..64823bbea3 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -54,7 +54,7 @@ suite "GossipSub": asyncTest "GossipSub validation should succeed": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete(true) @@ -78,7 +78,7 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = check topic == "foobar" validatorFut.complete(true) result = ValidationResult.Accept @@ -93,7 +93,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) asyncTest "GossipSub validation should fail (reject)": - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check false # if we get here, it should fail let @@ -119,7 +119,7 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = result = ValidationResult.Reject validatorFut.complete(true) @@ -133,7 +133,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) asyncTest "GossipSub validation should fail (ignore)": - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check false # if we get here, it should fail let @@ -159,7 +159,7 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = result = ValidationResult.Ignore validatorFut.complete(true) @@ -174,7 +174,7 @@ suite "GossipSub": asyncTest "GossipSub validation one fails and one succeeds": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foo" handlerFut.complete(true) @@ -192,7 +192,7 @@ suite "GossipSub": var passed, failed: Future[bool] = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = result = if topic == "foo": passed.complete(true) @@ -226,7 +226,7 @@ suite "GossipSub": sendCounter = 0 validatedCounter = 0 - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = @@ -254,7 +254,7 @@ suite "GossipSub": proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject nodes[1].addValidator("foo", "bar", validator) @@ -279,7 +279,7 @@ suite "GossipSub": asyncTest "GossipSub unsub - resub faster than backoff": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete(true) @@ -311,7 +311,7 @@ suite "GossipSub": var validatorFut = newFuture[bool]() proc validator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = + ): Future[ValidationResult] {.async: (raises: []).} = check topic == "foobar" validatorFut.complete(true) result = ValidationResult.Accept @@ -326,7 +326,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) asyncTest "e2e - GossipSub should add remote peer topic subscriptions": - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard let @@ -352,7 +352,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed": - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard let @@ -395,7 +395,7 @@ suite "GossipSub": asyncTest "e2e - GossipSub send over fanout A -> B": var passed = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" passed.complete() @@ -443,7 +443,7 @@ suite "GossipSub": asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": var passed = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" passed.complete() @@ -487,7 +487,7 @@ suite "GossipSub": asyncTest "e2e - GossipSub send over mesh A -> B": var passed: Future[bool] = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" passed.complete(true) @@ -542,14 +542,14 @@ suite "GossipSub": var aReceived = 0 cReceived = 0 - proc handlerA(topic: string, data: seq[byte]) {.async.} = + proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} = inc aReceived check aReceived < 2 - proc handlerB(topic: string, data: seq[byte]) {.async.} = + proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} = discard - proc handlerC(topic: string, data: seq[byte]) {.async.} = + proc handlerC(topic: string, data: seq[byte]) {.async: (raises: []).} = inc cReceived check cReceived < 2 cRelayed.complete() @@ -565,20 +565,23 @@ suite "GossipSub": proc slowValidator( topic: string, message: Message - ): Future[ValidationResult] {.async.} = - await cRelayed - # Empty A & C caches to detect duplicates - gossip1.seen = TimedCache[SaltedId].init() - gossip3.seen = TimedCache[SaltedId].init() - let msgId = toSeq(gossip2.validationSeen.keys)[0] - checkUntilTimeout( - try: - gossip2.validationSeen[msgId].len > 0 - except: - false - ) - result = ValidationResult.Accept - bFinished.complete() + ): Future[ValidationResult] {.async: (raises: []).} = + try: + await cRelayed + # Empty A & C caches to detect duplicates + gossip1.seen = TimedCache[SaltedId].init() + gossip3.seen = TimedCache[SaltedId].init() + let msgId = toSeq(gossip2.validationSeen.keys)[0] + checkUntilTimeout( + try: + gossip2.validationSeen[msgId].len > 0 + except: + false + ) + result = ValidationResult.Accept + bFinished.complete() + except CatchableError: + raiseAssert "err on slowValidator" nodes[1].addValidator("foobar", slowValidator) @@ -598,7 +601,7 @@ suite "GossipSub": asyncTest "e2e - GossipSub send over floodPublish A -> B": var passed: Future[bool] = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" passed.complete(true) @@ -643,7 +646,7 @@ suite "GossipSub": await allFuturesThrowing(nodes.mapIt(it.switch.stop())) proc connectNodes(nodes: seq[PubSub], target: PubSub) {.async.} = - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" for node in nodes: @@ -656,7 +659,7 @@ suite "GossipSub": numPeersFirstMsg: int, numPeersSecondMsg: int, ) {.async.} = - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" block setup: @@ -721,10 +724,15 @@ suite "GossipSub": var handler: TopicHandler closureScope: var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc + handler = proc( + topic: string, data: seq[byte] + ) {.async: (raises: []), closure.} = + try: + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + except KeyError: + raiseAssert "seen checked before" check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -771,10 +779,15 @@ suite "GossipSub": var handler: TopicHandler capture dialer, i: var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc + handler = proc( + topic: string, data: seq[byte] + ) {.async: (raises: []), closure.} = + try: + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + except KeyError: + raiseAssert "seen checked before" check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -811,7 +824,7 @@ suite "GossipSub": # PX to A & C # # C sent his SPR, not A - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard # not used in this test let @@ -880,13 +893,13 @@ suite "GossipSub": ) let bFinished = newFuture[void]() - proc handlerA(topic: string, data: seq[byte]) {.async.} = + proc handlerA(topic: string, data: seq[byte]) {.async: (raises: []).} = discard - proc handlerB(topic: string, data: seq[byte]) {.async.} = + proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} = bFinished.complete() - proc handlerC(topic: string, data: seq[byte]) {.async.} = + proc handlerC(topic: string, data: seq[byte]) {.async: (raises: []).} = doAssert false nodes[0].subscribe("foobar", handlerA) @@ -957,10 +970,10 @@ suite "GossipSub": ) let bFinished = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard - proc handlerB(topic: string, data: seq[byte]) {.async.} = + proc handlerB(topic: string, data: seq[byte]) {.async: (raises: []).} = bFinished.complete() nodeA.subscribe("foobar", handler) @@ -1000,7 +1013,7 @@ suite "GossipSub": await subscribeNodes(nodes) - proc handle(topic: string, data: seq[byte]) {.async.} = + proc handle(topic: string, data: seq[byte]) {.async: (raises: []).} = discard let gossip0 = GossipSub(nodes[0]) @@ -1131,7 +1144,7 @@ suite "GossipSub": let topic = "foobar" proc execValidator( topic: string, message: messages.Message - ): Future[ValidationResult] {.raises: [].} = + ): Future[ValidationResult] {.async: (raises: [], raw: true).} = let res = newFuture[ValidationResult]() res.complete(ValidationResult.Reject) res @@ -1177,7 +1190,7 @@ suite "GossipSub": node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs ) - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard node0.subscribe("foobar", handler) diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index ffc5c24e40..9e3d0e7bae 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -65,10 +65,15 @@ suite "GossipSub": var handler: TopicHandler closureScope: var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc + handler = proc( + topic: string, data: seq[byte] + ) {.async: (raises: []), closure.} = + try: + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + except KeyError: + raiseAssert "seen checked before" info "seen up", count = seen.len check topic == "foobar" if not seenFut.finished() and seen.len >= runs: @@ -98,7 +103,7 @@ suite "GossipSub": asyncTest "GossipSub invalid topic subscription": var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete(true) @@ -154,7 +159,7 @@ suite "GossipSub": # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN ### await subscribeNodes(nodes) - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = discard nodes[1].subscribe("foobar", handler) @@ -186,11 +191,11 @@ suite "GossipSub": ) var handlerFut = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete() - proc noop(topic: string, data: seq[byte]) {.async.} = + proc noop(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" nodes[0].subscribe("foobar", noop) @@ -230,7 +235,7 @@ suite "GossipSub": GossipSub(nodes[1]).parameters.graylistThreshold = 100000 var handlerFut = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" handlerFut.complete() @@ -274,10 +279,15 @@ suite "GossipSub": var handler: TopicHandler closureScope: var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc + handler = proc( + topic: string, data: seq[byte] + ) {.async: (raises: []), closure.} = + try: + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + except KeyError: + raiseAssert "seen checked before" check topic == "foobar" if not seenFut.finished() and seen.len >= runs: seenFut.complete() @@ -328,7 +338,7 @@ suite "GossipSub": # Adding again subscriptions - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = check topic == "foobar" for i in 0 ..< runs: @@ -364,7 +374,7 @@ suite "GossipSub": nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) var handlerFut = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async.} = + proc handler(topic: string, data: seq[byte]) {.async: (raises: []).} = handlerFut.complete() await subscribeNodes(nodes) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 4eb6a3821e..ecd0e7b9a6 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -27,8 +27,11 @@ randomize() type TestGossipSub* = ref object of GossipSub proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = - proc getConn(): Future[Connection] = - p.switch.dial(peerId, GossipSubCodec_12) + proc getConn(): Future[Connection] {.async: (raises: [GetConnDialError]).} = + try: + return await p.switch.dial(peerId, GossipSubCodec_12) + except CatchableError as e: + raise (ref GetConnDialError)(parent: e) let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec_12, 1024 * 1024) debug "created new pubsub peer", peerId diff --git a/tests/testdaemon.nim b/tests/testdaemon.nim index ceb7d44ff4..e0358fab47 100644 --- a/tests/testdaemon.nim +++ b/tests/testdaemon.nim @@ -62,7 +62,7 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubHandler1( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = + ): Future[bool] {.async: (raises: [CatchableError]).} = let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) @@ -72,7 +72,7 @@ proc pubsubTest(f: set[P2PDaemonFlags]): Future[bool] {.async.} = proc pubsubHandler2( api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage - ): Future[bool] {.async.} = + ): Future[bool] {.async: (raises: [CatchableError]).} = let smsg = cast[string](message.data) if smsg == pubsubData: inc(resultsCount) diff --git a/tests/testminprotobuf.nim b/tests/testminprotobuf.nim index 317cadaafb..469d2aab09 100644 --- a/tests/testminprotobuf.nim +++ b/tests/testminprotobuf.nim @@ -11,7 +11,8 @@ import unittest2 import ../libp2p/protobuf/minprotobuf -import stew/byteutils, strutils +import ../libp2p/varint +import stew/byteutils, strutils, sequtils suite "MinProtobuf test suite": const VarintVectors = [ @@ -615,29 +616,25 @@ suite "MinProtobuf test suite": res.get() == true value == "SOME" - test "[length] too big message test": - var pb1 = initProtoBuffer() - var bigString = newString(pb1.maxSize + 1) + test "[length] invalid message length": + let field = @[0x0a'u8] + let data = @[1'u8, 2'u8, 3'u8] - for i in 0 ..< len(bigString): - bigString[i] = 'A' - pb1.write(1, bigString) - pb1.finish() - block: - var pb2 = initProtoBuffer(pb1.buffer) - var value = newString(pb1.maxSize + 1) - var valueLen = 0 - let res = pb2.getField(1, value, valueLen) - check: - res.isErr() == true + var value: string + var valueLen = 0 - block: - var pb2 = initProtoBuffer(pb1.buffer, maxSize = uint.high) - var value = newString(pb1.maxSize + 1) - var valueLen = 0 - let res = pb2.getField(1, value, valueLen) - check: - res.isErr() == false + let invalidLength = LP.encodeVarint(uint64(int32.high) + 1'u64) + var pb1 = initProtoBuffer(field & invalidLength.get() & data) + let res1 = pb1.getField(1, value, valueLen) + check: + res1.isErr() == true + res1.error() == MessageIncomplete + + let validLength = LP.encodeVarint(data.lenu64) + var pb2 = initProtoBuffer(field & validLength.get() & data) + let res2 = pb2.getField(1, value, valueLen) + check: + res2.isErr() == false test "[length] Repeated field test": var pb1 = initProtoBuffer()