From e7fcb606898e98e0d973bfc748c7485782eb9fc5 Mon Sep 17 00:00:00 2001 From: b00f Date: Sat, 28 Oct 2023 18:17:35 +0800 Subject: [PATCH] refactor(network): refactoring peer manager (#787) --- network/dht.go | 30 ++++++++---------------------- network/mdns.go | 22 +++++++++++----------- network/mdns_test.go | 3 +++ network/network.go | 7 ++++++- network/network_test.go | 41 +++++++++++++++++++++++++++++------------ network/notifee.go | 6 ++---- network/peermgr.go | 18 ++++++------------ 7 files changed, 65 insertions(+), 62 deletions(-) diff --git a/network/dht.go b/network/dht.go index 5767d87cd..b92269d4e 100644 --- a/network/dht.go +++ b/network/dht.go @@ -13,7 +13,6 @@ type dhtService struct { ctx context.Context host lp2phost.Host kademlia *lp2pdht.IpfsDHT - peerMgr *peerMgr logger *logger.SubLogger } @@ -24,45 +23,32 @@ func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore. if conf.Bootstrapper { mode = lp2pdht.ModeServer } - bootsrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs) + bootstrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs) opts := []lp2pdht.Option{ lp2pdht.Mode(mode), lp2pdht.ProtocolPrefix(protocolID), - lp2pdht.BootstrapPeers(bootsrapAddrs...), + lp2pdht.BootstrapPeers(bootstrapAddrs...), } kademlia, err := lp2pdht.New(ctx, host, opts...) if err != nil { - logger.Panic("unable to start DHT service", "error", err) - return nil + panic(err) } - err = kademlia.Bootstrap(ctx) - if err != nil { - panic(err.Error()) - } - - bootstrap := newPeerMgr(ctx, host, host.Network(), kademlia, - bootsrapAddrs, conf.MinConns, conf.MaxConns, logger) - return &dhtService{ ctx: ctx, host: host, kademlia: kademlia, - peerMgr: bootstrap, logger: logger, } } -func (dht *dhtService) Start() error { - dht.peerMgr.Start() - return nil +func (s *dhtService) Start() error { + return s.kademlia.Bootstrap(s.ctx) } -func (dht *dhtService) Stop() { - if err := dht.kademlia.Close(); err != nil { - dht.logger.Error("unable to close Kademlia", "error", err) +func (s *dhtService) Stop() { + if err := s.kademlia.Close(); err != nil { + s.logger.Error("unable to close Kademlia", "error", err) } - - dht.peerMgr.Stop() } diff --git a/network/mdns.go b/network/mdns.go index 8ff2291d7..cccfba2f0 100644 --- a/network/mdns.go +++ b/network/mdns.go @@ -34,20 +34,20 @@ func newMdnsService(ctx context.Context, host lp2phost.Host, logger *logger.SubL // HandlePeerFound connects to peers discovered via mDNS. Once they're connected, // the PubSub system will automatically start interacting with them if they also // support PubSub. -func (mdns *mdnsService) HandlePeerFound(pi lp2ppeer.AddrInfo) { - ctx, cancel := context.WithTimeout(mdns.ctx, time.Second*10) +func (s *mdnsService) HandlePeerFound(pi lp2ppeer.AddrInfo) { + ctx, cancel := context.WithTimeout(s.ctx, time.Second*10) defer cancel() - if pi.ID != mdns.host.ID() { - mdns.logger.Debug("connecting to new peer", "addr", pi.Addrs, "id", pi.ID.Pretty()) - if err := mdns.host.Connect(ctx, pi); err != nil { - mdns.logger.Error("error on connecting to peer", "id", pi.ID.Pretty(), "error", err) + if pi.ID != s.host.ID() { + s.logger.Debug("connecting to new peer", "addr", pi.Addrs, "id", pi.ID.Pretty()) + if err := s.host.Connect(ctx, pi); err != nil { + s.logger.Error("error on connecting to peer", "id", pi.ID.Pretty(), "error", err) } } } -func (mdns *mdnsService) Start() error { - err := mdns.service.Start() +func (s *mdnsService) Start() error { + err := s.service.Start() if err != nil { return LibP2PError{Err: err} } @@ -55,9 +55,9 @@ func (mdns *mdnsService) Start() error { return nil } -func (mdns *mdnsService) Stop() { - err := mdns.service.Close() +func (s *mdnsService) Stop() { + err := s.service.Close() if err != nil { - mdns.logger.Error("unable to close the network", "error", err) + s.logger.Error("unable to close the network", "error", err) } } diff --git a/network/mdns_test.go b/network/mdns_test.go index 376322e7e..dfa60577a 100644 --- a/network/mdns_test.go +++ b/network/mdns_test.go @@ -33,4 +33,7 @@ func TestMDNS(t *testing.T) { se := shouldReceiveEvent(t, net2, EventTypeStream).(*StreamMessage) assert.Equal(t, se.Source, net1.SelfID()) assert.Equal(t, readData(t, se.Reader, len(msg)), msg) + + net1.Stop() + net2.Stop() } diff --git a/network/network.go b/network/network.go index 0abd80aac..eb1a38447 100644 --- a/network/network.go +++ b/network/network.go @@ -35,6 +35,7 @@ type network struct { host lp2phost.Host mdns *mdnsService dht *dhtService + peerMgr *peerMgr stream *streamService gossip *gossipService notifee *NotifeeService @@ -152,6 +153,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network, if err != nil { return nil, LibP2PError{Err: err} } + opts = append(opts, lp2p.EnableRelay(), lp2p.EnableAutoRelayWithStaticRelays(static), @@ -189,6 +191,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network, streamProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/stream/v1", n.name)) n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger) + n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, relayAddrs, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger) n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID) @@ -217,8 +220,9 @@ func (n *network) Start() error { } n.gossip.Start() n.stream.Start() + n.peerMgr.Start() - n.logger.Info("network started", "addr", n.host.Addrs()) + n.logger.Info("network started", "addr", n.host.Addrs(), "id", n.host.ID()) return nil } @@ -232,6 +236,7 @@ func (n *network) Stop() { n.dht.Stop() n.gossip.Stop() n.stream.Stop() + n.peerMgr.Stop() if err := n.host.Close(); err != nil { n.logger.Error("unable to close the network", "error", err) diff --git a/network/network_test.go b/network/network_test.go index 4838c405b..b52f337a9 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -150,13 +150,14 @@ func TestNetwork(t *testing.T) { relayAddrs = append(relayAddrs, addr2) } + bootstrapPort := ts.RandInt32(9999) + 10000 + publicPort := ts.RandInt32(9999) + 10000 + // Bootstrap node confB := testConfig() - bootstrapPort := ts.RandInt32(9999) + 10000 confB.Bootstrapper = true confB.Listens = []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", bootstrapPort), - fmt.Sprintf("/ip6/::1/tcp/%v", bootstrapPort), } fmt.Println("Starting Bootstrap node") networkB := makeTestNetwork(t, confB, []lp2p.Option{ @@ -164,7 +165,6 @@ func TestNetwork(t *testing.T) { }) bootstrapAddresses := []string{ fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), - fmt.Sprintf("/ip6/::1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()), } assert.NoError(t, networkB.JoinConsensusTopic()) @@ -173,14 +173,14 @@ func TestNetwork(t *testing.T) { confP.EnableNAT = true confP.BootstrapAddrs = bootstrapAddresses confP.Listens = []string{ - "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", + fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", publicPort), } fmt.Println("Starting Public node") networkP := makeTestNetwork(t, confP, []lp2p.Option{ lp2p.ForceReachabilityPublic(), }) assert.NoError(t, networkP.JoinConsensusTopic()) + publicAddrInfo, _ := MakeAddressInfo(fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%s", publicPort, networkP.SelfID())) // Private node M confM := testConfig() @@ -189,7 +189,6 @@ func TestNetwork(t *testing.T) { confM.BootstrapAddrs = bootstrapAddresses confM.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node M") networkM := makeTestNetwork(t, confM, []lp2p.Option{ @@ -204,7 +203,6 @@ func TestNetwork(t *testing.T) { confN.BootstrapAddrs = bootstrapAddresses confN.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node N") networkN := makeTestNetwork(t, confN, []lp2p.Option{ @@ -212,18 +210,24 @@ func TestNetwork(t *testing.T) { }) assert.NoError(t, networkN.JoinConsensusTopic()) - // Private node X, doesn't join consensus topic + // Private node X, doesn't join consensus topic and without relay address confX := testConfig() confX.EnableRelay = false confX.BootstrapAddrs = bootstrapAddresses confX.Listens = []string{ "/ip4/127.0.0.1/tcp/0", - "/ip6/::1/tcp/0", } fmt.Println("Starting Private node X") networkX := makeTestNetwork(t, confX, []lp2p.Option{ lp2p.ForceReachabilityPrivate(), }) + + assert.NoError(t, networkB.Start()) + assert.NoError(t, networkP.Start()) + assert.NoError(t, networkM.Start()) + assert.NoError(t, networkN.Start()) + assert.NoError(t, networkX.Start()) + time.Sleep(2 * time.Second) t.Run("all nodes have at least one connection to the bootstrap node B", func(t *testing.T) { @@ -275,12 +279,25 @@ func TestNetwork(t *testing.T) { }) t.Run("node P (public) is directly accessible by nodes M and N (private behind NAT)", func(t *testing.T) { + require.NoError(t, networkM.host.Connect(networkM.ctx, *publicAddrInfo)) + msgM := []byte("test-stream-from-m") require.NoError(t, networkM.SendTo(msgM, networkP.SelfID())) - eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) - assert.Equal(t, eB.Source, networkM.SelfID()) - assert.Equal(t, readData(t, eB.Reader, len(msgM)), msgM) + eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) + assert.Equal(t, eP.Source, networkM.SelfID()) + assert.Equal(t, readData(t, eP.Reader, len(msgM)), msgM) + }) + + t.Run("node P (public) is directly accessible by node X (private behind NAT, without relay)", func(t *testing.T) { + require.NoError(t, networkX.host.Connect(networkX.ctx, *publicAddrInfo)) + + msgX := []byte("test-stream-from-x") + + require.NoError(t, networkX.SendTo(msgX, networkP.SelfID())) + eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage) + assert.Equal(t, eP.Source, networkX.SelfID()) + assert.Equal(t, readData(t, eP.Reader, len(msgX)), msgX) }) t.Run("node P (public) is directly accessible by node B (bootstrap)", func(t *testing.T) { diff --git a/network/notifee.go b/network/notifee.go index e328ae9ba..a355f10e3 100644 --- a/network/notifee.go +++ b/network/notifee.go @@ -42,13 +42,11 @@ func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.C protocols, _ := lp2pn.Peerstore().SupportsProtocols(peerID, n.protocolID) if len(protocols) > 0 { - n.eventChannel <- &ConnectEvent{PeerID: peerID} - return + break } } - n.logger.Info("unable to get supported protocols", "pid", peerID) - _ = n.host.Network().ClosePeer(peerID) + n.eventChannel <- &ConnectEvent{PeerID: peerID} }() } diff --git a/network/peermgr.go b/network/peermgr.go index e1836161d..f688857d3 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -34,16 +34,17 @@ type peerMgr struct { // newPeerMgr creates a new Peer Manager instance. // Peer Manager attempts to establish connections with other nodes when the // number of connections falls below the minimum threshold. -func newPeerMgr(ctx context.Context, h lp2phost.Host, d lp2pnet.Dialer, dht *lp2pdht.IpfsDHT, - bootstrapAddrs []lp2ppeer.AddrInfo, minConns int, maxConns int, logger *logger.SubLogger, +func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT, + conf *Config, logger *logger.SubLogger, ) *peerMgr { + bootstrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs) b := &peerMgr{ ctx: ctx, bootstrapAddrs: bootstrapAddrs, - minConns: minConns, - maxConns: maxConns, + minConns: conf.MinConns, + maxConns: conf.MaxConns, host: h, - dialer: d, + dialer: h.Network(), dht: dht, logger: logger, } @@ -119,12 +120,5 @@ func (mgr *peerMgr) checkConnectivity() { ConnectAsync(mgr.ctx, mgr.host, pi, mgr.logger) } - - mgr.logger.Debug("expanding the connections") - - err := mgr.dht.Bootstrap(mgr.ctx) - if err != nil { - mgr.logger.Warn("peer discovery may suffer", "error", err) - } } }