diff --git a/config/config.go b/config/config.go index 600fdf10e..0b7215cf9 100644 --- a/config/config.go +++ b/config/config.go @@ -105,7 +105,7 @@ func SaveTestnetConfig(path string, numValidators int) error { "/ip4/0.0.0.0/tcp/21777", "/ip4/0.0.0.0/udp/21777/quic-v1", "/ip6/::/tcp/21777", "/ip6/::/udp/21777/quic-v1", } - conf.Network.Bootstrap.Addresses = []string{ + conf.Network.BootstrapAddrs = []string{ "/ip4/94.101.184.118/tcp/21777/p2p/12D3KooWCwQZt8UriVXobQHPXPR8m83eceXVoeT6brPNiBHomebc", "/ip4/94.101.184.118/udp/21777/quic-v1/p2p/12D3KooWCwQZt8UriVXobQHPXPR8m83eceXVoeT6brPNiBHomebc", "/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq", @@ -119,8 +119,8 @@ func SaveTestnetConfig(path string, numValidators int) error { "/ip6/2001:bc8:700:8017::1/tcp/21777/p2p/12D3KooWDF8a4goNCHriP1y922y4jagaPwHdX4eSrG5WtQpjzS6k", "/ip6/2001:bc8:700:8017::1/udp/21777/quic-v1/p2p/12D3KooWDF8a4goNCHriP1y922y4jagaPwHdX4eSrG5WtQpjzS6k", } - conf.Network.Bootstrap.MinThreshold = 4 - conf.Network.Bootstrap.MaxThreshold = 8 + conf.Network.MinConns = 8 + conf.Network.MaxConns = 16 conf.Network.EnableRelay = true conf.Network.RelayAddrs = []string{ "/ip4/139.162.153.10/tcp/4002/p2p/12D3KooWNR79jqHVVNhNVrqnDbxbJJze4VjbEsBjZhz6mkvinHAN", @@ -148,10 +148,11 @@ func SaveLocalnetConfig(path string, numValidators int) error { conf := DefaultConfig() conf.Node.NumValidators = numValidators conf.Network.Listens = []string{} + conf.Network.EnableRelay = false conf.Network.EnableNAT = false - conf.Network.Bootstrap.Addresses = []string{} - conf.Network.Bootstrap.MinThreshold = 4 - conf.Network.Bootstrap.MaxThreshold = 8 + conf.Network.BootstrapAddrs = []string{} + conf.Network.MinConns = 0 + conf.Network.MaxConns = 0 conf.GRPC.Enable = true conf.GRPC.Listen = "[::]:0" conf.GRPC.Gateway.Enable = true diff --git a/config/example_config.toml b/config/example_config.toml index 7991ed5a6..a474da16f 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -21,11 +21,27 @@ # `network` contains configuration options for the network module, which manages communication between nodes. [network] + # `network_key` specifies the private key filename to use for node authentication and encryption in the p2p protocol. + ## network_key = "network_key" + # `listens` specifies the addresses and ports where the node will listen for incoming connections from other nodes. ## listens = ["/ip4/0.0.0.0/tcp/21888", "/ip6/::/tcp/21888", "/ip4/0.0.0.0/udp/21888/quic-v1", "/ip6/::/udp/21888/quic-v1"] - # `network_key` specifies the private key filename to use for node authentication and encryption in the p2p protocol. - ## network_key = "network_key" + # `relay_addresses` provides the necessary relay addresses. These should be specified if 'enable_relay' is 'true'. + # Note: this parameter will be ignored if 'enable_relay' is 'false'. + ## relay_addresses = [] + + # `bootstrap_addresses` is a list of peer addresses needed for peer discovery. + # These addresses are used by the Pactus node to discover and connect to other peers on the network. + ## bootstrap_addresses = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"] + + # `min_connections` is the minimum number of connections that the Pactus node should maintain. + # Default is 8 + ## min_connections = 8 + + # `max_connections` is the maximum number of connections that the Pactus node should maintain. + # Default is 16 + ## max_connections = 16 # `enable_nat` indicates whether NAT service should be enabled or not. # NAT service allows many machines to share a single public address. @@ -34,13 +50,9 @@ # `enable_relay` indicates whether relay service should be enabled or not. # Relay service is a transport protocol that routes traffic between two peers over a third-party “relay” peer. - # Default is true. + # Default is false. ## enable_relay = false - # `relay_addresses` provides the necessary relay addresses. These should be specified if 'enable_relay' is 'true'. - # Note: this parameter will be ignored if 'enable_relay' is 'false'. - ## relay_addresses = [] - # `enable_mdns` indicates whether MDNS should be enabled or not. # MDNS is a protocol to discover local peers quickly and efficiently. # Default is false. @@ -54,25 +66,6 @@ # Default is false. ## bootstrapper = false - # `network.bootstrap` contains configuration for bootstrapping the node. - [network.bootstrap] - - # `addresses` is a list of peer addresses needed for peer discovery. - # These addresses are used by the Pactus node to discover and connect to other peers on the network. - ## addresses = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"] - - # `min_threshold` is the minimum number of connections that the Pactus node should maintain. - # Default is 8 - ## min_threshold = 8 - - # `max_threshold` is the maximum number of connections that the Pactus node should maintain. - # Default is 16 - ## max_threshold = 16 - - # `period` periodically checks to see if the threshold is maintained. - # Default is 1 minute - ## period = "1m0s" - # `sync` contains configuration of sync module. [sync] diff --git a/network/address.go b/network/address.go deleted file mode 100644 index fa71ce0de..000000000 --- a/network/address.go +++ /dev/null @@ -1,25 +0,0 @@ -package network - -import ( - lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" -) - -// PeerAddrsToAddrInfo converts a slice of string peer addresses -// to AddrInfo. -func PeerAddrsToAddrInfo(addrs []string) ([]lp2ppeer.AddrInfo, error) { - pis := make([]lp2ppeer.AddrInfo, 0, len(addrs)) - for _, addr := range addrs { - a, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - - pinfo, err := lp2ppeer.AddrInfoFromP2pAddr(a) - if err != nil { - return nil, err - } - pis = append(pis, *pinfo) - } - return pis, nil -} diff --git a/network/bootstrap.go b/network/bootstrap.go deleted file mode 100644 index 84e516b28..000000000 --- a/network/bootstrap.go +++ /dev/null @@ -1,152 +0,0 @@ -package network - -import ( - "context" - "time" - - lp2pdht "github.com/libp2p/go-libp2p-kad-dht" - lp2phost "github.com/libp2p/go-libp2p/core/host" - lp2pnet "github.com/libp2p/go-libp2p/core/network" - lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - lp2prouting "github.com/libp2p/go-libp2p/core/routing" - "github.com/pactus-project/pactus/util/logger" -) - -// bootstrap attempts to keep the p2p host connected to the network -// by keeping a minimum threshold of connections. If the threshold isn't met it -// connects to a random subset of the bootstrap peers. It does not use peer routing -// to discover new peers. To stop a bootstrap cancel the context passed in Start() -// or call Stop(). -type bootstrap struct { - ctx context.Context - config *BootstrapConfig - - bootstrapPeers []lp2ppeer.AddrInfo - - // Dependencies - host lp2phost.Host - dialer lp2pnet.Dialer - routing lp2prouting.Routing - - logger *logger.SubLogger -} - -// newBootstrap returns a new Bootstrap that will attempt to keep connected -// to the network by connecting to the given bootstrap peers. -func newBootstrap(ctx context.Context, h lp2phost.Host, d lp2pnet.Dialer, r lp2prouting.Routing, - conf *BootstrapConfig, logger *logger.SubLogger, -) *bootstrap { - b := &bootstrap{ - ctx: ctx, - config: conf, - host: h, - dialer: d, - routing: r, - logger: logger, - } - - addresses, err := PeerAddrsToAddrInfo(conf.Addresses) - if err != nil { - b.logger.Panic("couldn't parse bootstrap addresses", "error", err, "addresses", conf.Addresses) - } - b.bootstrapPeers = addresses - - return b -} - -// Start starts the Bootstrap bootstrapping. Cancel `ctx` or call Stop() to stop it. -func (b *bootstrap) Start() { - // Protecting bootstrap peers - for _, a := range b.bootstrapPeers { - b.host.ConnManager().Protect(a.ID, "bootstrap") - } - - b.checkConnectivity() - - go func() { - ticker := time.NewTicker(b.config.Period) - defer ticker.Stop() - - for { - select { - case <-b.ctx.Done(): - return - case <-ticker.C: - b.checkConnectivity() - } - } - }() -} - -// Stop stops the Bootstrap. -func (b *bootstrap) Stop() { -} - -// checkConnectivity does the actual work. If the number of connected peers -// has fallen below b.MinPeerThreshold it will attempt to connect to -// a random subset of its bootstrap peers. -func (b *bootstrap) checkConnectivity() { - currentPeers := b.dialer.Peers() - b.logger.Debug("check connectivity", "peers", len(currentPeers)) - - // Let's check if some peers are disconnected - var connectedPeers []lp2ppeer.ID - for _, p := range currentPeers { - connectedness := b.dialer.Connectedness(p) - if connectedness == lp2pnet.Connected { - connectedPeers = append(connectedPeers, p) - } else { - b.logger.Warn("peer is not connected to us", "peer", p) - } - } - - if len(connectedPeers) > b.config.MaxThreshold { - b.logger.Debug("peer count is about maximum threshold", - "count", len(connectedPeers), - "threshold", b.config.MaxThreshold) - return - } - - if len(connectedPeers) < b.config.MinThreshold { - b.logger.Debug("peer count is less than minimum threshold", - "count", len(connectedPeers), - "threshold", b.config.MinThreshold) - - for _, pi := range b.bootstrapPeers { - b.logger.Debug("try connecting to a bootstrap peer", "peer", pi.String()) - - // Don't try to connect to an already connected peer. - if hasPID(connectedPeers, pi.ID) { - b.logger.Trace("already connected", "peer", pi.String()) - continue - } - - ConnectAsync(b.ctx, b.host, pi, b.logger) - } - - b.logger.Debug("expanding the connections") - b.expand() - } -} - -func hasPID(pids []lp2ppeer.ID, pid lp2ppeer.ID) bool { - for _, p := range pids { - if p == pid { - return true - } - } - return false -} - -func (b *bootstrap) expand() { - dht, ok := b.routing.(*lp2pdht.IpfsDHT) - if !ok { - b.logger.Warn("no bootstrapping to do exit quietly.") - return - } - - err := dht.Bootstrap(b.ctx) - if err != nil { - b.logger.Warn("peer discovery may suffer", "error", err) - } -} diff --git a/network/config.go b/network/config.go index 0f83091a5..efa2400f0 100644 --- a/network/config.go +++ b/network/config.go @@ -2,30 +2,23 @@ package network import ( "fmt" - "time" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/errors" ) type Config struct { - Listens []string `toml:"listens"` - NetworkKey string `toml:"network_key"` - EnableNAT bool `toml:"enable_nat"` - EnableRelay bool `toml:"enable_relay"` - RelayAddrs []string `toml:"relay_addresses"` - EnableMdns bool `toml:"enable_mdns"` - EnableMetrics bool `toml:"enable_metrics"` - Bootstrapper bool `toml:"bootstrapper"` - Bootstrap *BootstrapConfig `toml:"bootstrap"` -} - -// BootstrapConfig holds all configuration options related to bootstrap nodes. -type BootstrapConfig struct { - Addresses []string `toml:"addresses"` - MinThreshold int `toml:"min_threshold"` - MaxThreshold int `toml:"max_threshold"` - Period time.Duration `toml:"period"` + NetworkKey string `toml:"network_key"` + Listens []string `toml:"listens"` + RelayAddrs []string `toml:"relay_addresses"` + BootstrapAddrs []string `toml:"bootstrap_addresses"` + MinConns int `toml:"min_connections"` + MaxConns int `toml:"max_connections"` + EnableNAT bool `toml:"enable_nat"` + EnableRelay bool `toml:"enable_relay"` + EnableMdns bool `toml:"enable_mdns"` + EnableMetrics bool `toml:"enable_metrics"` + Bootstrapper bool `toml:"bootstrapper"` } func DefaultConfig() *Config { @@ -41,29 +34,27 @@ func DefaultConfig() *Config { }, } - addresses := []string{} + bootstrapAddrs := []string{} for _, n := range nodes { - addresses = append(addresses, + bootstrapAddrs = append(bootstrapAddrs, fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", n.ip, n.port, n.id)) } return &Config{ + NetworkKey: "network_key", Listens: []string{ "/ip4/0.0.0.0/tcp/21888", "/ip6/::/tcp/21888", "/ip4/0.0.0.0/udp/21888/quic-v1", "/ip6/::/udp/21888/quic-v1", }, - NetworkKey: "network_key", - EnableNAT: true, - EnableRelay: false, - EnableMdns: false, - EnableMetrics: false, - Bootstrapper: false, - Bootstrap: &BootstrapConfig{ - Addresses: addresses, - MinThreshold: 8, - MaxThreshold: 16, - Period: 1 * time.Minute, - }, + RelayAddrs: []string{}, + BootstrapAddrs: bootstrapAddrs, + MinConns: 8, + MaxConns: 16, + EnableNAT: true, + EnableRelay: false, + EnableMdns: false, + EnableMetrics: false, + Bootstrapper: false, } } @@ -87,5 +78,8 @@ func (conf *Config) BasicCheck() error { if err := validateAddresses(conf.RelayAddrs); err != nil { return err } - return validateAddresses(conf.Listens) + if err := validateAddresses(conf.Listens); err != nil { + return err + } + return validateAddresses(conf.BootstrapAddrs) } diff --git a/network/config_test.go b/network/config_test.go index 947578af3..c230a810f 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -8,28 +8,38 @@ import ( func TestDefaultConfigCheck(t *testing.T) { conf := DefaultConfig() - conf.EnableRelay = true assert.Error(t, conf.BasicCheck()) + conf = DefaultConfig() conf.Listens = []string{""} assert.Error(t, conf.BasicCheck()) + conf = DefaultConfig() conf.Listens = []string{"127.0.0.1"} assert.Error(t, conf.BasicCheck()) + conf = DefaultConfig() conf.Listens = []string{"/ip4"} assert.Error(t, conf.BasicCheck()) + conf = DefaultConfig() conf.RelayAddrs = []string{"/ip4"} assert.Error(t, conf.BasicCheck()) - conf.RelayAddrs = []string{} - conf.Listens = []string{} + conf = DefaultConfig() + conf.BootstrapAddrs = []string{"/ip4"} + assert.Error(t, conf.BasicCheck()) + conf = DefaultConfig() conf.RelayAddrs = []string{"/ip4/127.0.0.1"} assert.NoError(t, conf.BasicCheck()) + conf = DefaultConfig() conf.Listens = []string{"/ip4/127.0.0.1"} assert.NoError(t, conf.BasicCheck()) + + conf = DefaultConfig() + conf.BootstrapAddrs = []string{"/ip4/127.0.0.1"} + assert.NoError(t, conf.BasicCheck()) } diff --git a/network/dht.go b/network/dht.go index 463b93729..81cb43862 100644 --- a/network/dht.go +++ b/network/dht.go @@ -10,25 +10,27 @@ import ( ) type dhtService struct { - ctx context.Context - host lp2phost.Host - kademlia *lp2pdht.IpfsDHT - bootstrap *bootstrap - logger *logger.SubLogger + ctx context.Context + host lp2phost.Host + kademlia *lp2pdht.IpfsDHT + peerMgr *peerMgr + logger *logger.SubLogger } func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore.ProtocolID, - conf *BootstrapConfig, bootstrapper bool, logger *logger.SubLogger, + conf *Config, logger *logger.SubLogger, ) *dhtService { mode := lp2pdht.ModeAuto - if bootstrapper { + if conf.Bootstrapper { mode = lp2pdht.ModeServer } + bootsrapAddrs := PeerAddrsToAddrInfo(conf.BootstrapAddrs) opts := []lp2pdht.Option{ lp2pdht.Mode(mode), lp2pdht.ProtocolPrefix(protocolID), lp2pdht.DisableProviders(), lp2pdht.DisableValues(), + lp2pdht.BootstrapPeers(bootsrapAddrs...), } kademlia, err := lp2pdht.New(ctx, host, opts...) @@ -42,21 +44,20 @@ func newDHTService(ctx context.Context, host lp2phost.Host, protocolID lp2pcore. panic(err.Error()) } - bootstrap := newBootstrap(ctx, - host, host.Network(), kademlia, - conf, logger) + bootstrap := newPeerMgr(ctx, host, host.Network(), kademlia, + bootsrapAddrs, conf.MinConns, conf.MaxConns, logger) return &dhtService{ - ctx: ctx, - host: host, - kademlia: kademlia, - bootstrap: bootstrap, - logger: logger, + ctx: ctx, + host: host, + kademlia: kademlia, + peerMgr: bootstrap, + logger: logger, } } func (dht *dhtService) Start() error { - dht.bootstrap.Start() + dht.peerMgr.Start() return nil } @@ -65,5 +66,5 @@ func (dht *dhtService) Stop() { dht.logger.Error("unable to close Kademlia", "error", err) } - dht.bootstrap.Stop() + dht.peerMgr.Stop() } diff --git a/network/network.go b/network/network.go index 916e60379..5a6ff315a 100644 --- a/network/network.go +++ b/network/network.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "time" lp2p "github.com/libp2p/go-libp2p" lp2pps "github.com/libp2p/go-libp2p-pubsub" @@ -12,6 +13,7 @@ import ( lp2phost "github.com/libp2p/go-libp2p/core/host" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + lp2pconnmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" ma "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util" "github.com/pactus-project/pactus/util/logger" @@ -101,11 +103,21 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network, return nil, LibP2PError{Err: err} } + connMgr, err := lp2pconnmgr.NewConnManager( + conf.MinConns, // Low Watermark + conf.MaxConns, // High Watermark + lp2pconnmgr.WithGracePeriod(time.Minute), + ) + if err != nil { + return nil, LibP2PError{Err: err} + } + opts = append(opts, lp2p.Identity(networkKey), lp2p.ListenAddrStrings(conf.Listens...), lp2p.UserAgent(version.Agent()), lp2p.ResourceManager(rmgr), + lp2p.ConnectionManager(connMgr), ) if !conf.EnableMetrics { @@ -169,7 +181,7 @@ func newNetwork(networkName string, conf *Config, opts []lp2p.Option) (*network, kadProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/gossip/v1", n.name)) streamProtocolID := lp2pcore.ProtocolID(fmt.Sprintf("/%s/stream/v1", n.name)) - n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf.Bootstrap, conf.Bootstrapper, n.logger) + n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger) n.stream = newStreamService(ctx, n.host, streamProtocolID, relayAddrs, n.eventChannel, n.logger) n.gossip = newGossipService(ctx, n.host, n.eventChannel, n.logger) n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID) @@ -197,12 +209,11 @@ func (n *network) Start() error { n.gossip.Start() n.stream.Start() - for _, relayAddr := range n.config.RelayAddrs { - addrInfo, err := MakeAddressInfo(relayAddr) - if err != nil { - return err + if n.config.EnableRelay { + for _, relayAddr := range n.config.RelayAddrs { + addrInfo, _ := MakeAddressInfo(relayAddr) + ConnectAsync(n.ctx, n.host, *addrInfo, n.logger) } - ConnectAsync(n.ctx, n.host, *addrInfo, n.logger) } n.logger.Info("network started", "addr", n.host.Addrs()) diff --git a/network/network_test.go b/network/network_test.go index 30be12050..4838c405b 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -56,17 +56,14 @@ func makeTestNetwork(t *testing.T, conf *Config, opts []lp2p.Option) *network { func testConfig() *Config { return &Config{ - Listens: []string{}, - NetworkKey: util.TempFilePath(), - EnableNAT: false, - EnableRelay: false, - EnableMdns: false, - Bootstrap: &BootstrapConfig{ - Addresses: []string{}, - MinThreshold: 4, - MaxThreshold: 8, - Period: 2 * time.Second, - }, + Listens: []string{}, + NetworkKey: util.TempFilePath(), + BootstrapAddrs: []string{}, + MinConns: 4, + MaxConns: 8, + EnableNAT: false, + EnableRelay: false, + EnableMdns: false, } } @@ -174,7 +171,7 @@ func TestNetwork(t *testing.T) { // Public node confP := testConfig() confP.EnableNAT = true - confP.Bootstrap.Addresses = bootstrapAddresses + confP.BootstrapAddrs = bootstrapAddresses confP.Listens = []string{ "/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0", @@ -189,7 +186,7 @@ func TestNetwork(t *testing.T) { confM := testConfig() confM.EnableRelay = true confM.RelayAddrs = relayAddrs - confM.Bootstrap.Addresses = bootstrapAddresses + confM.BootstrapAddrs = bootstrapAddresses confM.Listens = []string{ "/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0", @@ -204,7 +201,7 @@ func TestNetwork(t *testing.T) { confN := testConfig() confN.EnableRelay = true confN.RelayAddrs = relayAddrs - confN.Bootstrap.Addresses = bootstrapAddresses + confN.BootstrapAddrs = bootstrapAddresses confN.Listens = []string{ "/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0", @@ -218,7 +215,7 @@ func TestNetwork(t *testing.T) { // Private node X, doesn't join consensus topic confX := testConfig() confX.EnableRelay = false - confX.Bootstrap.Addresses = bootstrapAddresses + confX.BootstrapAddrs = bootstrapAddresses confX.Listens = []string{ "/ip4/127.0.0.1/tcp/0", "/ip6/::1/tcp/0", @@ -362,7 +359,7 @@ func TestConnections(t *testing.T) { // Public node confP := testConfig() - confP.Bootstrap.Addresses = []string{ + confP.BootstrapAddrs = []string{ fmt.Sprintf("%s/p2p/%v", bootstrapAddr, networkB.SelfID().String()), } confP.Listens = []string{test.peerAddr} diff --git a/network/peermgr.go b/network/peermgr.go new file mode 100644 index 000000000..e1836161d --- /dev/null +++ b/network/peermgr.go @@ -0,0 +1,130 @@ +package network + +import ( + "context" + "time" + + lp2pdht "github.com/libp2p/go-libp2p-kad-dht" + lp2phost "github.com/libp2p/go-libp2p/core/host" + lp2pnet "github.com/libp2p/go-libp2p/core/network" + lp2ppeer "github.com/libp2p/go-libp2p/core/peer" + lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/pactus-project/pactus/util/logger" +) + +// peerMgr attempts to keep the p2p host connected to the network +// by keeping a minimum threshold of connections. If the threshold isn't met it +// connects to a random subset of the peerMgr peers. It does not use peer routing +// to discover new peers. To stop a peerMgr cancel the context passed in Start() +// or call Stop(). +type peerMgr struct { + ctx context.Context + bootstrapAddrs []lp2ppeer.AddrInfo + minConns int + maxConns int + + // Dependencies + host lp2phost.Host + dialer lp2pnet.Dialer + dht *lp2pdht.IpfsDHT + + logger *logger.SubLogger +} + +// newPeerMgr creates a new Peer Manager instance. +// Peer Manager attempts to establish connections with other nodes when the +// number of connections falls below the minimum threshold. +func newPeerMgr(ctx context.Context, h lp2phost.Host, d lp2pnet.Dialer, dht *lp2pdht.IpfsDHT, + bootstrapAddrs []lp2ppeer.AddrInfo, minConns int, maxConns int, logger *logger.SubLogger, +) *peerMgr { + b := &peerMgr{ + ctx: ctx, + bootstrapAddrs: bootstrapAddrs, + minConns: minConns, + maxConns: maxConns, + host: h, + dialer: d, + dht: dht, + logger: logger, + } + + return b +} + +// Start starts the Peer Manager. +func (mgr *peerMgr) Start() { + mgr.checkConnectivity() + + go func() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-mgr.ctx.Done(): + return + case <-ticker.C: + mgr.checkConnectivity() + } + } + }() +} + +// Stop stops the Bootstrap. +func (mgr *peerMgr) Stop() { + // TODO: complete me +} + +// checkConnectivity performs the actual work of maintaining connections. +// It ensures that the number of connections stays within the minimum and maximum thresholds. +func (mgr *peerMgr) checkConnectivity() { + currentPeers := mgr.dialer.Peers() + mgr.logger.Debug("check connectivity", "peers", len(currentPeers)) + + // Let's check if some peers are disconnected + var connectedPeers []lp2ppeer.ID + for _, p := range currentPeers { + connectedness := mgr.dialer.Connectedness(p) + if connectedness == lp2pnet.Connected { + connectedPeers = append(connectedPeers, p) + } else { + mgr.logger.Warn("peer is not connected to us", "peer", p) + } + } + + if len(connectedPeers) > mgr.maxConns { + mgr.logger.Debug("peer count is about maximum threshold", + "count", len(connectedPeers), + "max", mgr.maxConns) + return + } + + if len(connectedPeers) < mgr.minConns { + mgr.logger.Debug("peer count is less than minimum threshold", + "count", len(connectedPeers), + "min", mgr.minConns) + + for _, pi := range mgr.bootstrapAddrs { + mgr.logger.Debug("try connecting to a bootstrap peer", "peer", pi.String()) + + // Don't try to connect to an already connected peer. + if HasPID(connectedPeers, pi.ID) { + mgr.logger.Trace("already connected", "peer", pi.String()) + continue + } + + if swarm, ok := mgr.host.Network().(*lp2pswarm.Swarm); ok { + swarm.Backoff().Clear(pi.ID) + } + + ConnectAsync(mgr.ctx, mgr.host, pi, mgr.logger) + } + + mgr.logger.Debug("expanding the connections") + + err := mgr.dht.Bootstrap(mgr.ctx) + if err != nil { + mgr.logger.Warn("peer discovery may suffer", "error", err) + } + } +} diff --git a/network/utils.go b/network/utils.go index ff8023dbe..6df001764 100644 --- a/network/utils.go +++ b/network/utils.go @@ -7,22 +7,45 @@ import ( lp2phost "github.com/libp2p/go-libp2p/core/host" lp2pnetwork "github.com/libp2p/go-libp2p/core/network" lp2ppeer "github.com/libp2p/go-libp2p/core/peer" - ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/util/logger" ) +// PeerAddrsToAddrInfo converts a slice of string peer addresses +// to AddrInfo. +func PeerAddrsToAddrInfo(addrs []string) []lp2ppeer.AddrInfo { + pis := make([]lp2ppeer.AddrInfo, 0, len(addrs)) + for _, addr := range addrs { + pinfo, _ := MakeAddressInfo(addr) + if pinfo != nil { + pis = append(pis, *pinfo) + } + } + return pis +} + // MakeAddressInfo from Multi-address string. func MakeAddressInfo(addr string) (*lp2ppeer.AddrInfo, error) { - maddr, err := ma.NewMultiaddr(addr) + maddr, err := multiaddr.NewMultiaddr(addr) if err != nil { return nil, err } return lp2ppeer.AddrInfoFromP2pAddr(maddr) } +// HasPID checks if a peer ID exists in a list of peer IDs. +func HasPID(pids []lp2ppeer.ID, pid lp2ppeer.ID) bool { + for _, p := range pids { + if p == pid { + return true + } + } + return false +} + func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrInfo, logger *logger.SubLogger) { go func() { - err := h.Connect(lp2pnetwork.WithDialPeerTimeout(ctx, 10*time.Second), addrInfo) + err := h.Connect(lp2pnetwork.WithDialPeerTimeout(ctx, 30*time.Second), addrInfo) if err != nil { if logger != nil { logger.Warn("connection failed", "addr", addrInfo.Addrs, "err", err) diff --git a/state/state.go b/state/state.go index c766d68dd..56c58f436 100644 --- a/state/state.go +++ b/state/state.go @@ -521,7 +521,7 @@ func (st *state) commitSandbox(sb sandbox.Sandbox, round int16) { joiningCommittee := make([]*validator.Validator, 0) sb.IterateValidators(func(val *validator.Validator, _ bool, joined bool) { if joined { - st.logger.Info("new validator joined", "address", val.Address(), "power", val.Power()) + st.logger.Debug("new validator joined", "address", val.Address(), "power", val.Power()) joiningCommittee = append(joiningCommittee, val) } diff --git a/tests/main_test.go b/tests/main_test.go index e1d5ba292..b8ac5b9e6 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -85,9 +85,8 @@ func TestMain(m *testing.M) { tConfigs[i].Network.Bootstrapper = true tConfigs[i].Network.NetworkKey = util.TempFilePath() tConfigs[i].Network.Listens = []string{"/ip4/127.0.0.1/tcp/0", "/ip4/127.0.0.1/udp/0/quic-v1"} - tConfigs[i].Network.Bootstrap.Addresses = []string{} - tConfigs[i].Network.Bootstrap.Period = 10 * time.Second - tConfigs[i].Network.Bootstrap.MinThreshold = 3 + tConfigs[i].Network.BootstrapAddrs = []string{} + tConfigs[i].Network.MinConns = 3 tConfigs[i].HTTP.Enable = false tConfigs[i].GRPC.Enable = false