Skip to content

Commit

Permalink
refactor(network): refactoring peer manager
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Oct 27, 2023
1 parent 3b95660 commit 69472a1
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 60 deletions.
30 changes: 8 additions & 22 deletions network/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type dhtService struct {
ctx context.Context
host lp2phost.Host
kademlia *lp2pdht.IpfsDHT
peerMgr *peerMgr
logger *logger.SubLogger
}

Expand All @@ -24,45 +23,32 @@ func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore.
if conf.Bootstrapper {
mode = lp2pdht.ModeServer
}
bootsrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs)
bootstrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs)
opts := []lp2pdht.Option{
lp2pdht.Mode(mode),
lp2pdht.ProtocolPrefix(protocolID),
lp2pdht.BootstrapPeers(bootsrapAddrs...),
lp2pdht.BootstrapPeers(bootstrapAddrs...),
}

kademlia, err := lp2pdht.New(ctx, host, opts...)
if err != nil {
logger.Panic("unable to start DHT service", "error", err)
return nil
panic(err)

Check warning on line 35 in network/dht.go

View check run for this annotation

Codecov / codecov/patch

network/dht.go#L35

Added line #L35 was not covered by tests
}

err = kademlia.Bootstrap(ctx)
if err != nil {
panic(err.Error())
}

bootstrap := newPeerMgr(ctx, host, host.Network(), kademlia,
bootsrapAddrs, conf.MinConns, conf.MaxConns, logger)

return &dhtService{
ctx: ctx,
host: host,
kademlia: kademlia,
peerMgr: bootstrap,
logger: logger,
}
}

func (dht *dhtService) Start() error {
dht.peerMgr.Start()
return nil
func (s *dhtService) Start() error {
return s.kademlia.Bootstrap(s.ctx)
}

func (dht *dhtService) Stop() {
if err := dht.kademlia.Close(); err != nil {
dht.logger.Error("unable to close Kademlia", "error", err)
func (s *dhtService) Stop() {
if err := s.kademlia.Close(); err != nil {
s.logger.Error("unable to close Kademlia", "error", err)

Check warning on line 52 in network/dht.go

View check run for this annotation

Codecov / codecov/patch

network/dht.go#L52

Added line #L52 was not covered by tests
}

dht.peerMgr.Stop()
}
22 changes: 11 additions & 11 deletions network/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,30 @@ func newMdnsService(ctx context.Context, host lp2phost.Host, logger *logger.SubL
// HandlePeerFound connects to peers discovered via mDNS. Once they're connected,
// the PubSub system will automatically start interacting with them if they also
// support PubSub.
func (mdns *mdnsService) HandlePeerFound(pi lp2ppeer.AddrInfo) {
ctx, cancel := context.WithTimeout(mdns.ctx, time.Second*10)
func (s *mdnsService) HandlePeerFound(pi lp2ppeer.AddrInfo) {
ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
defer cancel()

if pi.ID != mdns.host.ID() {
mdns.logger.Debug("connecting to new peer", "addr", pi.Addrs, "id", pi.ID.Pretty())
if err := mdns.host.Connect(ctx, pi); err != nil {
mdns.logger.Error("error on connecting to peer", "id", pi.ID.Pretty(), "error", err)
if pi.ID != s.host.ID() {
s.logger.Debug("connecting to new peer", "addr", pi.Addrs, "id", pi.ID.Pretty())
if err := s.host.Connect(ctx, pi); err != nil {
s.logger.Error("error on connecting to peer", "id", pi.ID.Pretty(), "error", err)

Check warning on line 44 in network/mdns.go

View check run for this annotation

Codecov / codecov/patch

network/mdns.go#L44

Added line #L44 was not covered by tests
}
}
}

func (mdns *mdnsService) Start() error {
err := mdns.service.Start()
func (s *mdnsService) Start() error {
err := s.service.Start()
if err != nil {
return LibP2PError{Err: err}
}

return nil
}

func (mdns *mdnsService) Stop() {
err := mdns.service.Close()
func (s *mdnsService) Stop() {
err := s.service.Close()
if err != nil {
mdns.logger.Error("unable to close the network", "error", err)
s.logger.Error("unable to close the network", "error", err)

Check warning on line 61 in network/mdns.go

View check run for this annotation

Codecov / codecov/patch

network/mdns.go#L61

Added line #L61 was not covered by tests
}
}
3 changes: 3 additions & 0 deletions network/mdns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ func TestMDNS(t *testing.T) {
se := shouldReceiveEvent(t, net2, EventTypeStream).(*StreamMessage)
assert.Equal(t, se.Source, net1.SelfID())
assert.Equal(t, readData(t, se.Reader, len(msg)), msg)

net1.Stop()
net2.Stop()
}
5 changes: 5 additions & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type network struct {
host lp2phost.Host
mdns *mdnsService
dht *dhtService
peerMgr *peerMgr
stream *streamService
gossip *gossipService
notifee *NotifeeService
Expand Down Expand Up @@ -152,6 +153,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network,
if err != nil {
return nil, LibP2PError{Err: err}
}

opts = append(opts,
lp2p.EnableRelay(),
lp2p.EnableAutoRelayWithStaticRelays(static),
Expand Down Expand Up @@ -189,6 +191,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network,
streamProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/stream/v1", n.name))

n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, relayAddrs, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID)
Expand Down Expand Up @@ -217,6 +220,7 @@ func (n *network) Start() error {
}
n.gossip.Start()
n.stream.Start()
n.peerMgr.Start()

n.logger.Info("network started", "addr", n.host.Addrs())
return nil
Expand All @@ -232,6 +236,7 @@ func (n *network) Stop() {
n.dht.Stop()
n.gossip.Stop()
n.stream.Stop()
n.peerMgr.Stop()

if err := n.host.Close(); err != nil {
n.logger.Error("unable to close the network", "error", err)
Expand Down
33 changes: 22 additions & 11 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,20 @@ func TestNetwork(t *testing.T) {
relayAddrs = append(relayAddrs, addr2)
}

bootstrapPort := ts.RandInt32(9999) + 10000

// Bootstrap node
confB := testConfig()
bootstrapPort := ts.RandInt32(9999) + 10000
confB.Bootstrapper = true
confB.Listens = []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%v", bootstrapPort),
fmt.Sprintf("/ip6/::1/tcp/%v", bootstrapPort),
}
fmt.Println("Starting Bootstrap node")
networkB := makeTestNetwork(t, confB, []lp2p.Option{
lp2p.ForceReachabilityPublic(),
})
bootstrapAddresses := []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()),
fmt.Sprintf("/ip6/::1/tcp/%v/p2p/%v", bootstrapPort, networkB.SelfID().String()),
}
assert.NoError(t, networkB.JoinConsensusTopic())

Expand All @@ -174,7 +173,6 @@ func TestNetwork(t *testing.T) {
confP.BootstrapAddrs = bootstrapAddresses
confP.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Public node")
networkP := makeTestNetwork(t, confP, []lp2p.Option{
Expand All @@ -189,7 +187,6 @@ func TestNetwork(t *testing.T) {
confM.BootstrapAddrs = bootstrapAddresses
confM.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node M")
networkM := makeTestNetwork(t, confM, []lp2p.Option{
Expand All @@ -204,26 +201,31 @@ func TestNetwork(t *testing.T) {
confN.BootstrapAddrs = bootstrapAddresses
confN.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node N")
networkN := makeTestNetwork(t, confN, []lp2p.Option{
lp2p.ForceReachabilityPrivate(),
})
assert.NoError(t, networkN.JoinConsensusTopic())

// Private node X, doesn't join consensus topic
// Private node X, doesn't join consensus topic and without relay address
confX := testConfig()
confX.EnableRelay = false
confX.BootstrapAddrs = bootstrapAddresses
confX.Listens = []string{
"/ip4/127.0.0.1/tcp/0",
"/ip6/::1/tcp/0",
}
fmt.Println("Starting Private node X")
networkX := makeTestNetwork(t, confX, []lp2p.Option{
lp2p.ForceReachabilityPrivate(),
})

networkB.Start()

Check failure on line 223 in network/network_test.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `networkB.Start` is not checked (errcheck)

Check failure on line 223 in network/network_test.go

View workflow job for this annotation

GitHub Actions / build-linux

Error return value of `networkB.Start` is not checked (errcheck)
networkP.Start()

Check failure on line 224 in network/network_test.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `networkP.Start` is not checked (errcheck)

Check failure on line 224 in network/network_test.go

View workflow job for this annotation

GitHub Actions / build-linux

Error return value of `networkP.Start` is not checked (errcheck)
networkM.Start()

Check failure on line 225 in network/network_test.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `networkM.Start` is not checked (errcheck)

Check failure on line 225 in network/network_test.go

View workflow job for this annotation

GitHub Actions / build-linux

Error return value of `networkM.Start` is not checked (errcheck)
networkN.Start()

Check failure on line 226 in network/network_test.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `networkN.Start` is not checked (errcheck)

Check failure on line 226 in network/network_test.go

View workflow job for this annotation

GitHub Actions / build-linux

Error return value of `networkN.Start` is not checked (errcheck)
networkX.Start()

Check failure on line 227 in network/network_test.go

View workflow job for this annotation

GitHub Actions / linting

Error return value of `networkX.Start` is not checked (errcheck)

Check failure on line 227 in network/network_test.go

View workflow job for this annotation

GitHub Actions / build-linux

Error return value of `networkX.Start` is not checked (errcheck)

time.Sleep(2 * time.Second)

t.Run("all nodes have at least one connection to the bootstrap node B", func(t *testing.T) {
Expand Down Expand Up @@ -278,9 +280,18 @@ func TestNetwork(t *testing.T) {
msgM := []byte("test-stream-from-m")

require.NoError(t, networkM.SendTo(msgM, networkP.SelfID()))
eB := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eB.Source, networkM.SelfID())
assert.Equal(t, readData(t, eB.Reader, len(msgM)), msgM)
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.Source, networkM.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgM)), msgM)
})

t.Run("node P (public) is directly accessible by node X (private behind NAT, without relay)", func(t *testing.T) {
msgX := []byte("test-stream-from-x")

require.NoError(t, networkX.SendTo(msgX, networkP.SelfID()))
eP := shouldReceiveEvent(t, networkP, EventTypeStream).(*StreamMessage)
assert.Equal(t, eP.Source, networkX.SelfID())
assert.Equal(t, readData(t, eP.Reader, len(msgX)), msgX)
})

t.Run("node P (public) is directly accessible by node B (bootstrap)", func(t *testing.T) {
Expand Down
6 changes: 2 additions & 4 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,11 @@ func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.C

protocols, _ := lp2pn.Peerstore().SupportsProtocols(peerID, n.protocolID)
if len(protocols) > 0 {
n.eventChannel <- &ConnectEvent{PeerID: peerID}
return
break
}
}

n.logger.Info("unable to get supported protocols", "pid", peerID)
_ = n.host.Network().ClosePeer(peerID)
n.eventChannel <- &ConnectEvent{PeerID: peerID}
}()
}

Expand Down
18 changes: 6 additions & 12 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ type peerMgr struct {
// newPeerMgr creates a new Peer Manager instance.
// Peer Manager attempts to establish connections with other nodes when the
// number of connections falls below the minimum threshold.
func newPeerMgr(ctx context.Context, h lp2phost.Host, d lp2pnet.Dialer, dht *lp2pdht.IpfsDHT,
bootstrapAddrs []lp2ppeer.AddrInfo, minConns int, maxConns int, logger *logger.SubLogger,
func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT,
conf *Config, logger *logger.SubLogger,
) *peerMgr {
bootstrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs)
b := &peerMgr{
ctx: ctx,
bootstrapAddrs: bootstrapAddrs,
minConns: minConns,
maxConns: maxConns,
minConns: conf.MinConns,
maxConns: conf.MaxConns,
host: h,
dialer: d,
dialer: h.Network(),
dht: dht,
logger: logger,
}
Expand Down Expand Up @@ -119,12 +120,5 @@ func (mgr *peerMgr) checkConnectivity() {

ConnectAsync(mgr.ctx, mgr.host, pi, mgr.logger)
}

mgr.logger.Debug("expanding the connections")

err := mgr.dht.Bootstrap(mgr.ctx)
if err != nil {
mgr.logger.Warn("peer discovery may suffer", "error", err)
}
}
}

0 comments on commit 69472a1

Please sign in to comment.