Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): check connection threshold on gater #803

Merged
merged 7 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func SaveTestnetConfig(path string, numValidators int) error {
"/dns/pactus.nodesync.top/tcp/21777/p2p/12D3KooWP25ejVsd7cL5DvWAPwEu4JTUwnPniHBf4w93tgSezVt8", // NodeSync.Top ([email protected])
"/ip4/95.217.89.202/tcp/21777/p2p/12D3KooWMsi5oYkbbpyyXctmPXzF8UZu2pCvKPRZGyvymhN9BzTD", // CodeBlockLabs ([email protected])
}
conf.Network.MinConns = 8
conf.Network.MaxConns = 16
conf.Network.MinConns = 16
conf.Network.MaxConns = 32
conf.Network.EnableNAT = false
conf.Network.EnableRelay = false
conf.Network.RelayAddrStrings = []string{
Expand Down
8 changes: 4 additions & 4 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
## bootstrap_addrs = ["/ip4/172.104.46.145/tcp/21777/p2p/12D3KooWNYD4bB82YZRXv6oNyYPwc5ozabx2epv75ATV3D8VD3Mq"]

# `min_connections` is the minimum number of connections that the Pactus node should maintain.
# Default is 8
## min_connections = 8
# Default is 16
## min_connections = 16

# `max_connections` is the maximum number of connections that the Pactus node should maintain.
# Default is 16
## max_connections = 16
# Default is 32
## max_connections = 32

# `enable_nat` indicates whether NAT service should be enabled or not.
# NAT service allows many machines to share a single public address.
Expand Down
4 changes: 2 additions & 2 deletions network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func DefaultConfig() *Config {
},
RelayAddrStrings: []string{},
BootstrapAddrStrings: bootstrapAddrs,
MinConns: 8,
MaxConns: 16,
MinConns: 16,
MaxConns: 32,
EnableNAT: false,
EnableRelay: false,
EnableMdns: false,
Expand Down
106 changes: 66 additions & 40 deletions network/gater.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,108 @@
package network

import (
"sync"

lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr"
lp2pcontrol "github.com/libp2p/go-libp2p/core/control"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/logger"
)

var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{}

type ConnectionGater struct {
*lp2pconngater.BasicConnectionGater
logger *logger.SubLogger
lk sync.RWMutex

filters *multiaddr.Filters
peerMgr *peerMgr
maxConn int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
connGater, err := lp2pconngater.NewBasicConnectionGater(nil)
if err != nil {
return nil, err
}

filters := multiaddr.NewFilters()
if !conf.ForcePrivateNetwork {
privateSubnets := PrivateSubnets()
for _, sn := range privateSubnets {
err := connGater.BlockSubnet(sn)
if err != nil {
return nil, LibP2PError{Err: err}
}
}
filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny)
}

return &ConnectionGater{
BasicConnectionGater: connGater,
logger: log,
filters: filters,
maxConn: conf.MaxConns,
logger: log,
}, nil
}

func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool {
allow := g.BasicConnectionGater.InterceptPeerDial(p)
if !allow {
g.logger.Debug("InterceptPeerDial not allowed", "p")
func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) {
g.lk.Lock()
defer g.lk.Unlock()

g.peerMgr = peerMgr
}

func (g *ConnectionGater) hasMaxConnections() bool {
if g.peerMgr == nil {
return false
}

return allow
return g.peerMgr.NumOfConnected() > g.maxConn
}

func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool {
allow := g.BasicConnectionGater.InterceptAddrDial(p, ma)
if !allow {
g.logger.Debug("InterceptAddrDial not allowed", "p", p, "ma", ma.String())
func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid)
return false
}

return allow
return true
}

func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
allow := g.BasicConnectionGater.InterceptAccept(cma)
if !allow {
g.logger.Debug("InterceptAccept not allowed")
func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multiaddr) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
return false
}

deny := g.filters.AddrBlocked(ma)
if deny {
g.logger.Debug("InterceptAddrDial rejected", "pid", pid, "ma", ma.String())
return false
}

return allow
return true
}

func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer.ID,
cma lp2pnetwork.ConnMultiaddrs,
) bool {
allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma)
if !allow {
g.logger.Debug("InterceptSecured not allowed", "p", p)
func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
g.logger.Debug("InterceptAccept rejected: many connections")
return false
}

return allow
deny := g.filters.AddrBlocked(cma.RemoteMultiaddr())
if deny {
g.logger.Debug("InterceptAccept rejected")
return false
}

return true
}

func (g *ConnectionGater) InterceptSecured(_ lp2pnetwork.Direction, _ lp2ppeer.ID, _ lp2pnetwork.ConnMultiaddrs) bool {
return true
}

func (g *ConnectionGater) InterceptUpgraded(con lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) {
return g.BasicConnectionGater.InterceptUpgraded(con)
func (g *ConnectionGater) InterceptUpgraded(_ lp2pnetwork.Conn) (bool, lp2pcontrol.DisconnectReason) {
return true, 0
}
82 changes: 82 additions & 0 deletions network/gater_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,83 @@
package network

import (
"testing"

lp2pnet "github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/testsuite"
"github.com/stretchr/testify/assert"
)

type mockConnMultiaddrs struct {
remote multiaddr.Multiaddr
}

func (cma *mockConnMultiaddrs) LocalMultiaddr() multiaddr.Multiaddr {
return nil
}

func (cma *mockConnMultiaddrs) RemoteMultiaddr() multiaddr.Multiaddr {
return cma.remote
}

func TestAllowedConnections(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))
}

func TestDenyPrivate(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.ForcePrivateNetwork = false
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.False(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))
}

func TestMaxConnection(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.MaxConns = 1
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
maPublic := multiaddr.StringCast("/ip4/8.8.8.8/tcp/1234")
cmaPrivate := &mockConnMultiaddrs{remote: maPrivate}
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnet.DirInbound, nil)
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnet.DirInbound, nil)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.False(t, net.connGater.InterceptAccept(cmaPrivate))
assert.False(t, net.connGater.InterceptAccept(cmaPublic))
}
6 changes: 4 additions & 2 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,13 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts []
n.mdns = newMdnsService(ctx, n.host, n.logger)
}
n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, streamProtocolID, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID, conf.Bootstrapper)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger)

n.host.Network().Notify(n.notifee)
n.connGater.SetPeerManager(n.peerMgr)

n.logger.Info("network setup", "id", n.host.ID(),
"address", conf.ListenAddrStrings,
Expand Down Expand Up @@ -272,6 +273,7 @@ func (n *network) SelfID() lp2ppeer.ID {
}

func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error {
n.logger.Trace("Sending new message", "to", pid)
return n.stream.SendRequest(msg, pid)
}

Expand Down
44 changes: 27 additions & 17 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"time"

lp2pcore "github.com/libp2p/go-libp2p/core"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -16,61 +17,70 @@
eventChannel chan<- Event
logger *logger.SubLogger
protocolID protocol.ID
peerMgr *peerMgr
bootstrapper bool
}

func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event,
log *logger.SubLogger, protocolID protocol.ID, bootstrapper bool,
func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr,
protocolID protocol.ID, bootstrapper bool, log *logger.SubLogger,
) *NotifeeService {
notifee := &NotifeeService{
host: host,
eventChannel: eventChannel,
logger: log,
protocolID: protocolID,
bootstrapper: bootstrapper,
peerMgr: peerMgr,
logger: log,
}
host.Network().Notify(notifee)
return notifee
}

func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", peerID)
pid := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction)

var protocols []lp2pcore.ProtocolID
go func() {
for i := 0; i < 10; i++ {
// TODO: better way?
// Wait to complete libp2p identify
time.Sleep(1 * time.Second)

peerStore := lp2pn.Peerstore()
protocols, _ := peerStore.GetProtocols(peerID)
protocols, _ = peerStore.GetProtocols(pid)
if len(protocols) > 0 {
if slices.Contains(protocols, n.protocolID) {
n.logger.Debug("peer supports the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)

n.eventChannel <- &ConnectEvent{PeerID: peerID}
n.eventChannel <- &ConnectEvent{PeerID: pid}
} else {
n.logger.Debug("peer doesn't support the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)
}
return
break
}
}

n.logger.Info("unable to get supported protocols", "pid", peerID)
if !n.bootstrapper {
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(peerID)
if len(protocols) == 0 {
n.logger.Info("unable to get supported protocols", "pid", pid)
if !n.bootstrapper {

Check warning on line 68 in network/notifee.go

View check run for this annotation

Codecov / codecov/patch

network/notifee.go#L67-L68

Added lines #L67 - L68 were not covered by tests
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(pid)

Check warning on line 70 in network/notifee.go

View check run for this annotation

Codecov / codecov/patch

network/notifee.go#L70

Added line #L70 was not covered by tests
}
}

n.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction, protocols)
}()
}

func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", peerID)
n.eventChannel <- &DisconnectEvent{PeerID: peerID}
pid := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", pid)
n.eventChannel <- &DisconnectEvent{PeerID: pid}

n.peerMgr.RemovePeer(pid)
}

func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma multiaddr.Multiaddr) {
Expand Down
Loading
Loading