From 9a7adcdf04a4f7c117b514dd9807d5cffa422e98 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 6 Nov 2023 22:02:24 +0800 Subject: [PATCH 1/7] fix(network): check connection threshold on gater --- network/gater.go | 38 +++++++++++++-- network/network.go | 6 ++- network/notifee.go | 44 ++++++++++------- network/peermgr.go | 116 +++++++++++++++++++++++++++++++-------------- network/utils.go | 6 +++ 5 files changed, 150 insertions(+), 60 deletions(-) diff --git a/network/gater.go b/network/gater.go index 420e569b5..4a47f4c10 100644 --- a/network/gater.go +++ b/network/gater.go @@ -3,6 +3,7 @@ package network import ( lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr" lp2pcontrol "github.com/libp2p/go-libp2p/core/control" + lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater" @@ -14,7 +15,10 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { *lp2pconngater.BasicConnectionGater - logger *logger.SubLogger + + host lp2phost.Host + maxConn int + logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { @@ -35,32 +39,56 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, return &ConnectionGater{ BasicConnectionGater: connGater, + maxConn: conf.MaxConns, logger: log, }, nil } +func (g *ConnectionGater) SetHost(host lp2phost.Host) { + g.host = host +} + +func (g *ConnectionGater) checkThreshold() bool { + return len(g.host.Network().Peers()) > g.maxConn +} + func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool { + if g.checkThreshold() { + g.logger.Debug("InterceptPeerDial rejected: many connections") + return false + } + allow := g.BasicConnectionGater.InterceptPeerDial(p) if !allow { - g.logger.Debug("InterceptPeerDial not allowed", "p") + g.logger.Debug("InterceptPeerDial rejected", "p") } return allow } func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool { + if g.checkThreshold() { + g.logger.Debug("InterceptAddrDial rejected: many connections") + return false + } + allow := g.BasicConnectionGater.InterceptAddrDial(p, ma) if !allow { - g.logger.Debug("InterceptAddrDial not allowed", "p", p, "ma", ma.String()) + g.logger.Debug("InterceptAddrDial rejected", "p", p, "ma", ma.String()) } return allow } func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { + if g.checkThreshold() { + g.logger.Debug("InterceptAccept rejected: many connections") + return false + } + allow := g.BasicConnectionGater.InterceptAccept(cma) if !allow { - g.logger.Debug("InterceptAccept not allowed") + g.logger.Debug("InterceptAccept rejected") } return allow @@ -71,7 +99,7 @@ func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer ) bool { allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma) if !allow { - g.logger.Debug("InterceptSecured not allowed", "p", p) + g.logger.Debug("InterceptSecured rejected", "p", p) } return allow diff --git a/network/network.go b/network/network.go index f874679d9..58dc8661d 100644 --- a/network/network.go +++ b/network/network.go @@ -192,6 +192,7 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts [] if err != nil { return nil, LibP2PError{Err: err} } + connGater.SetHost(host) ctx, cancel := context.WithCancel(context.Background()) @@ -215,10 +216,10 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts [] n.mdns = newMdnsService(ctx, n.host, n.logger) } n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger) - n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger) + n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, streamProtocolID, conf, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger) - n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID, conf.Bootstrapper) + n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger) n.host.Network().Notify(n.notifee) @@ -272,6 +273,7 @@ func (n *network) SelfID() lp2ppeer.ID { } func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error { + n.logger.Trace("Sending new message", "to", pid) return n.stream.SendRequest(msg, pid) } diff --git a/network/notifee.go b/network/notifee.go index 02282ba76..2b7c11093 100644 --- a/network/notifee.go +++ b/network/notifee.go @@ -3,6 +3,7 @@ package network import ( "time" + lp2pcore "github.com/libp2p/go-libp2p/core" lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" @@ -16,27 +17,30 @@ type NotifeeService struct { eventChannel chan<- Event logger *logger.SubLogger protocolID protocol.ID + peerMgr *peerMgr bootstrapper bool } -func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, - log *logger.SubLogger, protocolID protocol.ID, bootstrapper bool, +func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr, + protocolID protocol.ID, bootstrapper bool, log *logger.SubLogger, ) *NotifeeService { notifee := &NotifeeService{ host: host, eventChannel: eventChannel, - logger: log, protocolID: protocolID, bootstrapper: bootstrapper, + peerMgr: peerMgr, + logger: log, } host.Network().Notify(notifee) return notifee } func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.Conn) { - peerID := conn.RemotePeer() - n.logger.Info("connected to peer", "pid", peerID) + pid := conn.RemotePeer() + n.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction) + var protocols []lp2pcore.ProtocolID go func() { for i := 0; i < 10; i++ { // TODO: better way? @@ -44,33 +48,39 @@ func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.C time.Sleep(1 * time.Second) peerStore := lp2pn.Peerstore() - protocols, _ := peerStore.GetProtocols(peerID) + protocols, _ = peerStore.GetProtocols(pid) if len(protocols) > 0 { if slices.Contains(protocols, n.protocolID) { n.logger.Debug("peer supports the stream protocol", - "pid", peerID, "protocols", protocols) + "pid", pid, "protocols", protocols) - n.eventChannel <- &ConnectEvent{PeerID: peerID} + n.eventChannel <- &ConnectEvent{PeerID: pid} } else { n.logger.Debug("peer doesn't support the stream protocol", - "pid", peerID, "protocols", protocols) + "pid", pid, "protocols", protocols) } - return + break } } - n.logger.Info("unable to get supported protocols", "pid", peerID) - if !n.bootstrapper { - // Close this connection since we can't send a direct message to this peer. - _ = n.host.Network().ClosePeer(peerID) + if len(protocols) == 0 { + n.logger.Info("unable to get supported protocols", "pid", pid) + if !n.bootstrapper { + // Close this connection since we can't send a direct message to this peer. + _ = n.host.Network().ClosePeer(pid) + } } + + n.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction, protocols) }() } func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) { - peerID := conn.RemotePeer() - n.logger.Info("disconnected from peer", "pid", peerID) - n.eventChannel <- &DisconnectEvent{PeerID: peerID} + pid := conn.RemotePeer() + n.logger.Info("disconnected from peer", "pid", pid) + n.eventChannel <- &DisconnectEvent{PeerID: pid} + + n.peerMgr.RemovePeer(pid) } func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma multiaddr.Multiaddr) { diff --git a/network/peermgr.go b/network/peermgr.go index a0f6ad97a..6b0db7122 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -2,50 +2,56 @@ package network import ( "context" + "sync" "time" lp2pdht "github.com/libp2p/go-libp2p-kad-dht" + lp2pcore "github.com/libp2p/go-libp2p/core" lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnet "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" + "golang.org/x/exp/slices" ) -// peerMgr attempts to keep the p2p host connected to the network -// by keeping a minimum threshold of connections. If the threshold isn't met it -// connects to a random subset of the peerMgr peers. It does not use peer routing -// to discover new peers. To stop a peerMgr cancel the context passed in Start() -// or call Stop(). -type peerMgr struct { - ctx context.Context - bootstrapAddrs []lp2ppeer.AddrInfo - minConns int - maxConns int - - // Dependencies - host lp2phost.Host - dialer lp2pnet.Dialer - dht *lp2pdht.IpfsDHT - - logger *logger.SubLogger +type peerInfo struct { + MultiAddress multiaddr.Multiaddr + Direction lp2pnet.Direction + Protocols []lp2pcore.ProtocolID } -// newPeerMgr creates a new Peer Manager instance. // Peer Manager attempts to establish connections with other nodes when the // number of connections falls below the minimum threshold. +type peerMgr struct { + lk sync.RWMutex + + ctx context.Context + bootstrapAddrs []lp2ppeer.AddrInfo + minConns int + maxConns int + host lp2phost.Host + dht *lp2pdht.IpfsDHT + peers map[lp2ppeer.ID]*peerInfo + streamProtocolID lp2pcore.ProtocolID + logger *logger.SubLogger +} + +// newPeerMgr creates a new Peer Manager instance. func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT, - conf *Config, log *logger.SubLogger, + streamProtocolID lp2pcore.ProtocolID, conf *Config, log *logger.SubLogger, ) *peerMgr { b := &peerMgr{ - ctx: ctx, - bootstrapAddrs: conf.BootstrapAddrInfos(), - minConns: conf.MinConns, - maxConns: conf.MaxConns, - host: h, - dialer: h.Network(), - dht: dht, - logger: log, + ctx: ctx, + bootstrapAddrs: conf.BootstrapAddrInfos(), + minConns: conf.MinConns, + maxConns: conf.MaxConns, + streamProtocolID: streamProtocolID, + peers: make(map[lp2ppeer.ID]*peerInfo), + host: h, + dht: dht, + logger: log, } return b @@ -53,7 +59,7 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT, // Start starts the Peer Manager. func (mgr *peerMgr) Start() { - mgr.checkConnectivity() + mgr.CheckConnectivity() go func() { ticker := time.NewTicker(1 * time.Minute) @@ -64,7 +70,7 @@ func (mgr *peerMgr) Start() { case <-mgr.ctx.Done(): return case <-ticker.C: - mgr.checkConnectivity() + mgr.CheckConnectivity() } } }() @@ -75,20 +81,58 @@ func (mgr *peerMgr) Stop() { // TODO: complete me } +func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, + direction lp2pnet.Direction, protocols []lp2pcore.ProtocolID, +) { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + mgr.peers[pid] = &peerInfo{ + MultiAddress: ma, + Direction: direction, + Protocols: protocols, + } +} + +func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + delete(mgr.peers, pid) +} + // checkConnectivity performs the actual work of maintaining connections. // It ensures that the number of connections stays within the minimum and maximum thresholds. -func (mgr *peerMgr) checkConnectivity() { - currentPeers := mgr.dialer.Peers() - mgr.logger.Debug("check connectivity", "peers", len(currentPeers)) +func (mgr *peerMgr) CheckConnectivity() { + mgr.lk.Lock() + defer mgr.lk.Unlock() + + mgr.logger.Debug("check connectivity", "peers", len(mgr.peers)) // Let's check if some peers are disconnected var connectedPeers []lp2ppeer.ID - for _, p := range currentPeers { - connectedness := mgr.dialer.Connectedness(p) + var streamPeers []lp2ppeer.ID + net := mgr.host.Network() + for pid, pi := range mgr.peers { + connectedness := net.Connectedness(pid) if connectedness == lp2pnet.Connected { - connectedPeers = append(connectedPeers, p) + connectedPeers = append(connectedPeers, pid) } else { - mgr.logger.Debug("peer is not connected to us", "peer", p) + mgr.logger.Debug("peer is not connected to us", "peer", pid) + delete(mgr.peers, pid) + } + + if slices.Contains(pi.Protocols, mgr.streamProtocolID) { + streamPeers = append(streamPeers, pid) + } + } + + // Make sure we have connected to at least one peer that supports the stream protocol. + if len(streamPeers) == 0 { + mgr.logger.Warn("no stream connection") // TODO: is it possible? + + for pid := range mgr.peers { + _ = net.ClosePeer(pid) } } diff --git a/network/utils.go b/network/utils.go index 33887a6ee..201dc24f0 100644 --- a/network/utils.go +++ b/network/utils.go @@ -139,18 +139,24 @@ func SubnetsToFilters(subnets []*net.IPNet, action multiaddr.Action) *multiaddr. func MakeScalingLimitConfig(minConns, maxConns int) lp2prcmgr.ScalingLimitConfig { limit := lp2prcmgr.DefaultLimits + limit.SystemBaseLimit.ConnsOutbound = LogScale(maxConns / 2) limit.SystemBaseLimit.ConnsInbound = LogScale(maxConns / 2) limit.SystemBaseLimit.Conns = LogScale(maxConns) + limit.SystemBaseLimit.StreamsOutbound = LogScale(maxConns / 2) limit.SystemBaseLimit.StreamsInbound = LogScale(maxConns / 2) limit.SystemBaseLimit.Streams = LogScale(maxConns) + limit.ServiceLimitIncrease.ConnsOutbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.ConnsInbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.Conns = LogScale(minConns) + limit.ServiceLimitIncrease.StreamsOutbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.StreamsInbound = LogScale(minConns / 2) limit.ServiceLimitIncrease.Streams = LogScale(minConns) + limit.TransientBaseLimit.ConnsOutbound = LogScale(maxConns / 2) limit.TransientBaseLimit.ConnsInbound = LogScale(maxConns / 2) limit.TransientBaseLimit.Conns = LogScale(maxConns) + limit.TransientBaseLimit.StreamsOutbound = LogScale(maxConns / 2) limit.TransientBaseLimit.StreamsInbound = LogScale(maxConns / 2) limit.TransientBaseLimit.Streams = LogScale(maxConns) From 4d4965ee83126933aabca2222b8eba7cdbbcc9bc Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 00:23:28 +0800 Subject: [PATCH 2/7] fix(network): updating peer manager --- network/peermgr.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/network/peermgr.go b/network/peermgr.go index 6b0db7122..bd2147acf 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -109,31 +109,37 @@ func (mgr *peerMgr) CheckConnectivity() { mgr.logger.Debug("check connectivity", "peers", len(mgr.peers)) - // Let's check if some peers are disconnected - var connectedPeers []lp2ppeer.ID - var streamPeers []lp2ppeer.ID net := mgr.host.Network() - for pid, pi := range mgr.peers { - connectedness := net.Connectedness(pid) - if connectedness == lp2pnet.Connected { - connectedPeers = append(connectedPeers, pid) - } else { - mgr.logger.Debug("peer is not connected to us", "peer", pid) - delete(mgr.peers, pid) - } + // Make sure we have connected to at least one peer that supports the stream protocol. + hasStreamConn := 0 + for _, pi := range mgr.peers { if slices.Contains(pi.Protocols, mgr.streamProtocolID) { - streamPeers = append(streamPeers, pid) + hasStreamConn++ } } - // Make sure we have connected to at least one peer that supports the stream protocol. - if len(streamPeers) == 0 { - mgr.logger.Warn("no stream connection") // TODO: is it possible? + if hasStreamConn == 0 { + // TODO: is it possible? + mgr.logger.Warn("no stream connection") for pid := range mgr.peers { _ = net.ClosePeer(pid) } + + time.Sleep(1 * time.Second) + } + + // Let's check if some peers are disconnected + var connectedPeers []lp2ppeer.ID + for pid := range mgr.peers { + connectedness := net.Connectedness(pid) + if connectedness == lp2pnet.Connected { + connectedPeers = append(connectedPeers, pid) + } else { + mgr.logger.Debug("peer is not connected to us", "peer", pid) + delete(mgr.peers, pid) + } } if len(connectedPeers) > mgr.maxConns { From bbcc1b6460043a4aa9eeeeb02341262f8b8f2955 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 01:18:20 +0800 Subject: [PATCH 3/7] fix: update the default connections (max and min) --- config/config.go | 4 ++-- network/config.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index 6e2bb78dc..5e41739e0 100644 --- a/config/config.go +++ b/config/config.go @@ -119,8 +119,8 @@ func SaveTestnetConfig(path string, numValidators int) error { "/dns/pactus.nodesync.top/tcp/21777/p2p/12D3KooWP25ejVsd7cL5DvWAPwEu4JTUwnPniHBf4w93tgSezVt8", // NodeSync.Top (lthuan2011@gmail.com) "/ip4/95.217.89.202/tcp/21777/p2p/12D3KooWMsi5oYkbbpyyXctmPXzF8UZu2pCvKPRZGyvymhN9BzTD", // CodeBlockLabs (emailbuatcariduit@gmail.com) } - conf.Network.MinConns = 8 - conf.Network.MaxConns = 16 + conf.Network.MinConns = 16 + conf.Network.MaxConns = 32 conf.Network.EnableNAT = false conf.Network.EnableRelay = false conf.Network.RelayAddrStrings = []string{ diff --git a/network/config.go b/network/config.go index 1e49bce13..6267703e6 100644 --- a/network/config.go +++ b/network/config.go @@ -53,8 +53,8 @@ func DefaultConfig() *Config { }, RelayAddrStrings: []string{}, BootstrapAddrStrings: bootstrapAddrs, - MinConns: 8, - MaxConns: 16, + MinConns: 16, + MaxConns: 32, EnableNAT: false, EnableRelay: false, EnableMdns: false, From b9b4214a06b0dcb47f89a7e0f1354c678d0c4698 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 02:01:25 +0800 Subject: [PATCH 4/7] test: updating the config test --- config/example_config.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/config/example_config.toml b/config/example_config.toml index 64c93ed85..0b52b19eb 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -39,12 +39,12 @@ ## bootstrap_addrs = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"] # `min_connections` is the minimum number of connections that the Pactus node should maintain. - # Default is 8 - ## min_connections = 8 + # Default is 16 + ## min_connections = 16 # `max_connections` is the maximum number of connections that the Pactus node should maintain. - # Default is 16 - ## max_connections = 16 + # Default is 32 + ## max_connections = 32 # `enable_nat` indicates whether NAT service should be enabled or not. # NAT service allows many machines to share a single public address. From 9a176b300ccc202845666b75b55436219222bbf9 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 18:00:49 +0800 Subject: [PATCH 5/7] chore: renaming a function --- network/gater.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/network/gater.go b/network/gater.go index 4a47f4c10..b8df15571 100644 --- a/network/gater.go +++ b/network/gater.go @@ -48,12 +48,12 @@ func (g *ConnectionGater) SetHost(host lp2phost.Host) { g.host = host } -func (g *ConnectionGater) checkThreshold() bool { +func (g *ConnectionGater) hasMaxConnections() bool { return len(g.host.Network().Peers()) > g.maxConn } func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool { - if g.checkThreshold() { + if g.hasMaxConnections() { g.logger.Debug("InterceptPeerDial rejected: many connections") return false } @@ -67,7 +67,7 @@ func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool { } func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool { - if g.checkThreshold() { + if g.hasMaxConnections() { g.logger.Debug("InterceptAddrDial rejected: many connections") return false } @@ -81,7 +81,7 @@ func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiadd } func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { - if g.checkThreshold() { + if g.hasMaxConnections() { g.logger.Debug("InterceptAccept rejected: many connections") return false } From fcab4ac4e88c8a77b97c1384cbdb5cad73dc24b5 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 20:02:14 +0800 Subject: [PATCH 6/7] refactor: refactoring connection gater --- network/gater.go | 96 +++++++++++++++++++++---------------------- network/gater_test.go | 82 ++++++++++++++++++++++++++++++++++++ network/network.go | 2 +- network/peermgr.go | 44 +++++++++++--------- 4 files changed, 155 insertions(+), 69 deletions(-) diff --git a/network/gater.go b/network/gater.go index b8df15571..e05648118 100644 --- a/network/gater.go +++ b/network/gater.go @@ -1,12 +1,12 @@ package network import ( + "sync" + lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr" lp2pcontrol "github.com/libp2p/go-libp2p/core/control" - lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" ) @@ -14,97 +14,95 @@ import ( var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { - *lp2pconngater.BasicConnectionGater + lk sync.RWMutex - host lp2phost.Host + filters *multiaddr.Filters + peerMgr *peerMgr maxConn int logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { - connGater, err := lp2pconngater.NewBasicConnectionGater(nil) - if err != nil { - return nil, err - } - + filters := multiaddr.NewFilters() if !conf.ForcePrivateNetwork { privateSubnets := PrivateSubnets() - for _, sn := range privateSubnets { - err := connGater.BlockSubnet(sn) - if err != nil { - return nil, LibP2PError{Err: err} - } - } + filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } return &ConnectionGater{ - BasicConnectionGater: connGater, - maxConn: conf.MaxConns, - logger: log, + filters: filters, + maxConn: conf.MaxConns, + logger: log, }, nil } -func (g *ConnectionGater) SetHost(host lp2phost.Host) { - g.host = host +func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) { + g.lk.Lock() + defer g.lk.Unlock() + + g.peerMgr = peerMgr } func (g *ConnectionGater) hasMaxConnections() bool { - return len(g.host.Network().Peers()) > g.maxConn + if g.peerMgr == nil { + return false + } + + return g.peerMgr.NumOfConnected() > g.maxConn } -func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool { +func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { + g.lk.RLock() + defer g.lk.RUnlock() + if g.hasMaxConnections() { - g.logger.Debug("InterceptPeerDial rejected: many connections") + g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid) return false } - allow := g.BasicConnectionGater.InterceptPeerDial(p) - if !allow { - g.logger.Debug("InterceptPeerDial rejected", "p") - } - - return allow + return true } -func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool { +func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multiaddr) bool { + g.lk.RLock() + defer g.lk.RUnlock() + if g.hasMaxConnections() { - g.logger.Debug("InterceptAddrDial rejected: many connections") + g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String()) return false } - allow := g.BasicConnectionGater.InterceptAddrDial(p, ma) - if !allow { - g.logger.Debug("InterceptAddrDial rejected", "p", p, "ma", ma.String()) + deny := g.filters.AddrBlocked(ma) + if deny { + g.logger.Debug("InterceptAddrDial rejected", "pid", pid, "ma", ma.String()) + return false } - return allow + return true } func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { + g.lk.RLock() + defer g.lk.RUnlock() + if g.hasMaxConnections() { g.logger.Debug("InterceptAccept rejected: many connections") return false } - allow := g.BasicConnectionGater.InterceptAccept(cma) - if !allow { + deny := g.filters.AddrBlocked(cma.RemoteMultiaddr()) + if deny { g.logger.Debug("InterceptAccept rejected") + return false } - return allow + return true } -func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer.ID, - cma lp2pnetwork.ConnMultiaddrs, -) bool { - allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma) - if !allow { - g.logger.Debug("InterceptSecured rejected", "p", p) - } - - return allow +func (g *ConnectionGater) InterceptSecured(_ lp2pnetwork.Direction, _ lp2ppeer.ID, _ lp2pnetwork.ConnMultiaddrs) bool { + return true } -func (g *ConnectionGater) InterceptUpgraded(con lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) { - return g.BasicConnectionGater.InterceptUpgraded(con) +func (g *ConnectionGater) InterceptUpgraded(_ lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) { + return true, 0 } diff --git a/network/gater_test.go b/network/gater_test.go index 1ae2e9d50..df2a8c48e 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -1 +1,83 @@ package network + +import ( + "testing" + + lp2pnet "github.com/libp2p/go-libp2p/core/network" + "github.com/multiformats/go-multiaddr" + "github.com/pactus-project/pactus/util/testsuite" + "github.com/stretchr/testify/assert" +) + +type mockConnMultiaddrs struct { + remote multiaddr.Multiaddr +} + +func (cma *mockConnMultiaddrs) LocalMultiaddr() multiaddr.Multiaddr { + return nil +} + +func (cma *mockConnMultiaddrs) RemoteMultiaddr() multiaddr.Multiaddr { + return cma.remote +} + +func TestAllowedConnections(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) +} + +func TestDenyPrivate(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + conf.ForcePrivateNetwork = false + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.False(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) +} + +func TestMaxConnection(t *testing.T) { + ts := testsuite.NewTestSuite(t) + conf := testConfig() + conf.MaxConns = 1 + net := makeTestNetwork(t, conf, nil) + + maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") + maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234") + cmaPrivate := &mockConnMultiaddrs{remote: maPrivate} + cmaPublic := &mockConnMultiaddrs{remote: maPublic} + pid := ts.RandPeerID() + + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnet.DirInbound, nil) + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnet.DirInbound, nil) + + assert.False(t, net.connGater.InterceptPeerDial(pid)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.False(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.False(t, net.connGater.InterceptAccept(cmaPublic)) +} diff --git a/network/network.go b/network/network.go index 58dc8661d..834ca0a5f 100644 --- a/network/network.go +++ b/network/network.go @@ -192,7 +192,6 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts [] if err != nil { return nil, LibP2PError{Err: err} } - connGater.SetHost(host) ctx, cancel := context.WithCancel(context.Background()) @@ -222,6 +221,7 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts [] n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger) n.host.Network().Notify(n.notifee) + n.connGater.SetPeerManager(n.peerMgr) n.logger.Info("network setup", "id", n.host.ID(), "address", conf.ListenAddrStrings, diff --git a/network/peermgr.go b/network/peermgr.go index bd2147acf..cc745baba 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -81,6 +81,13 @@ func (mgr *peerMgr) Stop() { // TODO: complete me } +func (mgr *peerMgr) NumOfConnected() int { + mgr.lk.RLock() + defer mgr.lk.RUnlock() + + return len(mgr.peers) // TODO: try to keep record of all peers + connected peers +} + func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr, direction lp2pnet.Direction, protocols []lp2pcore.ProtocolID, ) { @@ -111,25 +118,6 @@ func (mgr *peerMgr) CheckConnectivity() { net := mgr.host.Network() - // Make sure we have connected to at least one peer that supports the stream protocol. - hasStreamConn := 0 - for _, pi := range mgr.peers { - if slices.Contains(pi.Protocols, mgr.streamProtocolID) { - hasStreamConn++ - } - } - - if hasStreamConn == 0 { - // TODO: is it possible? - mgr.logger.Warn("no stream connection") - - for pid := range mgr.peers { - _ = net.ClosePeer(pid) - } - - time.Sleep(1 * time.Second) - } - // Let's check if some peers are disconnected var connectedPeers []lp2ppeer.ID for pid := range mgr.peers { @@ -143,6 +131,24 @@ func (mgr *peerMgr) CheckConnectivity() { } if len(connectedPeers) > mgr.maxConns { + // Make sure we have connected to at least one peer that supports the stream protocol. + hasStreamConn := false + for _, pi := range mgr.peers { + if slices.Contains(pi.Protocols, mgr.streamProtocolID) { + hasStreamConn = true + break + } + } + + if !hasStreamConn { + // TODO: is it possible? + mgr.logger.Warn("no stream connection") + + for pid := range mgr.peers { + _ = net.ClosePeer(pid) + } + } + mgr.logger.Debug("peer count is about maximum threshold", "count", len(connectedPeers), "max", mgr.maxConns) From 94ee81c53ac8a36e754d5bad8f7bcaed884161de Mon Sep 17 00:00:00 2001 From: Mostafa Date: Tue, 7 Nov 2023 23:24:49 +0800 Subject: [PATCH 7/7] fix: remove stream count --- network/peermgr.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/network/peermgr.go b/network/peermgr.go index cc745baba..83b504684 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -13,7 +13,6 @@ import ( lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" - "golang.org/x/exp/slices" ) type peerInfo struct { @@ -131,24 +130,6 @@ func (mgr *peerMgr) CheckConnectivity() { } if len(connectedPeers) > mgr.maxConns { - // Make sure we have connected to at least one peer that supports the stream protocol. - hasStreamConn := false - for _, pi := range mgr.peers { - if slices.Contains(pi.Protocols, mgr.streamProtocolID) { - hasStreamConn = true - break - } - } - - if !hasStreamConn { - // TODO: is it possible? - mgr.logger.Warn("no stream connection") - - for pid := range mgr.peers { - _ = net.ClosePeer(pid) - } - } - mgr.logger.Debug("peer count is about maximum threshold", "count", len(connectedPeers), "max", mgr.maxConns)