diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 23b8b16a28..a50f9cf0da 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} = method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} = if not channel.closedLocally: channel.closedLocally = true + channel.isEof = true if channel.isReset == false and channel.sendQueue.len == 0: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin})) @@ -249,6 +250,7 @@ method readOnce*( await channel.closedRemotely or channel.receivedData.wait() if channel.closedRemotely.done() and channel.recvQueue.len == 0: channel.returnedEof = true + channel.isEof = true return 0 let toRead = min(channel.recvQueue.len, nbytes) diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index 8b9dd93d48..b928680311 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -37,99 +37,99 @@ proc createSwitch(r: Relay, useYamux: bool = false): Switch = suite "Circuit Relay V2": - # suite "Reservation": - # asyncTeardown: - # await allFutures(src1.stop(), src2.stop(), rel.stop()) - # checkTrackers() - # var - # ttl {.threadvar.}: int - # ldur {.threadvar.}: uint32 - # ldata {.threadvar.}: uint64 - # cl1 {.threadvar.}: RelayClient - # cl2 {.threadvar.}: RelayClient - # rv2 {.threadvar.}: Relay - # src1 {.threadvar.}: Switch - # src2 {.threadvar.}: Switch - # rel {.threadvar.}: Switch - # rsvp {.threadvar.}: Rsvp - # range {.threadvar.}: HSlice[times.DateTime, times.DateTime] - # - # asyncSetup: - # ttl = 3 - # ldur = 60 - # ldata = 2048 - # cl1 = RelayClient.new() - # cl2 = RelayClient.new() - # rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - # limitDuration=ldur, - # limitData=ldata, - # maxCircuit=1) - # src1 = createSwitch(cl1) - # src2 = createSwitch(cl2) - # rel = createSwitch(rv2) - # - # await src1.start() - # await src2.start() - # await rel.start() - # await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - # await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Too many reservations": - # let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) - # let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) - # await conn.writeLp(pb.buffer) - # let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() - # check: - # msg.msgType == HopMessageType.Status - # msg.status == Opt.some(StatusV2.ReservationRefused) - # - # asyncTest "Too many reservations + Reconnect": - # expect(ReservationError): - # discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # await rel.disconnect(src1.peerInfo.peerId) - # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Reservation ttl expires": - # await sleepAsync(chronos.timer.seconds(ttl + 1)) - # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Reservation over relay": - # let - # rv2add = Relay.new() - # addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - # $rel.peerInfo.peerId & "/p2p-circuit").get() ] - # rv2add.setup(src2) - # await rv2add.start() - # src2.mount(rv2add) - # rv2.maxCircuit.inc() - # - # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # expect(ReservationError): - # discard await cl1.reserve(src2.peerInfo.peerId, addrs) - - suite "Connection": - for useYamux in [false, true]: + suite "Reservation": + asyncTeardown: + await allFutures(src1.stop(), src2.stop(), rel.stop()) + checkTrackers() + var + ttl {.threadvar.}: int + ldur {.threadvar.}: uint32 + ldata {.threadvar.}: uint64 + cl1 {.threadvar.}: RelayClient + cl2 {.threadvar.}: RelayClient + rv2 {.threadvar.}: Relay + src1 {.threadvar.}: Switch + src2 {.threadvar.}: Switch + rel {.threadvar.}: Switch + rsvp {.threadvar.}: Rsvp + range {.threadvar.}: HSlice[times.DateTime, times.DateTime] + + asyncSetup: + ttl = 3 + ldur = 60 + ldata = 2048 + cl1 = RelayClient.new() + cl2 = RelayClient.new() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata, + maxCircuit=1) + src1 = createSwitch(cl1) + src2 = createSwitch(cl2) + rel = createSwitch(rv2) + + await src1.start() + await src2.start() + await rel.start() + await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Too many reservations": + let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) + let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) + await conn.writeLp(pb.buffer) + let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() + check: + msg.msgType == HopMessageType.Status + msg.status == Opt.some(StatusV2.ReservationRefused) + + asyncTest "Too many reservations + Reconnect": + expect(ReservationError): + discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel.disconnect(src1.peerInfo.peerId) + rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Reservation ttl expires": + await sleepAsync(chronos.timer.seconds(ttl + 1)) + rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Reservation over relay": + let + rv2add = Relay.new() + addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() ] + rv2add.setup(src2) + await rv2add.start() + src2.mount(rv2add) + rv2.maxCircuit.inc() + + rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + expect(ReservationError): + discard await cl1.reserve(src2.peerInfo.peerId, addrs) + + for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]: + suite "Circuit Relay V2 Connection using " & muxName: asyncTeardown: checkTrackers() var @@ -329,7 +329,7 @@ suite "Circuit Relay V2": raise newException(CatchableError, "Should not be here") let rel2Cl = RelayClient.new(canHop = true) - rel2 = createSwitch(rel2Cl) + rel2 = createSwitch(rel2Cl, useYamux) rv2 = Relay.new() rv2.setup(rel) rel.mount(rv2) @@ -354,7 +354,8 @@ suite "Circuit Relay V2": expect(DialFailedError): conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) - await allFutures(conn.close()) + if not conn.isNil(): + await allFutures(conn.close()) await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) asyncTest "Connection using ClientRelay": @@ -388,9 +389,9 @@ suite "Circuit Relay V2": clientA = RelayClient.new(canHop = true) clientB = RelayClient.new(canHop = true) clientC = RelayClient.new(canHop = true) - switchA = createSwitch(clientA) - switchB = createSwitch(clientB) - switchC = createSwitch(clientC) + switchA = createSwitch(clientA, useYamux) + switchB = createSwitch(clientB, useYamux) + switchC = createSwitch(clientC, useYamux) switchA.mount(protoBCA) switchB.mount(protoCAB)