From 5da2681ee06ecd04903eb66c1e2f219be55af294 Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 7 Nov 2023 00:39:28 +0100 Subject: [PATCH 1/4] Reproducing bug --- libp2p/builders.nim | 1 + libp2p/muxers/yamux/yamux.nim | 1 + tests/testrelayv2.nim | 797 +++++++++++++++++----------------- 3 files changed, 404 insertions(+), 395 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 63a216d46d..6174f51d47 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -322,6 +322,7 @@ proc newStandardSwitch*( .withMaxConnsPerPeer(maxConnsPerPeer) .withPeerStore(capacity=peerStoreCapacity) .withMplex(inTimeout, outTimeout) + .withYamux() .withTcpTransport(transportFlags) .withNameResolver(nameResolver) .withNoise() diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index e71772da1b..23b8b16a28 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -454,6 +454,7 @@ method handle*(m: Yamux) {.async, gcsafe.} = if header.streamId in m.flushed: m.flushed.del(header.streamId) if header.streamId mod 2 == m.currentId mod 2: + debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId raise newException(YamuxError, "Peer used our reserved stream id") let newStream = m.createStream(header.streamId, false) if m.channels.len >= m.maxChannCount: diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index 6802e8d0ef..8b9dd93d48 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -19,411 +19,418 @@ import ./helpers import std/times import stew/byteutils -proc createSwitch(r: Relay): Switch = - result = SwitchBuilder.new() +proc createSwitch(r: Relay, useYamux: bool = false): Switch = + var builder = SwitchBuilder.new() .withRng(newRng()) .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) .withTcpTransport() - .withMplex() + + if useYamux: + builder = builder.withYamux() + else: + builder = builder.withMplex() + + return builder .withNoise() .withCircuitRelay(r) .build() suite "Circuit Relay V2": - suite "Reservation": - asyncTeardown: - await allFutures(src1.stop(), src2.stop(), rel.stop()) - checkTrackers() - var - ttl {.threadvar.}: int - ldur {.threadvar.}: uint32 - ldata {.threadvar.}: uint64 - cl1 {.threadvar.}: RelayClient - cl2 {.threadvar.}: RelayClient - rv2 {.threadvar.}: Relay - src1 {.threadvar.}: Switch - src2 {.threadvar.}: Switch - rel {.threadvar.}: Switch - rsvp {.threadvar.}: Rsvp - range {.threadvar.}: HSlice[times.DateTime, times.DateTime] - - asyncSetup: - ttl = 3 - ldur = 60 - ldata = 2048 - cl1 = RelayClient.new() - cl2 = RelayClient.new() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata, - maxCircuit=1) - src1 = createSwitch(cl1) - src2 = createSwitch(cl2) - rel = createSwitch(rv2) - - await src1.start() - await src2.start() - await rel.start() - await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - check: - rsvp.expire.int64.fromUnix.utc in range - rsvp.limitDuration == ldur - rsvp.limitData == ldata - - asyncTest "Too many reservations": - let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) - let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) - await conn.writeLp(pb.buffer) - let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() - check: - msg.msgType == HopMessageType.Status - msg.status == Opt.some(StatusV2.ReservationRefused) - - asyncTest "Too many reservations + Reconnect": - expect(ReservationError): - discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel.disconnect(src1.peerInfo.peerId) - rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - check: - rsvp.expire.int64.fromUnix.utc in range - rsvp.limitDuration == ldur - rsvp.limitData == ldata - - asyncTest "Reservation ttl expires": - await sleepAsync(chronos.timer.seconds(ttl + 1)) - rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - check: - rsvp.expire.int64.fromUnix.utc in range - rsvp.limitDuration == ldur - rsvp.limitData == ldata - - asyncTest "Reservation over relay": - let - rv2add = Relay.new() - addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() ] - rv2add.setup(src2) - await rv2add.start() - src2.mount(rv2add) - rv2.maxCircuit.inc() - - rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - check: - rsvp.expire.int64.fromUnix.utc in range - rsvp.limitDuration == ldur - rsvp.limitData == ldata - expect(ReservationError): - discard await cl1.reserve(src2.peerInfo.peerId, addrs) + # suite "Reservation": + # asyncTeardown: + # await allFutures(src1.stop(), src2.stop(), rel.stop()) + # checkTrackers() + # var + # ttl {.threadvar.}: int + # ldur {.threadvar.}: uint32 + # ldata {.threadvar.}: uint64 + # cl1 {.threadvar.}: RelayClient + # cl2 {.threadvar.}: RelayClient + # rv2 {.threadvar.}: Relay + # src1 {.threadvar.}: Switch + # src2 {.threadvar.}: Switch + # rel {.threadvar.}: Switch + # rsvp {.threadvar.}: Rsvp + # range {.threadvar.}: HSlice[times.DateTime, times.DateTime] + # + # asyncSetup: + # ttl = 3 + # ldur = 60 + # ldata = 2048 + # cl1 = RelayClient.new() + # cl2 = RelayClient.new() + # rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + # limitDuration=ldur, + # limitData=ldata, + # maxCircuit=1) + # src1 = createSwitch(cl1) + # src2 = createSwitch(cl2) + # rel = createSwitch(rv2) + # + # await src1.start() + # await src2.start() + # await rel.start() + # await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + # await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + # check: + # rsvp.expire.int64.fromUnix.utc in range + # rsvp.limitDuration == ldur + # rsvp.limitData == ldata + # + # asyncTest "Too many reservations": + # let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) + # let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) + # await conn.writeLp(pb.buffer) + # let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() + # check: + # msg.msgType == HopMessageType.Status + # msg.status == Opt.some(StatusV2.ReservationRefused) + # + # asyncTest "Too many reservations + Reconnect": + # expect(ReservationError): + # discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + # await rel.disconnect(src1.peerInfo.peerId) + # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + # check: + # rsvp.expire.int64.fromUnix.utc in range + # rsvp.limitDuration == ldur + # rsvp.limitData == ldata + # + # asyncTest "Reservation ttl expires": + # await sleepAsync(chronos.timer.seconds(ttl + 1)) + # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + # check: + # rsvp.expire.int64.fromUnix.utc in range + # rsvp.limitDuration == ldur + # rsvp.limitData == ldata + # + # asyncTest "Reservation over relay": + # let + # rv2add = Relay.new() + # addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + # $rel.peerInfo.peerId & "/p2p-circuit").get() ] + # rv2add.setup(src2) + # await rv2add.start() + # src2.mount(rv2add) + # rv2.maxCircuit.inc() + # + # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + # check: + # rsvp.expire.int64.fromUnix.utc in range + # rsvp.limitDuration == ldur + # rsvp.limitData == ldata + # expect(ReservationError): + # discard await cl1.reserve(src2.peerInfo.peerId, addrs) suite "Connection": - asyncTeardown: - checkTrackers() - var - customProtoCodec {.threadvar.}: string - proto {.threadvar.}: LPProtocol - ttl {.threadvar.}: int - ldur {.threadvar.}: uint32 - ldata {.threadvar.}: uint64 - srcCl {.threadvar.}: RelayClient - dstCl {.threadvar.}: RelayClient - rv2 {.threadvar.}: Relay - src {.threadvar.}: Switch - dst {.threadvar.}: Switch - rel {.threadvar.}: Switch - rsvp {.threadvar.}: Rsvp - conn {.threadvar.}: Connection - - asyncSetup: - customProtoCodec = "/test" - proto = new LPProtocol - proto.codec = customProtoCodec - ttl = 60 - ldur = 120 - ldata = 16384 - srcCl = RelayClient.new() - dstCl = RelayClient.new() - src = createSwitch(srcCl) - dst = createSwitch(dstCl) - rel = newStandardSwitch() - - asyncTest "Connection succeed": - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check: "test1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test2") - check: "test3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test4") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("test1") - check: "test2" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test3") - check: "test4" == string.fromBytes(await conn.readLp(1024)) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection duration exceeded": - ldur = 3 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check "wanna sleep?" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("yeah!") - check "go!" == string.fromBytes(await conn.readLp(1024)) - await sleepAsync(chronos.timer.seconds(ldur + 1)) - await conn.writeLp("that was a cool power nap") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("wanna sleep?") - check: "yeah!" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("go!") - expect(LPStreamEOFError): - discard await conn.readLp(1024) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection data exceeded": - ldata = 1000 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check "count me the better story you know" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("do you expect a lorem ipsum or...?") - check "surprise me!" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long -precisely--having little or no money in my purse, and nothing -particular to interest me on shore, I thought I would sail about a -little and see the watery part of the world. It is a way I have of -driving off the spleen and regulating the circulation. Whenever I -find myself growing grim about the mouth; whenever it is a damp, -drizzly November in my soul; whenever I find myself involuntarily -pausing before coffin warehouses, and bringing up the rear of every -funeral I meet; and especially whenever my hypos get such an upper -hand of me, that it requires a strong moral principle to prevent me -from deliberately stepping into the street, and methodically knocking -people's hats off--then, I account it high time to get to sea as soon -as I can. This is my substitute for pistol and ball. With a -philosophical flourish Cato throws himself upon his sword; I quietly -take to the ship.""") - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("count me the better story you know") - check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("surprise me!") - expect(LPStreamEOFError): - discard await conn.readLp(1024) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Reservation ttl expire during connection": - ttl = 3 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check: "test1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test2") - check: "test3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test4") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("test1") - check: "test2" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test3") - check: "test4" == string.fromBytes(await conn.readLp(1024)) - await src.disconnect(rel.peerInfo.peerId) - await sleepAsync(chronos.timer.seconds(ttl + 1)) - - expect(DialFailedError): - check: conn.atEof() - await conn.close() + for useYamux in [false, true]: + asyncTeardown: + checkTrackers() + var + customProtoCodec {.threadvar.}: string + proto {.threadvar.}: LPProtocol + ttl {.threadvar.}: int + ldur {.threadvar.}: uint32 + ldata {.threadvar.}: uint64 + srcCl {.threadvar.}: RelayClient + dstCl {.threadvar.}: RelayClient + rv2 {.threadvar.}: Relay + src {.threadvar.}: Switch + dst {.threadvar.}: Switch + rel {.threadvar.}: Switch + rsvp {.threadvar.}: Rsvp + conn {.threadvar.}: Connection + + asyncSetup: + customProtoCodec = "/test" + proto = new LPProtocol + proto.codec = customProtoCodec + ttl = 60 + ldur = 120 + ldata = 16384 + srcCl = RelayClient.new() + dstCl = RelayClient.new() + src = createSwitch(srcCl, useYamux) + dst = createSwitch(dstCl, useYamux) + rel = newStandardSwitch() + + asyncTest "Connection succeed": + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check: "test1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test2") + check: "test3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test4") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection over relay": - # src => rel => rel2 => dst - # rel2 reserve rel - # dst reserve rel2 - # src try to connect with dst - proto.handler = proc(conn: Connection, proto: string) {.async.} = - raise newException(CatchableError, "Should not be here") - let - rel2Cl = RelayClient.new(canHop = true) - rel2 = createSwitch(rel2Cl) - rv2 = Relay.new() - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - await rel.start() - await rel2.start() - await src.start() - await dst.start() - - let - addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & - $rel2.peerInfo.peerId & "/p2p/" & - $rel2.peerInfo.peerId & "/p2p-circuit").get() ] - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) - - rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs) - - expect(DialFailedError): - conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) - - asyncTest "Connection using ClientRelay": - var - protoABC = new LPProtocol - protoBCA = new LPProtocol - protoCAB = new LPProtocol - protoABC.codec = "/abctest" - protoABC.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testABC1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testABC2") - check: "testABC3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testABC4") - await conn.close() - protoBCA.codec = "/bcatest" - protoBCA.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testBCA1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testBCA2") - check: "testBCA3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testBCA4") - await conn.close() - protoCAB.codec = "/cabtest" - protoCAB.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testCAB1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testCAB2") - check: "testCAB3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testCAB4") - await conn.close() - - let - clientA = RelayClient.new(canHop = true) - clientB = RelayClient.new(canHop = true) - clientC = RelayClient.new(canHop = true) - switchA = createSwitch(clientA) - switchB = createSwitch(clientB) - switchC = createSwitch(clientC) - - switchA.mount(protoBCA) - switchB.mount(protoCAB) - switchC.mount(protoABC) - - await switchA.start() - await switchB.start() - await switchC.start() - - let - addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" & - $switchB.peerInfo.peerId & "/p2p-circuit").get() - addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" & - $switchC.peerInfo.peerId & "/p2p-circuit").get() - addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" & - $switchA.peerInfo.peerId & "/p2p-circuit").get() - - await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs) - await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs) - await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs) - let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs) - let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs) - let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs) - let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest") - let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest") - let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest") - - await connABC.writeLp("testABC1") - await connBCA.writeLp("testBCA1") - await connCAB.writeLp("testCAB1") - check: - "testABC2" == string.fromBytes(await connABC.readLp(1024)) - "testBCA2" == string.fromBytes(await connBCA.readLp(1024)) - "testCAB2" == string.fromBytes(await connCAB.readLp(1024)) - await connABC.writeLp("testABC3") - await connBCA.writeLp("testBCA3") - await connCAB.writeLp("testCAB3") - check: - "testABC4" == string.fromBytes(await connABC.readLp(1024)) - "testBCA4" == string.fromBytes(await connBCA.readLp(1024)) - "testCAB4" == string.fromBytes(await connCAB.readLp(1024)) - await allFutures(connABC.close(), connBCA.close(), connCAB.close()) - await allFutures(switchA.stop(), switchB.stop(), switchC.stop()) + await conn.writeLp("test1") + check: "test2" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test3") + check: "test4" == string.fromBytes(await conn.readLp(1024)) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection duration exceeded": + ldur = 3 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check "wanna sleep?" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("yeah!") + check "go!" == string.fromBytes(await conn.readLp(1024)) + await sleepAsync(chronos.timer.seconds(ldur + 1)) + await conn.writeLp("that was a cool power nap") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() + + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("wanna sleep?") + check: "yeah!" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("go!") + expect(LPStreamEOFError): + discard await conn.readLp(1024) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection data exceeded": + ldata = 1000 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check "count me the better story you know" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("do you expect a lorem ipsum or...?") + check "surprise me!" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long + precisely--having little or no money in my purse, and nothing + particular to interest me on shore, I thought I would sail about a + little and see the watery part of the world. It is a way I have of + driving off the spleen and regulating the circulation. Whenever I + find myself growing grim about the mouth; whenever it is a damp, + drizzly November in my soul; whenever I find myself involuntarily + pausing before coffin warehouses, and bringing up the rear of every + funeral I meet; and especially whenever my hypos get such an upper + hand of me, that it requires a strong moral principle to prevent me + from deliberately stepping into the street, and methodically knocking + people's hats off--then, I account it high time to get to sea as soon + as I can. This is my substitute for pistol and ball. With a + philosophical flourish Cato throws himself upon his sword; I quietly + take to the ship.""") + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() + + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("count me the better story you know") + check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("surprise me!") + expect(LPStreamEOFError): + discard await conn.readLp(1024) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Reservation ttl expire during connection": + ttl = 3 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check: "test1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test2") + check: "test3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test4") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() + + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("test1") + check: "test2" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test3") + check: "test4" == string.fromBytes(await conn.readLp(1024)) + await src.disconnect(rel.peerInfo.peerId) + await sleepAsync(chronos.timer.seconds(ttl + 1)) + + expect(DialFailedError): + check: conn.atEof() + await conn.close() + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection over relay": + # src => rel => rel2 => dst + # rel2 reserve rel + # dst reserve rel2 + # src try to connect with dst + proto.handler = proc(conn: Connection, proto: string) {.async.} = + raise newException(CatchableError, "Should not be here") + let + rel2Cl = RelayClient.new(canHop = true) + rel2 = createSwitch(rel2Cl) + rv2 = Relay.new() + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + await rel.start() + await rel2.start() + await src.start() + await dst.start() + + let + addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & + $rel2.peerInfo.peerId & "/p2p/" & + $rel2.peerInfo.peerId & "/p2p-circuit").get() ] + + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + + rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + + expect(DialFailedError): + conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) + + asyncTest "Connection using ClientRelay": + var + protoABC = new LPProtocol + protoBCA = new LPProtocol + protoCAB = new LPProtocol + protoABC.codec = "/abctest" + protoABC.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testABC1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testABC2") + check: "testABC3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testABC4") + await conn.close() + protoBCA.codec = "/bcatest" + protoBCA.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testBCA1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testBCA2") + check: "testBCA3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testBCA4") + await conn.close() + protoCAB.codec = "/cabtest" + protoCAB.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testCAB1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testCAB2") + check: "testCAB3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testCAB4") + await conn.close() + + let + clientA = RelayClient.new(canHop = true) + clientB = RelayClient.new(canHop = true) + clientC = RelayClient.new(canHop = true) + switchA = createSwitch(clientA) + switchB = createSwitch(clientB) + switchC = createSwitch(clientC) + + switchA.mount(protoBCA) + switchB.mount(protoCAB) + switchC.mount(protoABC) + + await switchA.start() + await switchB.start() + await switchC.start() + + let + addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" & + $switchB.peerInfo.peerId & "/p2p-circuit").get() + addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" & + $switchC.peerInfo.peerId & "/p2p-circuit").get() + addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" & + $switchA.peerInfo.peerId & "/p2p-circuit").get() + + await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs) + await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs) + await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs) + let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs) + let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs) + let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs) + let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest") + let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest") + let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest") + + await connABC.writeLp("testABC1") + await connBCA.writeLp("testBCA1") + await connCAB.writeLp("testCAB1") + check: + "testABC2" == string.fromBytes(await connABC.readLp(1024)) + "testBCA2" == string.fromBytes(await connBCA.readLp(1024)) + "testCAB2" == string.fromBytes(await connCAB.readLp(1024)) + await connABC.writeLp("testABC3") + await connBCA.writeLp("testBCA3") + await connCAB.writeLp("testCAB3") + check: + "testABC4" == string.fromBytes(await connABC.readLp(1024)) + "testBCA4" == string.fromBytes(await connBCA.readLp(1024)) + "testCAB4" == string.fromBytes(await connCAB.readLp(1024)) + await allFutures(connABC.close(), connBCA.close(), connCAB.close()) + await allFutures(switchA.stop(), switchB.stop(), switchC.stop()) From 583231132c89c582766a7462c85236faa1bcbc0b Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 9 Nov 2023 14:18:29 +0100 Subject: [PATCH 2/4] Add missing dir for relay connection --- libp2p/protocols/connectivity/relay/rconn.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/libp2p/protocols/connectivity/relay/rconn.nim b/libp2p/protocols/connectivity/relay/rconn.nim index 1856afe706..4f2732aac2 100644 --- a/libp2p/protocols/connectivity/relay/rconn.nim +++ b/libp2p/protocols/connectivity/relay/rconn.nim @@ -47,6 +47,7 @@ proc new*( limitDuration: uint32, limitData: uint64): T = let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData) + rc.dir = conn.dir rc.initStream() if limitDuration > 0: proc checkDurationConnection() {.async.} = From d282f52af2eca7c77025736380b6adf942d05c73 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Mon, 20 Nov 2023 16:39:15 +0100 Subject: [PATCH 3/4] fix yamux / relay bug --- libp2p/muxers/yamux/yamux.nim | 2 + tests/testrelayv2.nim | 197 +++++++++++++++++----------------- 2 files changed, 101 insertions(+), 98 deletions(-) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 23b8b16a28..a50f9cf0da 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} = method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} = if not channel.closedLocally: channel.closedLocally = true + channel.isEof = true if channel.isReset == false and channel.sendQueue.len == 0: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin})) @@ -249,6 +250,7 @@ method readOnce*( await channel.closedRemotely or channel.receivedData.wait() if channel.closedRemotely.done() and channel.recvQueue.len == 0: channel.returnedEof = true + channel.isEof = true return 0 let toRead = min(channel.recvQueue.len, nbytes) diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index 8b9dd93d48..b928680311 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -37,99 +37,99 @@ proc createSwitch(r: Relay, useYamux: bool = false): Switch = suite "Circuit Relay V2": - # suite "Reservation": - # asyncTeardown: - # await allFutures(src1.stop(), src2.stop(), rel.stop()) - # checkTrackers() - # var - # ttl {.threadvar.}: int - # ldur {.threadvar.}: uint32 - # ldata {.threadvar.}: uint64 - # cl1 {.threadvar.}: RelayClient - # cl2 {.threadvar.}: RelayClient - # rv2 {.threadvar.}: Relay - # src1 {.threadvar.}: Switch - # src2 {.threadvar.}: Switch - # rel {.threadvar.}: Switch - # rsvp {.threadvar.}: Rsvp - # range {.threadvar.}: HSlice[times.DateTime, times.DateTime] - # - # asyncSetup: - # ttl = 3 - # ldur = 60 - # ldata = 2048 - # cl1 = RelayClient.new() - # cl2 = RelayClient.new() - # rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - # limitDuration=ldur, - # limitData=ldata, - # maxCircuit=1) - # src1 = createSwitch(cl1) - # src2 = createSwitch(cl2) - # rel = createSwitch(rv2) - # - # await src1.start() - # await src2.start() - # await rel.start() - # await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - # await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Too many reservations": - # let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) - # let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) - # await conn.writeLp(pb.buffer) - # let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() - # check: - # msg.msgType == HopMessageType.Status - # msg.status == Opt.some(StatusV2.ReservationRefused) - # - # asyncTest "Too many reservations + Reconnect": - # expect(ReservationError): - # discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # await rel.disconnect(src1.peerInfo.peerId) - # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Reservation ttl expires": - # await sleepAsync(chronos.timer.seconds(ttl + 1)) - # rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # - # asyncTest "Reservation over relay": - # let - # rv2add = Relay.new() - # addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - # $rel.peerInfo.peerId & "/p2p-circuit").get() ] - # rv2add.setup(src2) - # await rv2add.start() - # src2.mount(rv2add) - # rv2.maxCircuit.inc() - # - # rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - # range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds - # check: - # rsvp.expire.int64.fromUnix.utc in range - # rsvp.limitDuration == ldur - # rsvp.limitData == ldata - # expect(ReservationError): - # discard await cl1.reserve(src2.peerInfo.peerId, addrs) - - suite "Connection": - for useYamux in [false, true]: + suite "Reservation": + asyncTeardown: + await allFutures(src1.stop(), src2.stop(), rel.stop()) + checkTrackers() + var + ttl {.threadvar.}: int + ldur {.threadvar.}: uint32 + ldata {.threadvar.}: uint64 + cl1 {.threadvar.}: RelayClient + cl2 {.threadvar.}: RelayClient + rv2 {.threadvar.}: Relay + src1 {.threadvar.}: Switch + src2 {.threadvar.}: Switch + rel {.threadvar.}: Switch + rsvp {.threadvar.}: Rsvp + range {.threadvar.}: HSlice[times.DateTime, times.DateTime] + + asyncSetup: + ttl = 3 + ldur = 60 + ldata = 2048 + cl1 = RelayClient.new() + cl2 = RelayClient.new() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata, + maxCircuit=1) + src1 = createSwitch(cl1) + src2 = createSwitch(cl2) + rel = createSwitch(rv2) + + await src1.start() + await src2.start() + await rel.start() + await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Too many reservations": + let conn = await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec) + let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) + await conn.writeLp(pb.buffer) + let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get() + check: + msg.msgType == HopMessageType.Status + msg.status == Opt.some(StatusV2.ReservationRefused) + + asyncTest "Too many reservations + Reconnect": + expect(ReservationError): + discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel.disconnect(src1.peerInfo.peerId) + rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Reservation ttl expires": + await sleepAsync(chronos.timer.seconds(ttl + 1)) + rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + + asyncTest "Reservation over relay": + let + rv2add = Relay.new() + addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() ] + rv2add.setup(src2) + await rv2add.start() + src2.mount(rv2add) + rv2.maxCircuit.inc() + + rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + range = now().utc + (ttl-3).seconds..now().utc + (ttl+3).seconds + check: + rsvp.expire.int64.fromUnix.utc in range + rsvp.limitDuration == ldur + rsvp.limitData == ldata + expect(ReservationError): + discard await cl1.reserve(src2.peerInfo.peerId, addrs) + + for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]: + suite "Circuit Relay V2 Connection using " & muxName: asyncTeardown: checkTrackers() var @@ -329,7 +329,7 @@ suite "Circuit Relay V2": raise newException(CatchableError, "Should not be here") let rel2Cl = RelayClient.new(canHop = true) - rel2 = createSwitch(rel2Cl) + rel2 = createSwitch(rel2Cl, useYamux) rv2 = Relay.new() rv2.setup(rel) rel.mount(rv2) @@ -354,7 +354,8 @@ suite "Circuit Relay V2": expect(DialFailedError): conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) - await allFutures(conn.close()) + if not conn.isNil(): + await allFutures(conn.close()) await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) asyncTest "Connection using ClientRelay": @@ -388,9 +389,9 @@ suite "Circuit Relay V2": clientA = RelayClient.new(canHop = true) clientB = RelayClient.new(canHop = true) clientC = RelayClient.new(canHop = true) - switchA = createSwitch(clientA) - switchB = createSwitch(clientB) - switchC = createSwitch(clientC) + switchA = createSwitch(clientA, useYamux) + switchB = createSwitch(clientB, useYamux) + switchC = createSwitch(clientC, useYamux) switchA.mount(protoBCA) switchB.mount(protoCAB) From 1b001c02bf0d2d665e572f05134c3523a0e7f7dc Mon Sep 17 00:00:00 2001 From: Diego Date: Tue, 21 Nov 2023 14:44:43 +0100 Subject: [PATCH 4/4] remove yamux from newStandardSwitch --- libp2p/builders.nim | 1 - tests/testrelayv2.nim | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 6174f51d47..63a216d46d 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -322,7 +322,6 @@ proc newStandardSwitch*( .withMaxConnsPerPeer(maxConnsPerPeer) .withPeerStore(capacity=peerStoreCapacity) .withMplex(inTimeout, outTimeout) - .withYamux() .withTcpTransport(transportFlags) .withNameResolver(nameResolver) .withNoise() diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index b928680311..83eb994713 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -19,7 +19,7 @@ import ./helpers import std/times import stew/byteutils -proc createSwitch(r: Relay, useYamux: bool = false): Switch = +proc createSwitch(r: Relay = nil, useYamux: bool = false): Switch = var builder = SwitchBuilder.new() .withRng(newRng()) .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) @@ -30,9 +30,11 @@ proc createSwitch(r: Relay, useYamux: bool = false): Switch = else: builder = builder.withMplex() + if r != nil: + builder = builder.withCircuitRelay(r) + return builder .withNoise() - .withCircuitRelay(r) .build() suite "Circuit Relay V2": @@ -158,7 +160,7 @@ suite "Circuit Relay V2": dstCl = RelayClient.new() src = createSwitch(srcCl, useYamux) dst = createSwitch(dstCl, useYamux) - rel = newStandardSwitch() + rel = createSwitch(nil, useYamux) asyncTest "Connection succeed": proto.handler = proc(conn: Connection, proto: string) {.async.} =