From 704c186a59027ad1e4dd0ff70a856f46aa3ae48d Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Tue, 21 Nov 2023 16:03:29 +0100 Subject: [PATCH] fix(yamux): doesn't work in a Relayv2 connection (#979) Co-authored-by: Ludovic Chenut --- libp2p/muxers/yamux/yamux.nim | 3 + libp2p/protocols/connectivity/relay/rconn.nim | 1 + tests/testrelayv2.nim | 606 +++++++++--------- 3 files changed, 312 insertions(+), 298 deletions(-) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index e71772da1b..a50f9cf0da 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} = method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} = if not channel.closedLocally: channel.closedLocally = true + channel.isEof = true if channel.isReset == false and channel.sendQueue.len == 0: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin})) @@ -249,6 +250,7 @@ method readOnce*( await channel.closedRemotely or channel.receivedData.wait() if channel.closedRemotely.done() and channel.recvQueue.len == 0: channel.returnedEof = true + channel.isEof = true return 0 let toRead = min(channel.recvQueue.len, nbytes) @@ -454,6 +456,7 @@ method handle*(m: Yamux) {.async, gcsafe.} = if header.streamId in m.flushed: m.flushed.del(header.streamId) if header.streamId mod 2 == m.currentId mod 2: + debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId raise newException(YamuxError, "Peer used our reserved stream id") let newStream = m.createStream(header.streamId, false) if m.channels.len >= m.maxChannCount: diff --git a/libp2p/protocols/connectivity/relay/rconn.nim b/libp2p/protocols/connectivity/relay/rconn.nim index 1856afe706..4f2732aac2 100644 --- a/libp2p/protocols/connectivity/relay/rconn.nim +++ b/libp2p/protocols/connectivity/relay/rconn.nim @@ -47,6 +47,7 @@ proc new*( limitDuration: uint32, limitData: uint64): T = let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData) + rc.dir = conn.dir rc.initStream() if limitDuration > 0: proc checkDurationConnection() {.async.} = diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim index 6802e8d0ef..83eb994713 100644 --- a/tests/testrelayv2.nim +++ b/tests/testrelayv2.nim @@ -19,14 +19,22 @@ import ./helpers import std/times import stew/byteutils -proc createSwitch(r: Relay): Switch = - result = SwitchBuilder.new() +proc createSwitch(r: Relay = nil, useYamux: bool = false): Switch = + var builder = SwitchBuilder.new() .withRng(newRng()) .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) .withTcpTransport() - .withMplex() + + if useYamux: + builder = builder.withYamux() + else: + builder = builder.withMplex() + + if r != nil: + builder = builder.withCircuitRelay(r) + + return builder .withNoise() - .withCircuitRelay(r) .build() suite "Circuit Relay V2": @@ -122,308 +130,310 @@ suite "Circuit Relay V2": expect(ReservationError): discard await cl1.reserve(src2.peerInfo.peerId, addrs) - suite "Connection": - asyncTeardown: - checkTrackers() - var - customProtoCodec {.threadvar.}: string - proto {.threadvar.}: LPProtocol - ttl {.threadvar.}: int - ldur {.threadvar.}: uint32 - ldata {.threadvar.}: uint64 - srcCl {.threadvar.}: RelayClient - dstCl {.threadvar.}: RelayClient - rv2 {.threadvar.}: Relay - src {.threadvar.}: Switch - dst {.threadvar.}: Switch - rel {.threadvar.}: Switch - rsvp {.threadvar.}: Rsvp - conn {.threadvar.}: Connection - - asyncSetup: - customProtoCodec = "/test" - proto = new LPProtocol - proto.codec = customProtoCodec - ttl = 60 - ldur = 120 - ldata = 16384 - srcCl = RelayClient.new() - dstCl = RelayClient.new() - src = createSwitch(srcCl) - dst = createSwitch(dstCl) - rel = newStandardSwitch() - - asyncTest "Connection succeed": - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check: "test1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test2") - check: "test3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test4") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("test1") - check: "test2" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test3") - check: "test4" == string.fromBytes(await conn.readLp(1024)) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection duration exceeded": - ldur = 3 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check "wanna sleep?" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("yeah!") - check "go!" == string.fromBytes(await conn.readLp(1024)) - await sleepAsync(chronos.timer.seconds(ldur + 1)) - await conn.writeLp("that was a cool power nap") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("wanna sleep?") - check: "yeah!" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("go!") - expect(LPStreamEOFError): - discard await conn.readLp(1024) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection data exceeded": - ldata = 1000 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check "count me the better story you know" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("do you expect a lorem ipsum or...?") - check "surprise me!" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long -precisely--having little or no money in my purse, and nothing -particular to interest me on shore, I thought I would sail about a -little and see the watery part of the world. It is a way I have of -driving off the spleen and regulating the circulation. Whenever I -find myself growing grim about the mouth; whenever it is a damp, -drizzly November in my soul; whenever I find myself involuntarily -pausing before coffin warehouses, and bringing up the rear of every -funeral I meet; and especially whenever my hypos get such an upper -hand of me, that it requires a strong moral principle to prevent me -from deliberately stepping into the street, and methodically knocking -people's hats off--then, I account it high time to get to sea as soon -as I can. This is my substitute for pistol and ball. With a -philosophical flourish Cato throws himself upon his sword; I quietly -take to the ship.""") - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("count me the better story you know") - check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("surprise me!") - expect(LPStreamEOFError): - discard await conn.readLp(1024) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Reservation ttl expire during connection": - ttl = 3 - proto.handler = proc(conn: Connection, proto: string) {.async.} = - check: "test1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test2") - check: "test3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test4") - await conn.close() - rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), - limitDuration=ldur, - limitData=ldata) - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - - await rel.start() - await src.start() - await dst.start() - - let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit").get() - - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - - rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await conn.writeLp("test1") - check: "test2" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test3") - check: "test4" == string.fromBytes(await conn.readLp(1024)) - await src.disconnect(rel.peerInfo.peerId) - await sleepAsync(chronos.timer.seconds(ttl + 1)) + for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]: + suite "Circuit Relay V2 Connection using " & muxName: + asyncTeardown: + checkTrackers() + var + customProtoCodec {.threadvar.}: string + proto {.threadvar.}: LPProtocol + ttl {.threadvar.}: int + ldur {.threadvar.}: uint32 + ldata {.threadvar.}: uint64 + srcCl {.threadvar.}: RelayClient + dstCl {.threadvar.}: RelayClient + rv2 {.threadvar.}: Relay + src {.threadvar.}: Switch + dst {.threadvar.}: Switch + rel {.threadvar.}: Switch + rsvp {.threadvar.}: Rsvp + conn {.threadvar.}: Connection + + asyncSetup: + customProtoCodec = "/test" + proto = new LPProtocol + proto.codec = customProtoCodec + ttl = 60 + ldur = 120 + ldata = 16384 + srcCl = RelayClient.new() + dstCl = RelayClient.new() + src = createSwitch(srcCl, useYamux) + dst = createSwitch(dstCl, useYamux) + rel = createSwitch(nil, useYamux) + + asyncTest "Connection succeed": + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check: "test1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test2") + check: "test3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test4") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() - expect(DialFailedError): - check: conn.atEof() - await conn.close() await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop()) - - asyncTest "Connection over relay": - # src => rel => rel2 => dst - # rel2 reserve rel - # dst reserve rel2 - # src try to connect with dst - proto.handler = proc(conn: Connection, proto: string) {.async.} = - raise newException(CatchableError, "Should not be here") - let - rel2Cl = RelayClient.new(canHop = true) - rel2 = createSwitch(rel2Cl) - rv2 = Relay.new() - rv2.setup(rel) - rel.mount(rv2) - dst.mount(proto) - await rel.start() - await rel2.start() - await src.start() - await dst.start() + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - let - addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & - $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & - $rel2.peerInfo.peerId & "/p2p/" & - $rel2.peerInfo.peerId & "/p2p-circuit").get() ] + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("test1") + check: "test2" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test3") + check: "test4" == string.fromBytes(await conn.readLp(1024)) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection duration exceeded": + ldur = 3 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check "wanna sleep?" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("yeah!") + check "go!" == string.fromBytes(await conn.readLp(1024)) + await sleepAsync(chronos.timer.seconds(ldur + 1)) + await conn.writeLp("that was a cool power nap") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() - rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) - let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - expect(DialFailedError): - conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) - await allFutures(conn.close()) - await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("wanna sleep?") + check: "yeah!" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("go!") + expect(LPStreamEOFError): + discard await conn.readLp(1024) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection data exceeded": + ldata = 1000 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check "count me the better story you know" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("do you expect a lorem ipsum or...?") + check "surprise me!" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long + precisely--having little or no money in my purse, and nothing + particular to interest me on shore, I thought I would sail about a + little and see the watery part of the world. It is a way I have of + driving off the spleen and regulating the circulation. Whenever I + find myself growing grim about the mouth; whenever it is a damp, + drizzly November in my soul; whenever I find myself involuntarily + pausing before coffin warehouses, and bringing up the rear of every + funeral I meet; and especially whenever my hypos get such an upper + hand of me, that it requires a strong moral principle to prevent me + from deliberately stepping into the street, and methodically knocking + people's hats off--then, I account it high time to get to sea as soon + as I can. This is my substitute for pistol and ball. With a + philosophical flourish Cato throws himself upon his sword; I quietly + take to the ship.""") + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() - asyncTest "Connection using ClientRelay": - var - protoABC = new LPProtocol - protoBCA = new LPProtocol - protoCAB = new LPProtocol - protoABC.codec = "/abctest" - protoABC.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testABC1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testABC2") - check: "testABC3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testABC4") - await conn.close() - protoBCA.codec = "/bcatest" - protoBCA.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testBCA1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testBCA2") - check: "testBCA3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testBCA4") - await conn.close() - protoCAB.codec = "/cabtest" - protoCAB.handler = proc(conn: Connection, proto: string) {.async.} = - check: "testCAB1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testCAB2") - check: "testCAB3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("testCAB4") - await conn.close() + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - let - clientA = RelayClient.new(canHop = true) - clientB = RelayClient.new(canHop = true) - clientC = RelayClient.new(canHop = true) - switchA = createSwitch(clientA) - switchB = createSwitch(clientB) - switchC = createSwitch(clientC) + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("count me the better story you know") + check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("surprise me!") + expect(LPStreamEOFError): + discard await conn.readLp(1024) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Reservation ttl expire during connection": + ttl = 3 + proto.handler = proc(conn: Connection, proto: string) {.async.} = + check: "test1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test2") + check: "test3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test4") + await conn.close() + rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl), + limitDuration=ldur, + limitData=ldata) + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + + await rel.start() + await src.start() + await dst.start() + + let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit").get() - switchA.mount(protoBCA) - switchB.mount(protoCAB) - switchC.mount(protoABC) + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await switchA.start() - await switchB.start() - await switchC.start() + rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await conn.writeLp("test1") + check: "test2" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test3") + check: "test4" == string.fromBytes(await conn.readLp(1024)) + await src.disconnect(rel.peerInfo.peerId) + await sleepAsync(chronos.timer.seconds(ttl + 1)) + + expect(DialFailedError): + check: conn.atEof() + await conn.close() + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop()) + + asyncTest "Connection over relay": + # src => rel => rel2 => dst + # rel2 reserve rel + # dst reserve rel2 + # src try to connect with dst + proto.handler = proc(conn: Connection, proto: string) {.async.} = + raise newException(CatchableError, "Should not be here") + let + rel2Cl = RelayClient.new(canHop = true) + rel2 = createSwitch(rel2Cl, useYamux) + rv2 = Relay.new() + rv2.setup(rel) + rel.mount(rv2) + dst.mount(proto) + await rel.start() + await rel2.start() + await src.start() + await dst.start() + + let + addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" & + $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & + $rel2.peerInfo.peerId & "/p2p/" & + $rel2.peerInfo.peerId & "/p2p-circuit").get() ] - let - addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" & - $switchB.peerInfo.peerId & "/p2p-circuit").get() - addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" & - $switchC.peerInfo.peerId & "/p2p-circuit").get() - addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" & - $switchA.peerInfo.peerId & "/p2p-circuit").get() - - await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs) - await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs) - await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs) - let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs) - let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs) - let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs) - let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest") - let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest") - let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest") - - await connABC.writeLp("testABC1") - await connBCA.writeLp("testBCA1") - await connCAB.writeLp("testCAB1") - check: - "testABC2" == string.fromBytes(await connABC.readLp(1024)) - "testBCA2" == string.fromBytes(await connBCA.readLp(1024)) - "testCAB2" == string.fromBytes(await connCAB.readLp(1024)) - await connABC.writeLp("testABC3") - await connBCA.writeLp("testBCA3") - await connCAB.writeLp("testCAB3") - check: - "testABC4" == string.fromBytes(await connABC.readLp(1024)) - "testBCA4" == string.fromBytes(await connBCA.readLp(1024)) - "testCAB4" == string.fromBytes(await connCAB.readLp(1024)) - await allFutures(connABC.close(), connBCA.close(), connCAB.close()) - await allFutures(switchA.stop(), switchB.stop(), switchC.stop()) + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + + rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs) + let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + + expect(DialFailedError): + conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec) + if not conn.isNil(): + await allFutures(conn.close()) + await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop()) + + asyncTest "Connection using ClientRelay": + var + protoABC = new LPProtocol + protoBCA = new LPProtocol + protoCAB = new LPProtocol + protoABC.codec = "/abctest" + protoABC.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testABC1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testABC2") + check: "testABC3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testABC4") + await conn.close() + protoBCA.codec = "/bcatest" + protoBCA.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testBCA1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testBCA2") + check: "testBCA3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testBCA4") + await conn.close() + protoCAB.codec = "/cabtest" + protoCAB.handler = proc(conn: Connection, proto: string) {.async.} = + check: "testCAB1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testCAB2") + check: "testCAB3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("testCAB4") + await conn.close() + + let + clientA = RelayClient.new(canHop = true) + clientB = RelayClient.new(canHop = true) + clientC = RelayClient.new(canHop = true) + switchA = createSwitch(clientA, useYamux) + switchB = createSwitch(clientB, useYamux) + switchC = createSwitch(clientC, useYamux) + + switchA.mount(protoBCA) + switchB.mount(protoCAB) + switchC.mount(protoABC) + + await switchA.start() + await switchB.start() + await switchC.start() + + let + addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" & + $switchB.peerInfo.peerId & "/p2p-circuit").get() + addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" & + $switchC.peerInfo.peerId & "/p2p-circuit").get() + addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" & + $switchA.peerInfo.peerId & "/p2p-circuit").get() + + await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs) + await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs) + await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs) + let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs) + let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs) + let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs) + let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest") + let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest") + let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest") + + await connABC.writeLp("testABC1") + await connBCA.writeLp("testBCA1") + await connCAB.writeLp("testCAB1") + check: + "testABC2" == string.fromBytes(await connABC.readLp(1024)) + "testBCA2" == string.fromBytes(await connBCA.readLp(1024)) + "testCAB2" == string.fromBytes(await connCAB.readLp(1024)) + await connABC.writeLp("testABC3") + await connBCA.writeLp("testBCA3") + await connCAB.writeLp("testCAB3") + check: + "testABC4" == string.fromBytes(await connABC.readLp(1024)) + "testBCA4" == string.fromBytes(await connBCA.readLp(1024)) + "testCAB4" == string.fromBytes(await connCAB.readLp(1024)) + await allFutures(connABC.close(), connBCA.close(), connCAB.close()) + await allFutures(switchA.stop(), switchB.stop(), switchC.stop())