Skip to content

Commit

Permalink
fix(network): ensure relay is not used when there is no direct connec…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
themantre committed Oct 24, 2023
1 parent 5802ec9 commit d6e9c7d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 60 deletions.
43 changes: 27 additions & 16 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,20 @@ func TestNetwork(t *testing.T) {
relayAddrs = append(relayAddrs, addr2)
}

bootstrapPort := ts.RandInt32(9999) + 10000

// Bootstrap node
confB := testConfig()
bootstrapPort := ts.RandInt32(9999) + 10000
confB.Bootstrapper = true
confB.Listens = []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", bootstrapPort),
fmt.Sprintf("/ip6/::1/tcp/%v", bootstrapPort),
}
fmt.Println("Starting Bootstrap node")
networkB := makeTestNetwork(t, confB, []lp2p.Option{
lp2p.ForceReachabilityPublic(),
})
bootstrapAddresses := []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()),
fmt.Sprintf("/ip6/::1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()),
}
assert.NoError(t, networkB.JoinConsensusTopic())

Expand All @@ -174,7 +173,6 @@ func TestNetwork(t *testing.T) {
confP.BootstrapAddrs = bootstrapAddresses
confP.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Public node")
networkP := makeTestNetwork(t, confP, []lp2p.Option{
Expand All @@ -189,7 +187,6 @@ func TestNetwork(t *testing.T) {
confM.BootstrapAddrs = bootstrapAddresses
confM.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node M")
networkM := makeTestNetwork(t, confM, []lp2p.Option{
Expand All @@ -204,21 +201,19 @@ func TestNetwork(t *testing.T) {
confN.BootstrapAddrs = bootstrapAddresses
confN.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node N")
networkN := makeTestNetwork(t, confN, []lp2p.Option{
lp2p.ForceReachabilityPrivate(),
})
assert.NoError(t, networkN.JoinConsensusTopic())

// Private node X, doesn't join consensus topic
// Private node X, doesn't join consensus topic and without relay address
confX := testConfig()
confX.EnableRelay = false
confX.BootstrapAddrs = bootstrapAddresses
confX.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node X")
networkX := makeTestNetwork(t, confX, []lp2p.Option{
Expand Down Expand Up @@ -278,9 +273,18 @@ func TestNetwork(t *testing.T) {
msgM := []byte("test-stream-from-m")

require.NoError(t, networkM.SendTo(msgM, networkP.SelfID()))
eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eB.Source, networkM.SelfID())
assert.Equal(t, readData(t, eB.Reader, len(msgM)), msgM)
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.Source, networkM.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgM)), msgM)
})

t.Run("node P (public) is directly accessible by node X (private behind NAT, without relay)", func(t *testing.T) {
msgX := []byte("test-stream-from-x")

require.NoError(t, networkX.SendTo(msgX, networkP.SelfID()))
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.Source, networkX.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgX)), msgX)
})

t.Run("node P (public) is directly accessible by node B (bootstrap)", func(t *testing.T) {
Expand All @@ -293,17 +297,24 @@ func TestNetwork(t *testing.T) {
})

t.Run("nodes M and N (private, connected via relay) can communicate using the relay node R", func(t *testing.T) {
msgM := []byte("test-stream-from-m")
_, err := networkM.host.Network().DialPeer(networkM.ctx, nodeR.ID())
assert.NoError(t, err)

require.NoError(t, networkM.SendTo(msgM, networkN.SelfID()))
eM := shouldReceiveEvent(t, networkN, EventTypeStream).(*StreamMessage)
assert.Equal(t, eM.Source, networkM.SelfID())
assert.Equal(t, readData(t, eM.Reader, len(msgM)), msgM)
// TODO: How we can test this?
// msgM := []byte("test-stream-from-m")

// require.NoError(t, networkM.SendTo(msgM, networkN.SelfID()))
// eM := shouldReceiveEvent(t, networkN, EventTypeStream).(*StreamMessage)
// assert.Equal(t, eM.Source, networkM.SelfID())
// assert.Equal(t, readData(t, eM.Reader, len(msgM)), msgM)
})

t.Run("node X (private, not connected via relay) is not accessible by node M", func(t *testing.T) {
msgM := []byte("test-stream-from-m")

_, err := networkX.host.Network().DialPeer(networkX.ctx, nodeR.ID())
assert.Error(t, err)

require.Error(t, networkM.SendTo(msgM, networkX.SelfID()))
})

Expand Down
88 changes: 44 additions & 44 deletions network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package network

import (
"context"
"fmt"

lp2pcore "github.com/libp2p/go-libp2p/core"
lp2phost "github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -70,49 +69,50 @@ func (s *streamService) SendRequest(msg []byte, pid lp2peer.ID) error {
stream, err := s.host.NewStream(
lp2pnetwork.WithNoDial(s.ctx, "should already have connection"), pid, s.protocolID)
if err != nil {
s.logger.Debug("unable to open direct stream", "pid", pid, "error", err)
if len(s.relayAddrs) == 0 {
return err
}

// We don't have a direct connection to the destination node,
// so we try to connect via a relay node.
// An example of a relay connection is described here:
// https://github.com/libp2p/go-libp2p/blob/master/examples/relay/main.go
circuitAddrs := make([]ma.Multiaddr, len(s.relayAddrs))
for i, addr := range s.relayAddrs {
// To connect a peer over relay, we need a relay address.
// The format for the relay address is defined here:
// https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
circuitAddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p-circuit/p2p/%s", addr.String(), pid))
if err != nil {
return LibP2PError{Err: err}
}
// fmt.Println(circuitAddr)
circuitAddrs[i] = circuitAddr
}

// Open a connection to the previously unreachable host via the relay address
unreachableRelayInfo := lp2peer.AddrInfo{
ID: pid,
Addrs: circuitAddrs,
}

if err := s.host.Connect(s.ctx, unreachableRelayInfo); err != nil {
// There is no relay connection to peer as well
s.logger.Warn("unable to connect to peer using relay", "pid", pid, "error", err)
return LibP2PError{Err: err}
}
s.logger.Debug("connected to peer using relay", "pid", pid)

// Try to open a new stream to the target peer using the relay connection.
// The connection is marked as transient.
stream, err = s.host.NewStream(
lp2pnetwork.WithUseTransient(s.ctx, string(s.protocolID)), pid, s.protocolID)
if err != nil {
s.logger.Warn("unable to open relay stream", "pid", pid, "error", err)
return LibP2PError{Err: err}
}
return err
// s.logger.Debug("unable to open direct stream", "pid", pid, "error", err)
// if len(s.relayAddrs) == 0 {
// return err
// }

// // We don't have a direct connection to the destination node,
// // so we try to connect via a relay node.
// // An example of a relay connection is described here:
// // https://github.com/libp2p/go-libp2p/blob/master/examples/relay/main.go
// circuitAddrs := make([]ma.Multiaddr, len(s.relayAddrs))
// for i, addr := range s.relayAddrs {
// // To connect a peer over relay, we need a relay address.
// // The format for the relay address is defined here:
// // https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
// circuitAddr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p-circuit/p2p/%s", addr.String(), pid))
// if err != nil {
// return LibP2PError{Err: err}
// }
// // fmt.Println(circuitAddr)
// circuitAddrs[i] = circuitAddr
// }

// // Open a connection to the previously unreachable host via the relay address
// unreachableRelayInfo := lp2peer.AddrInfo{
// ID: pid,
// Addrs: circuitAddrs,
// }

// if err := s.host.Connect(s.ctx, unreachableRelayInfo); err != nil {
// // There is no relay connection to peer as well
// s.logger.Warn("unable to connect to peer using relay", "pid", pid, "error", err)
// return LibP2PError{Err: err}
// }
// s.logger.Debug("connected to peer using relay", "pid", pid)

// // Try to open a new stream to the target peer using the relay connection.
// // The connection is marked as transient.
// stream, err = s.host.NewStream(
// lp2pnetwork.WithUseTransient(s.ctx, string(s.protocolID)), pid, s.protocolID)
// if err != nil {
// s.logger.Warn("unable to open relay stream", "pid", pid, "error", err)
// return LibP2PError{Err: err}
// }
}

_, err = stream.Write(msg)
Expand Down

0 comments on commit d6e9c7d

Please sign in to comment.