From d6e9c7d9d1999d73a40aa010132f78cd5d686530 Mon Sep 17 00:00:00 2001 From: mantre Date: Tue, 24 Oct 2023 23:07:43 +0800 Subject: [PATCH] fix(network): ensure relay is not used when there is no direct connection --- network/network_test.go | 43 ++++++++++++-------- network/stream.go | 88 ++++++++++++++++++++--------------------- 2 files changed, 71 insertions(+), 60 deletions(-) diff --git a/network/network_test.go b/network/network_test.go index 4838c405b..eeb2d5ad5 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -150,13 +150,13 @@ func TestNetwork(t *testing.T) { relayAddrs = append(relayAddrs, addr2) } + bootstrapPort := ts.RandInt32(9999) + 10000 + // Bootstrap node confB := testConfig() - bootstrapPort := ts.RandInt32(9999) + 10000 confB.Bootstrapper = true confB.Listens = []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", bootstrapPort), - fmt.Sprintf("/ip6/::1/tcp/%v", bootstrapPort), } fmt.Println("Starting Bootstrap node") networkB := makeTestNetwork(t, confB, []lp2p.Option{ @@ -164,7 +164,6 @@ func TestNetwork(t *testing.T) { }) bootstrapAddresses := []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), - fmt.Sprintf("/ip6/::1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), } assert.NoError(t, networkB.JoinConsensusTopic()) @@ -174,7 +173,6 @@ func TestNetwork(t *testing.T) { confP.BootstrapAddrs = bootstrapAddresses confP.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Public node") networkP := makeTestNetwork(t, confP, []lp2p.Option{ @@ -189,7 +187,6 @@ func TestNetwork(t *testing.T) { confM.BootstrapAddrs = bootstrapAddresses confM.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node M") networkM := makeTestNetwork(t, confM, []lp2p.Option{ @@ -204,7 +201,6 @@ func TestNetwork(t *testing.T) { confN.BootstrapAddrs = bootstrapAddresses confN.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node N") networkN := makeTestNetwork(t, confN, []lp2p.Option{ @@ -212,13 +208,12 @@ func TestNetwork(t *testing.T) { }) assert.NoError(t, networkN.JoinConsensusTopic()) - // Private node X, doesn't join consensus topic + // Private node X, doesn't join consensus topic and without relay address confX := testConfig() confX.EnableRelay = false confX.BootstrapAddrs = bootstrapAddresses confX.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node X") networkX := makeTestNetwork(t, confX, []lp2p.Option{ @@ -278,9 +273,18 @@ func TestNetwork(t *testing.T) { msgM := []byte("test-stream-from-m") require.NoError(t, networkM.SendTo(msgM, networkP.SelfID())) - eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) - assert.Equal(t, eB.Source, networkM.SelfID()) - assert.Equal(t, readData(t, eB.Reader, len(msgM)), msgM) + eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) + assert.Equal(t, eP.Source, networkM.SelfID()) + assert.Equal(t, readData(t, eP.Reader, len(msgM)), msgM) + }) + + t.Run("node P (public) is directly accessible by node X (private behind NAT, without relay)", func(t *testing.T) { + msgX := []byte("test-stream-from-x") + + require.NoError(t, networkX.SendTo(msgX, networkP.SelfID())) + eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) + assert.Equal(t, eP.Source, networkX.SelfID()) + assert.Equal(t, readData(t, eP.Reader, len(msgX)), msgX) }) t.Run("node P (public) is directly accessible by node B (bootstrap)", func(t *testing.T) { @@ -293,17 +297,24 @@ func TestNetwork(t *testing.T) { }) t.Run("nodes M and N (private, connected via relay) can communicate using the relay node R", func(t *testing.T) { - msgM := []byte("test-stream-from-m") + _, err := networkM.host.Network().DialPeer(networkM.ctx, nodeR.ID()) + assert.NoError(t, err) - require.NoError(t, networkM.SendTo(msgM, networkN.SelfID())) - eM := shouldReceiveEvent(t, networkN, EventTypeStream).(*StreamMessage) - assert.Equal(t, eM.Source, networkM.SelfID()) - assert.Equal(t, readData(t, eM.Reader, len(msgM)), msgM) + // TODO: How we can test this? + // msgM := []byte("test-stream-from-m") + + // require.NoError(t, networkM.SendTo(msgM, networkN.SelfID())) + // eM := shouldReceiveEvent(t, networkN, EventTypeStream).(*StreamMessage) + // assert.Equal(t, eM.Source, networkM.SelfID()) + // assert.Equal(t, readData(t, eM.Reader, len(msgM)), msgM) }) t.Run("node X (private, not connected via relay) is not accessible by node M", func(t *testing.T) { msgM := []byte("test-stream-from-m") + _, err := networkX.host.Network().DialPeer(networkX.ctx, nodeR.ID()) + assert.Error(t, err) + require.Error(t, networkM.SendTo(msgM, networkX.SelfID())) }) diff --git a/network/stream.go b/network/stream.go index bf03d9614..a4a1190ec 100644 --- a/network/stream.go +++ b/network/stream.go @@ -2,7 +2,6 @@ package network import ( "context" - "fmt" lp2pcore "github.com/libp2p/go-libp2p/core" lp2phost "github.com/libp2p/go-libp2p/core/host" @@ -70,49 +69,50 @@ func (s *streamService) SendRequest(msg []byte, pid lp2peer.ID) error { stream, err := s.host.NewStream( lp2pnetwork.WithNoDial(s.ctx, "should already have connection"), pid, s.protocolID) if err != nil { - s.logger.Debug("unable to open direct stream", "pid", pid, "error", err) - if len(s.relayAddrs) == 0 { - return err - } - - // We don't have a direct connection to the destination node, - // so we try to connect via a relay node. - // An example of a relay connection is described here: - // https://github.com/libp2p/go-libp2p/blob/master/examples/relay/main.go - circuitAddrs := make([]ma.Multiaddr, len(s.relayAddrs)) - for i, addr := range s.relayAddrs { - // To connect a peer over relay, we need a relay address. - // The format for the relay address is defined here: - // https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses - circuitAddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p-circuit/p2p/%s", addr.String(), pid)) - if err != nil { - return LibP2PError{Err: err} - } - // fmt.Println(circuitAddr) - circuitAddrs[i] = circuitAddr - } - - // Open a connection to the previously unreachable host via the relay address - unreachableRelayInfo := lp2peer.AddrInfo{ - ID: pid, - Addrs: circuitAddrs, - } - - if err := s.host.Connect(s.ctx, unreachableRelayInfo); err != nil { - // There is no relay connection to peer as well - s.logger.Warn("unable to connect to peer using relay", "pid", pid, "error", err) - return LibP2PError{Err: err} - } - s.logger.Debug("connected to peer using relay", "pid", pid) - - // Try to open a new stream to the target peer using the relay connection. - // The connection is marked as transient. - stream, err = s.host.NewStream( - lp2pnetwork.WithUseTransient(s.ctx, string(s.protocolID)), pid, s.protocolID) - if err != nil { - s.logger.Warn("unable to open relay stream", "pid", pid, "error", err) - return LibP2PError{Err: err} - } + return err + // s.logger.Debug("unable to open direct stream", "pid", pid, "error", err) + // if len(s.relayAddrs) == 0 { + // return err + // } + + // // We don't have a direct connection to the destination node, + // // so we try to connect via a relay node. + // // An example of a relay connection is described here: + // // https://github.com/libp2p/go-libp2p/blob/master/examples/relay/main.go + // circuitAddrs := make([]ma.Multiaddr, len(s.relayAddrs)) + // for i, addr := range s.relayAddrs { + // // To connect a peer over relay, we need a relay address. + // // The format for the relay address is defined here: + // // https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses + // circuitAddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p-circuit/p2p/%s", addr.String(), pid)) + // if err != nil { + // return LibP2PError{Err: err} + // } + // // fmt.Println(circuitAddr) + // circuitAddrs[i] = circuitAddr + // } + + // // Open a connection to the previously unreachable host via the relay address + // unreachableRelayInfo := lp2peer.AddrInfo{ + // ID: pid, + // Addrs: circuitAddrs, + // } + + // if err := s.host.Connect(s.ctx, unreachableRelayInfo); err != nil { + // // There is no relay connection to peer as well + // s.logger.Warn("unable to connect to peer using relay", "pid", pid, "error", err) + // return LibP2PError{Err: err} + // } + // s.logger.Debug("connected to peer using relay", "pid", pid) + + // // Try to open a new stream to the target peer using the relay connection. + // // The connection is marked as transient. + // stream, err = s.host.NewStream( + // lp2pnetwork.WithUseTransient(s.ctx, string(s.protocolID)), pid, s.protocolID) + // if err != nil { + // s.logger.Warn("unable to open relay stream", "pid", pid, "error", err) + // return LibP2PError{Err: err} + // } } _, err = stream.Write(msg)