diff --git a/config/example_config.toml b/config/example_config.toml index 66d596cd0..7991ed5a6 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -50,6 +50,10 @@ # Default is false. ## enable_metrics = false + # `bootstrapper` if enabled, it runs the node in bootstrap mode. + # Default is false. + ## bootstrapper = false + # `network.bootstrap` contains configuration for bootstrapping the node. [network.bootstrap] diff --git a/network/config.go b/network/config.go index cd2e50c36..0f83091a5 100644 --- a/network/config.go +++ b/network/config.go @@ -16,6 +16,7 @@ type Config struct { RelayAddrs []string `toml:"relay_addresses"` EnableMdns bool `toml:"enable_mdns"` EnableMetrics bool `toml:"enable_metrics"` + Bootstrapper bool `toml:"bootstrapper"` Bootstrap *BootstrapConfig `toml:"bootstrap"` } @@ -56,6 +57,7 @@ func DefaultConfig() *Config { EnableRelay: false, EnableMdns: false, EnableMetrics: false, + Bootstrapper: false, Bootstrap: &BootstrapConfig{ Addresses: addresses, MinThreshold: 8, diff --git a/network/dht.go b/network/dht.go index da0968a48..463b93729 100644 --- a/network/dht.go +++ b/network/dht.go @@ -18,11 +18,17 @@ type dhtService struct { } func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore.ProtocolID, - conf *BootstrapConfig, logger *logger.SubLogger, + conf *BootstrapConfig, bootstrapper bool, logger *logger.SubLogger, ) *dhtService { + mode := lp2pdht.ModeAuto + if bootstrapper { + mode = lp2pdht.ModeServer + } opts := []lp2pdht.Option{ - lp2pdht.Mode(lp2pdht.ModeAuto), + lp2pdht.Mode(mode), lp2pdht.ProtocolPrefix(protocolID), + lp2pdht.DisableProviders(), + lp2pdht.DisableValues(), } kademlia, err := lp2pdht.New(ctx, host, opts...) @@ -31,6 +37,11 @@ func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore. return nil } + err = kademlia.Bootstrap(ctx) + if err != nil { + panic(err.Error()) + } + bootstrap := newBootstrap(ctx, host, host.Network(), kademlia, conf, logger) diff --git a/network/network.go b/network/network.go index 51d9d1cbb..916e60379 100644 --- a/network/network.go +++ b/network/network.go @@ -169,7 +169,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network, kadProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/gossip/v1", n.name)) streamProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/stream/v1", n.name)) - n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf.Bootstrap, n.logger) + n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf.Bootstrap, conf.Bootstrapper, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, relayAddrs, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, n.logger) n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID) diff --git a/network/network_test.go b/network/network_test.go index 1d7d9a31c..30be12050 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -156,12 +156,15 @@ func TestNetwork(t *testing.T) { // Bootstrap node confB := testConfig() bootstrapPort := ts.RandInt32(9999) + 10000 + confB.Bootstrapper = true confB.Listens = []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", bootstrapPort), fmt.Sprintf("/ip6/::1/tcp/%v", bootstrapPort), } fmt.Println("Starting Bootstrap node") - networkB := makeTestNetwork(t, confB, []lp2p.Option{}) + networkB := makeTestNetwork(t, confB, []lp2p.Option{ + lp2p.ForceReachabilityPublic(), + }) bootstrapAddresses := []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), fmt.Sprintf("/ip6/::1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), @@ -274,10 +277,13 @@ func TestNetwork(t *testing.T) { assert.Equal(t, eN.Data, msg) }) - t.Run("node P (public) is not directly accessible by nodes M and N (private behind NAT)", func(t *testing.T) { + t.Run("node P (public) is directly accessible by nodes M and N (private behind NAT)", func(t *testing.T) { msgM := []byte("test-stream-from-m") - require.Error(t, networkM.SendTo(msgM, networkP.SelfID())) + require.NoError(t, networkM.SendTo(msgM, networkP.SelfID())) + eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) + assert.Equal(t, eB.Source, networkM.SelfID()) + assert.Equal(t, readData(t, eB.Reader, len(msgM)), msgM) }) t.Run("node P (public) is directly accessible by node B (bootstrap)", func(t *testing.T) { @@ -327,3 +333,75 @@ func TestInvalidRelayAddress(t *testing.T) { _, err = NewNetwork("test", conf) assert.Error(t, err) } + +func TestConnections(t *testing.T) { + t.Parallel() // run the tests in parallel + + ts := testsuite.NewTestSuite(t) + + tests := []struct { + bootstrapAddr string + peerAddr string + }{ + {"/ip4/127.0.0.1/tcp/%d", "/ip4/127.0.0.1/tcp/0"}, + {"/ip4/127.0.0.1/udp/%d/quic-v1", "/ip4/127.0.0.1/udp/0/quic-v1"}, + {"/ip6/::1/tcp/%d", "/ip6/::1/tcp/0"}, + {"/ip6/::1/udp/%d/quic-v1", "/ip6/::1/udp/0/quic-v1"}, + } + + for i, test := range tests { + // Bootstrap node + confB := testConfig() + bootstrapPort := ts.RandInt32(9999) + 10000 + bootstrapAddr := fmt.Sprintf(test.bootstrapAddr, bootstrapPort) + confB.Listens = []string{bootstrapAddr} + fmt.Println("Starting Bootstrap node") + networkB := makeTestNetwork(t, confB, []lp2p.Option{ + lp2p.ForceReachabilityPublic(), + }) + + // Public node + confP := testConfig() + confP.Bootstrap.Addresses = []string{ + fmt.Sprintf("%s/p2p/%v", bootstrapAddr, networkB.SelfID().String()), + } + confP.Listens = []string{test.peerAddr} + fmt.Println("Starting Public node") + networkP := makeTestNetwork(t, confP, []lp2p.Option{ + lp2p.ForceReachabilityPublic(), + }) + + t.Run(fmt.Sprintf("Running test %d: %s <-> %s ... ", + i, test.bootstrapAddr, test.peerAddr), func(t *testing.T) { + t.Parallel() // run the tests in parallel + + testConnection(t, networkP, networkB) + }) + } +} + +func testConnection(t *testing.T, networkP *network, networkB *network) { + t.Helper() + + // Ensure that peers are connected to each other + for i := 0; i < 20; i++ { + if networkP.NumConnectedPeers() >= 1 && + networkB.NumConnectedPeers() >= 1 { + break + } + time.Sleep(100 * time.Millisecond) + } + + assert.Equal(t, networkB.NumConnectedPeers(), 1) + assert.Equal(t, networkP.NumConnectedPeers(), 1) + + msg := []byte("test-msg") + + require.NoError(t, networkP.SendTo(msg, networkB.SelfID())) + e := shouldReceiveEvent(t, networkB, EventTypeStream).(*StreamMessage) + assert.Equal(t, e.Source, networkP.SelfID()) + assert.Equal(t, readData(t, e.Reader, len(msg)), msg) + + networkB.Stop() + networkP.Stop() +} diff --git a/tests/main_test.go b/tests/main_test.go index 878024868..e1d5ba292 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -82,6 +82,7 @@ func TestMain(m *testing.M) { tConfigs[i].Sync.NodeNetwork = false tConfigs[i].Sync.Firewall.Enabled = false tConfigs[i].Network.EnableMdns = true + tConfigs[i].Network.Bootstrapper = true tConfigs[i].Network.NetworkKey = util.TempFilePath() tConfigs[i].Network.Listens = []string{"/ip4/127.0.0.1/tcp/0", "/ip4/127.0.0.1/udp/0/quic-v1"} tConfigs[i].Network.Bootstrap.Addresses = []string{}