From 1f4b090227ddca5a0a2b0b00ca6408d043584ae8 Mon Sep 17 00:00:00 2001
From: diegomrsantos <diego@status.im>
Date: Tue, 21 Nov 2023 16:03:29 +0100
Subject: [PATCH] fix(yamux): doesn't work in a Relayv2 connection (#979)

Co-authored-by: Ludovic Chenut <ludovic@status.im>
---
 libp2p/muxers/yamux/yamux.nim                 |   3 +
 libp2p/protocols/connectivity/relay/rconn.nim |   1 +
 tests/testrelayv2.nim                         | 606 +++++++++---------
 3 files changed, 312 insertions(+), 298 deletions(-)

diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim
index e71772da1b..a50f9cf0da 100644
--- a/libp2p/muxers/yamux/yamux.nim
+++ b/libp2p/muxers/yamux/yamux.nim
@@ -186,6 +186,7 @@ proc remoteClosed(channel: YamuxChannel) {.async.} =
 method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} =
   if not channel.closedLocally:
     channel.closedLocally = true
+    channel.isEof = true
 
     if channel.isReset == false and channel.sendQueue.len == 0:
       await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
@@ -249,6 +250,7 @@ method readOnce*(
     await channel.closedRemotely or channel.receivedData.wait()
     if channel.closedRemotely.done() and channel.recvQueue.len == 0:
       channel.returnedEof = true
+      channel.isEof = true
       return 0
 
   let toRead = min(channel.recvQueue.len, nbytes)
@@ -454,6 +456,7 @@ method handle*(m: Yamux) {.async, gcsafe.} =
             if header.streamId in m.flushed:
               m.flushed.del(header.streamId)
             if header.streamId mod 2 == m.currentId mod 2:
+              debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
               raise newException(YamuxError, "Peer used our reserved stream id")
             let newStream = m.createStream(header.streamId, false)
             if m.channels.len >= m.maxChannCount:
diff --git a/libp2p/protocols/connectivity/relay/rconn.nim b/libp2p/protocols/connectivity/relay/rconn.nim
index 1856afe706..4f2732aac2 100644
--- a/libp2p/protocols/connectivity/relay/rconn.nim
+++ b/libp2p/protocols/connectivity/relay/rconn.nim
@@ -47,6 +47,7 @@ proc new*(
   limitDuration: uint32,
   limitData: uint64): T =
   let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData)
+  rc.dir = conn.dir
   rc.initStream()
   if limitDuration > 0:
     proc checkDurationConnection() {.async.} =
diff --git a/tests/testrelayv2.nim b/tests/testrelayv2.nim
index 6802e8d0ef..83eb994713 100644
--- a/tests/testrelayv2.nim
+++ b/tests/testrelayv2.nim
@@ -19,14 +19,22 @@ import ./helpers
 import std/times
 import stew/byteutils
 
-proc createSwitch(r: Relay): Switch =
-  result = SwitchBuilder.new()
+proc createSwitch(r: Relay = nil, useYamux: bool = false): Switch =
+  var builder = SwitchBuilder.new()
     .withRng(newRng())
     .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
     .withTcpTransport()
-    .withMplex()
+
+  if useYamux:
+    builder = builder.withYamux()
+  else:
+    builder = builder.withMplex()
+
+  if r != nil:
+    builder = builder.withCircuitRelay(r)
+
+  return builder
     .withNoise()
-    .withCircuitRelay(r)
     .build()
 
 suite "Circuit Relay V2":
@@ -122,308 +130,310 @@ suite "Circuit Relay V2":
       expect(ReservationError):
         discard await cl1.reserve(src2.peerInfo.peerId, addrs)
 
-  suite "Connection":
-    asyncTeardown:
-      checkTrackers()
-    var
-      customProtoCodec {.threadvar.}: string
-      proto {.threadvar.}: LPProtocol
-      ttl {.threadvar.}: int
-      ldur {.threadvar.}: uint32
-      ldata {.threadvar.}: uint64
-      srcCl {.threadvar.}: RelayClient
-      dstCl {.threadvar.}: RelayClient
-      rv2 {.threadvar.}: Relay
-      src {.threadvar.}: Switch
-      dst {.threadvar.}: Switch
-      rel {.threadvar.}: Switch
-      rsvp {.threadvar.}: Rsvp
-      conn {.threadvar.}: Connection
-
-    asyncSetup:
-      customProtoCodec = "/test"
-      proto = new LPProtocol
-      proto.codec = customProtoCodec
-      ttl = 60
-      ldur = 120
-      ldata = 16384
-      srcCl = RelayClient.new()
-      dstCl = RelayClient.new()
-      src = createSwitch(srcCl)
-      dst = createSwitch(dstCl)
-      rel = newStandardSwitch()
-
-    asyncTest "Connection succeed":
-      proto.handler = proc(conn: Connection, proto: string) {.async.} =
-        check: "test1" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("test2")
-        check: "test3" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("test4")
-        await conn.close()
-      rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
-                      limitDuration=ldur,
-                      limitData=ldata)
-      rv2.setup(rel)
-      rel.mount(rv2)
-      dst.mount(proto)
-
-      await rel.start()
-      await src.start()
-      await dst.start()
-
-      let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
-                                $rel.peerInfo.peerId & "/p2p-circuit").get()
-
-      await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-
-      rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
-
-      conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
-      await conn.writeLp("test1")
-      check: "test2" == string.fromBytes(await conn.readLp(1024))
-      await conn.writeLp("test3")
-      check: "test4" == string.fromBytes(await conn.readLp(1024))
-      await allFutures(conn.close())
-      await allFutures(src.stop(), dst.stop(), rel.stop())
-
-    asyncTest "Connection duration exceeded":
-      ldur = 3
-      proto.handler = proc(conn: Connection, proto: string) {.async.} =
-        check "wanna sleep?" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("yeah!")
-        check "go!" == string.fromBytes(await conn.readLp(1024))
-        await sleepAsync(chronos.timer.seconds(ldur + 1))
-        await conn.writeLp("that was a cool power nap")
-        await conn.close()
-      rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
-                      limitDuration=ldur,
-                      limitData=ldata)
-      rv2.setup(rel)
-      rel.mount(rv2)
-      dst.mount(proto)
-
-      await rel.start()
-      await src.start()
-      await dst.start()
-
-      let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
-                                $rel.peerInfo.peerId & "/p2p-circuit").get()
-
-      await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-
-      rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
-      await conn.writeLp("wanna sleep?")
-      check: "yeah!" == string.fromBytes(await conn.readLp(1024))
-      await conn.writeLp("go!")
-      expect(LPStreamEOFError):
-        discard await conn.readLp(1024)
-      await allFutures(conn.close())
-      await allFutures(src.stop(), dst.stop(), rel.stop())
-
-    asyncTest "Connection data exceeded":
-      ldata = 1000
-      proto.handler = proc(conn: Connection, proto: string) {.async.} =
-        check "count me the better story you know" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("do you expect a lorem ipsum or...?")
-        check "surprise me!" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long
-precisely--having little or no money in my purse, and nothing
-particular to interest me on shore, I thought I would sail about a
-little and see the watery part of the world. It is a way I have of
-driving off the spleen and regulating the circulation. Whenever I
-find myself growing grim about the mouth; whenever it is a damp,
-drizzly November in my soul; whenever I find myself involuntarily
-pausing before coffin warehouses, and bringing up the rear of every
-funeral I meet; and especially whenever my hypos get such an upper
-hand of me, that it requires a strong moral principle to prevent me
-from deliberately stepping into the street, and methodically knocking
-people's hats off--then, I account it high time to get to sea as soon
-as I can. This is my substitute for pistol and ball. With a
-philosophical flourish Cato throws himself upon his sword; I quietly
-take to the ship.""")
-      rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
-                      limitDuration=ldur,
-                      limitData=ldata)
-      rv2.setup(rel)
-      rel.mount(rv2)
-      dst.mount(proto)
-
-      await rel.start()
-      await src.start()
-      await dst.start()
-
-      let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
-                                $rel.peerInfo.peerId & "/p2p-circuit").get()
-
-      await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-
-      rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
-      await conn.writeLp("count me the better story you know")
-      check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024))
-      await conn.writeLp("surprise me!")
-      expect(LPStreamEOFError):
-        discard await conn.readLp(1024)
-      await allFutures(conn.close())
-      await allFutures(src.stop(), dst.stop(), rel.stop())
-
-    asyncTest "Reservation ttl expire during connection":
-      ttl = 3
-      proto.handler = proc(conn: Connection, proto: string) {.async.} =
-        check: "test1" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("test2")
-        check: "test3" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("test4")
-        await conn.close()
-      rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
-                      limitDuration=ldur,
-                      limitData=ldata)
-      rv2.setup(rel)
-      rel.mount(rv2)
-      dst.mount(proto)
-
-      await rel.start()
-      await src.start()
-      await dst.start()
-
-      let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
-                                $rel.peerInfo.peerId & "/p2p-circuit").get()
-
-      await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-
-      rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
-      await conn.writeLp("test1")
-      check: "test2" == string.fromBytes(await conn.readLp(1024))
-      await conn.writeLp("test3")
-      check: "test4" == string.fromBytes(await conn.readLp(1024))
-      await src.disconnect(rel.peerInfo.peerId)
-      await sleepAsync(chronos.timer.seconds(ttl + 1))
+  for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]:
+    suite "Circuit Relay V2 Connection using " & muxName:
+      asyncTeardown:
+        checkTrackers()
+      var
+        customProtoCodec {.threadvar.}: string
+        proto {.threadvar.}: LPProtocol
+        ttl {.threadvar.}: int
+        ldur {.threadvar.}: uint32
+        ldata {.threadvar.}: uint64
+        srcCl {.threadvar.}: RelayClient
+        dstCl {.threadvar.}: RelayClient
+        rv2 {.threadvar.}: Relay
+        src {.threadvar.}: Switch
+        dst {.threadvar.}: Switch
+        rel {.threadvar.}: Switch
+        rsvp {.threadvar.}: Rsvp
+        conn {.threadvar.}: Connection
+
+      asyncSetup:
+        customProtoCodec = "/test"
+        proto = new LPProtocol
+        proto.codec = customProtoCodec
+        ttl = 60
+        ldur = 120
+        ldata = 16384
+        srcCl = RelayClient.new()
+        dstCl = RelayClient.new()
+        src = createSwitch(srcCl, useYamux)
+        dst = createSwitch(dstCl, useYamux)
+        rel = createSwitch(nil, useYamux)
+
+      asyncTest "Connection succeed":
+        proto.handler = proc(conn: Connection, proto: string) {.async.} =
+          check: "test1" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("test2")
+          check: "test3" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("test4")
+          await conn.close()
+        rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
+                        limitDuration=ldur,
+                        limitData=ldata)
+        rv2.setup(rel)
+        rel.mount(rv2)
+        dst.mount(proto)
+
+        await rel.start()
+        await src.start()
+        await dst.start()
+
+        let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
+                                  $rel.peerInfo.peerId & "/p2p-circuit").get()
 
-      expect(DialFailedError):
-        check: conn.atEof()
-        await conn.close()
         await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
-      await allFutures(conn.close())
-      await allFutures(src.stop(), dst.stop(), rel.stop())
-
-    asyncTest "Connection over relay":
-      # src => rel => rel2 => dst
-      # rel2 reserve rel
-      # dst reserve rel2
-      # src try to connect with dst
-      proto.handler = proc(conn: Connection, proto: string) {.async.} =
-        raise newException(CatchableError, "Should not be here")
-      let
-        rel2Cl = RelayClient.new(canHop = true)
-        rel2 = createSwitch(rel2Cl)
-        rv2 = Relay.new()
-      rv2.setup(rel)
-      rel.mount(rv2)
-      dst.mount(proto)
-      await rel.start()
-      await rel2.start()
-      await src.start()
-      await dst.start()
+        await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
 
-      let
-        addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
-                                     $rel.peerInfo.peerId & "/p2p-circuit/p2p/" &
-                                     $rel2.peerInfo.peerId & "/p2p/" &
-                                     $rel2.peerInfo.peerId & "/p2p-circuit").get() ]
+        rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
 
-      await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
+        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
+        await conn.writeLp("test1")
+        check: "test2" == string.fromBytes(await conn.readLp(1024))
+        await conn.writeLp("test3")
+        check: "test4" == string.fromBytes(await conn.readLp(1024))
+        await allFutures(conn.close())
+        await allFutures(src.stop(), dst.stop(), rel.stop())
+
+      asyncTest "Connection duration exceeded":
+        ldur = 3
+        proto.handler = proc(conn: Connection, proto: string) {.async.} =
+          check "wanna sleep?" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("yeah!")
+          check "go!" == string.fromBytes(await conn.readLp(1024))
+          await sleepAsync(chronos.timer.seconds(ldur + 1))
+          await conn.writeLp("that was a cool power nap")
+          await conn.close()
+        rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
+                        limitDuration=ldur,
+                        limitData=ldata)
+        rv2.setup(rel)
+        rel.mount(rv2)
+        dst.mount(proto)
+
+        await rel.start()
+        await src.start()
+        await dst.start()
+
+        let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
+                                  $rel.peerInfo.peerId & "/p2p-circuit").get()
 
-      rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
-      let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
+        await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
 
-      expect(DialFailedError):
-        conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec)
-      await allFutures(conn.close())
-      await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop())
+        rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
+        await conn.writeLp("wanna sleep?")
+        check: "yeah!" == string.fromBytes(await conn.readLp(1024))
+        await conn.writeLp("go!")
+        expect(LPStreamEOFError):
+          discard await conn.readLp(1024)
+        await allFutures(conn.close())
+        await allFutures(src.stop(), dst.stop(), rel.stop())
+
+      asyncTest "Connection data exceeded":
+        ldata = 1000
+        proto.handler = proc(conn: Connection, proto: string) {.async.} =
+          check "count me the better story you know" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("do you expect a lorem ipsum or...?")
+          check "surprise me!" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("""Call me Ishmael. Some years ago--never mind how long
+  precisely--having little or no money in my purse, and nothing
+  particular to interest me on shore, I thought I would sail about a
+  little and see the watery part of the world. It is a way I have of
+  driving off the spleen and regulating the circulation. Whenever I
+  find myself growing grim about the mouth; whenever it is a damp,
+  drizzly November in my soul; whenever I find myself involuntarily
+  pausing before coffin warehouses, and bringing up the rear of every
+  funeral I meet; and especially whenever my hypos get such an upper
+  hand of me, that it requires a strong moral principle to prevent me
+  from deliberately stepping into the street, and methodically knocking
+  people's hats off--then, I account it high time to get to sea as soon
+  as I can. This is my substitute for pistol and ball. With a
+  philosophical flourish Cato throws himself upon his sword; I quietly
+  take to the ship.""")
+        rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
+                        limitDuration=ldur,
+                        limitData=ldata)
+        rv2.setup(rel)
+        rel.mount(rv2)
+        dst.mount(proto)
+
+        await rel.start()
+        await src.start()
+        await dst.start()
+
+        let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
+                                  $rel.peerInfo.peerId & "/p2p-circuit").get()
 
-    asyncTest "Connection using ClientRelay":
-      var
-        protoABC = new LPProtocol
-        protoBCA = new LPProtocol
-        protoCAB = new LPProtocol
-      protoABC.codec = "/abctest"
-      protoABC.handler = proc(conn: Connection, proto: string) {.async.} =
-        check: "testABC1" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testABC2")
-        check: "testABC3" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testABC4")
-        await conn.close()
-      protoBCA.codec = "/bcatest"
-      protoBCA.handler = proc(conn: Connection, proto: string) {.async.} =
-        check: "testBCA1" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testBCA2")
-        check: "testBCA3" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testBCA4")
-        await conn.close()
-      protoCAB.codec = "/cabtest"
-      protoCAB.handler = proc(conn: Connection, proto: string) {.async.} =
-        check: "testCAB1" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testCAB2")
-        check: "testCAB3" == string.fromBytes(await conn.readLp(1024))
-        await conn.writeLp("testCAB4")
-        await conn.close()
+        await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
 
-      let
-        clientA = RelayClient.new(canHop = true)
-        clientB = RelayClient.new(canHop = true)
-        clientC = RelayClient.new(canHop = true)
-        switchA = createSwitch(clientA)
-        switchB = createSwitch(clientB)
-        switchC = createSwitch(clientC)
+        rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
+        await conn.writeLp("count me the better story you know")
+        check: "do you expect a lorem ipsum or...?" == string.fromBytes(await conn.readLp(1024))
+        await conn.writeLp("surprise me!")
+        expect(LPStreamEOFError):
+          discard await conn.readLp(1024)
+        await allFutures(conn.close())
+        await allFutures(src.stop(), dst.stop(), rel.stop())
+
+      asyncTest "Reservation ttl expire during connection":
+        ttl = 3
+        proto.handler = proc(conn: Connection, proto: string) {.async.} =
+          check: "test1" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("test2")
+          check: "test3" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("test4")
+          await conn.close()
+        rv2 = Relay.new(reservationTTL=initDuration(seconds=ttl),
+                        limitDuration=ldur,
+                        limitData=ldata)
+        rv2.setup(rel)
+        rel.mount(rv2)
+        dst.mount(proto)
+
+        await rel.start()
+        await src.start()
+        await dst.start()
+
+        let addrs = MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
+                                  $rel.peerInfo.peerId & "/p2p-circuit").get()
 
-      switchA.mount(protoBCA)
-      switchB.mount(protoCAB)
-      switchC.mount(protoABC)
+        await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
 
-      await switchA.start()
-      await switchB.start()
-      await switchC.start()
+        rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
+        await conn.writeLp("test1")
+        check: "test2" == string.fromBytes(await conn.readLp(1024))
+        await conn.writeLp("test3")
+        check: "test4" == string.fromBytes(await conn.readLp(1024))
+        await src.disconnect(rel.peerInfo.peerId)
+        await sleepAsync(chronos.timer.seconds(ttl + 1))
+
+        expect(DialFailedError):
+          check: conn.atEof()
+          await conn.close()
+          await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+          conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
+        await allFutures(conn.close())
+        await allFutures(src.stop(), dst.stop(), rel.stop())
+
+      asyncTest "Connection over relay":
+        # src => rel => rel2 => dst
+        # rel2 reserve rel
+        # dst reserve rel2
+        # src try to connect with dst
+        proto.handler = proc(conn: Connection, proto: string) {.async.} =
+          raise newException(CatchableError, "Should not be here")
+        let
+          rel2Cl = RelayClient.new(canHop = true)
+          rel2 = createSwitch(rel2Cl, useYamux)
+          rv2 = Relay.new()
+        rv2.setup(rel)
+        rel.mount(rv2)
+        dst.mount(proto)
+        await rel.start()
+        await rel2.start()
+        await src.start()
+        await dst.start()
+
+        let
+          addrs = @[ MultiAddress.init($rel.peerInfo.addrs[0] & "/p2p/" &
+                                       $rel.peerInfo.peerId & "/p2p-circuit/p2p/" &
+                                       $rel2.peerInfo.peerId & "/p2p/" &
+                                       $rel2.peerInfo.peerId & "/p2p-circuit").get() ]
 
-      let
-        addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" &
-                                     $switchB.peerInfo.peerId & "/p2p-circuit").get()
-        addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" &
-                                     $switchC.peerInfo.peerId & "/p2p-circuit").get()
-        addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" &
-                                     $switchA.peerInfo.peerId & "/p2p-circuit").get()
-
-      await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
-      await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
-      await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
-      let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
-      let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
-      let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
-      let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest")
-      let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest")
-      let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest")
-
-      await connABC.writeLp("testABC1")
-      await connBCA.writeLp("testBCA1")
-      await connCAB.writeLp("testCAB1")
-      check:
-        "testABC2" == string.fromBytes(await connABC.readLp(1024))
-        "testBCA2" == string.fromBytes(await connBCA.readLp(1024))
-        "testCAB2" == string.fromBytes(await connCAB.readLp(1024))
-      await connABC.writeLp("testABC3")
-      await connBCA.writeLp("testBCA3")
-      await connCAB.writeLp("testCAB3")
-      check:
-        "testABC4" == string.fromBytes(await connABC.readLp(1024))
-        "testBCA4" == string.fromBytes(await connBCA.readLp(1024))
-        "testCAB4" == string.fromBytes(await connCAB.readLp(1024))
-      await allFutures(connABC.close(), connBCA.close(), connCAB.close())
-      await allFutures(switchA.stop(), switchB.stop(), switchC.stop())
+        await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
+
+        rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
+        let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
+
+        expect(DialFailedError):
+          conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec)
+        if not conn.isNil():
+          await allFutures(conn.close())
+        await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop())
+
+      asyncTest "Connection using ClientRelay":
+        var
+          protoABC = new LPProtocol
+          protoBCA = new LPProtocol
+          protoCAB = new LPProtocol
+        protoABC.codec = "/abctest"
+        protoABC.handler = proc(conn: Connection, proto: string) {.async.} =
+          check: "testABC1" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testABC2")
+          check: "testABC3" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testABC4")
+          await conn.close()
+        protoBCA.codec = "/bcatest"
+        protoBCA.handler = proc(conn: Connection, proto: string) {.async.} =
+          check: "testBCA1" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testBCA2")
+          check: "testBCA3" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testBCA4")
+          await conn.close()
+        protoCAB.codec = "/cabtest"
+        protoCAB.handler = proc(conn: Connection, proto: string) {.async.} =
+          check: "testCAB1" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testCAB2")
+          check: "testCAB3" == string.fromBytes(await conn.readLp(1024))
+          await conn.writeLp("testCAB4")
+          await conn.close()
+
+        let
+          clientA = RelayClient.new(canHop = true)
+          clientB = RelayClient.new(canHop = true)
+          clientC = RelayClient.new(canHop = true)
+          switchA = createSwitch(clientA, useYamux)
+          switchB = createSwitch(clientB, useYamux)
+          switchC = createSwitch(clientC, useYamux)
+
+        switchA.mount(protoBCA)
+        switchB.mount(protoCAB)
+        switchC.mount(protoABC)
+
+        await switchA.start()
+        await switchB.start()
+        await switchC.start()
+
+        let
+          addrsABC = MultiAddress.init($switchB.peerInfo.addrs[0] & "/p2p/" &
+                                       $switchB.peerInfo.peerId & "/p2p-circuit").get()
+          addrsBCA = MultiAddress.init($switchC.peerInfo.addrs[0] & "/p2p/" &
+                                       $switchC.peerInfo.peerId & "/p2p-circuit").get()
+          addrsCAB = MultiAddress.init($switchA.peerInfo.addrs[0] & "/p2p/" &
+                                       $switchA.peerInfo.peerId & "/p2p-circuit").get()
+
+        await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
+        await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
+        await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
+        let rsvpABC = await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
+        let rsvpBCA = await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
+        let rsvpCAB = await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
+        let connABC = await switchA.dial(switchC.peerInfo.peerId, @[ addrsABC ], "/abctest")
+        let connBCA = await switchB.dial(switchA.peerInfo.peerId, @[ addrsBCA ], "/bcatest")
+        let connCAB = await switchC.dial(switchB.peerInfo.peerId, @[ addrsCAB ], "/cabtest")
+
+        await connABC.writeLp("testABC1")
+        await connBCA.writeLp("testBCA1")
+        await connCAB.writeLp("testCAB1")
+        check:
+          "testABC2" == string.fromBytes(await connABC.readLp(1024))
+          "testBCA2" == string.fromBytes(await connBCA.readLp(1024))
+          "testCAB2" == string.fromBytes(await connCAB.readLp(1024))
+        await connABC.writeLp("testABC3")
+        await connBCA.writeLp("testBCA3")
+        await connCAB.writeLp("testCAB3")
+        check:
+          "testABC4" == string.fromBytes(await connABC.readLp(1024))
+          "testBCA4" == string.fromBytes(await connBCA.readLp(1024))
+          "testCAB4" == string.fromBytes(await connCAB.readLp(1024))
+        await allFutures(connABC.close(), connBCA.close(), connCAB.close())
+        await allFutures(switchA.stop(), switchB.stop(), switchC.stop())