Skip to content

Commit

Permalink
fix(network): check connection threshold on gater
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Nov 6, 2023
1 parent 749116c commit 9a7adcd
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 60 deletions.
38 changes: 33 additions & 5 deletions network/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
lp2pconnmgr "github.com/libp2p/go-libp2p/core/connmgr"
lp2pcontrol "github.com/libp2p/go-libp2p/core/control"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2pconngater "github.com/libp2p/go-libp2p/p2p/net/conngater"
Expand All @@ -14,7 +15,10 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{}

type ConnectionGater struct {
*lp2pconngater.BasicConnectionGater
logger *logger.SubLogger

host lp2phost.Host
maxConn int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
Expand All @@ -35,32 +39,56 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater,

return &ConnectionGater{
BasicConnectionGater: connGater,
maxConn: conf.MaxConns,
logger: log,
}, nil
}

func (g *ConnectionGater) SetHost(host lp2phost.Host) {
g.host = host
}

func (g *ConnectionGater) checkThreshold() bool {
return len(g.host.Network().Peers()) > g.maxConn
}

func (g *ConnectionGater) InterceptPeerDial(p lp2ppeer.ID) bool {
if g.checkThreshold() {
g.logger.Debug("InterceptPeerDial rejected: many connections")
return false
}

allow := g.BasicConnectionGater.InterceptPeerDial(p)
if !allow {
g.logger.Debug("InterceptPeerDial not allowed", "p")
g.logger.Debug("InterceptPeerDial rejected", "p")
}

return allow
}

func (g *ConnectionGater) InterceptAddrDial(p lp2ppeer.ID, ma multiaddr.Multiaddr) bool {
if g.checkThreshold() {
g.logger.Debug("InterceptAddrDial rejected: many connections")
return false
}

allow := g.BasicConnectionGater.InterceptAddrDial(p, ma)
if !allow {
g.logger.Debug("InterceptAddrDial not allowed", "p", p, "ma", ma.String())
g.logger.Debug("InterceptAddrDial rejected", "p", p, "ma", ma.String())
}

return allow
}

func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
if g.checkThreshold() {
g.logger.Debug("InterceptAccept rejected: many connections")
return false
}

allow := g.BasicConnectionGater.InterceptAccept(cma)
if !allow {
g.logger.Debug("InterceptAccept not allowed")
g.logger.Debug("InterceptAccept rejected")
}

return allow
Expand All @@ -71,7 +99,7 @@ func (g *ConnectionGater) InterceptSecured(dir lp2pnetwork.Direction, p lp2ppeer
) bool {
allow := g.BasicConnectionGater.InterceptSecured(dir, p, cma)
if !allow {
g.logger.Debug("InterceptSecured not allowed", "p", p)
g.logger.Debug("InterceptSecured rejected", "p", p)
}

return allow
Expand Down
6 changes: 4 additions & 2 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts []
if err != nil {
return nil, LibP2PError{Err: err}
}
connGater.SetHost(host)

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -215,10 +216,10 @@ func newNetwork(networkName string, conf *Config, log *logger.SubLogger, opts []
n.mdns = newMdnsService(ctx, n.host, n.logger)
}
n.dht = newDHTService(n.ctx, n.host, kadProtocolID, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, conf, n.logger)
n.peerMgr = newPeerMgr(ctx, host, n.dht.kademlia, streamProtocolID, conf, n.logger)
n.stream = newStreamService(ctx, n.host, streamProtocolID, n.eventChannel, n.logger)
n.gossip = newGossipService(ctx, n.host, n.eventChannel, conf, n.logger)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.logger, streamProtocolID, conf.Bootstrapper)
n.notifee = newNotifeeService(n.host, n.eventChannel, n.peerMgr, streamProtocolID, conf.Bootstrapper, n.logger)

n.host.Network().Notify(n.notifee)

Expand Down Expand Up @@ -272,6 +273,7 @@ func (n *network) SelfID() lp2ppeer.ID {
}

func (n *network) SendTo(msg []byte, pid lp2pcore.PeerID) error {
n.logger.Trace("Sending new message", "to", pid)
return n.stream.SendRequest(msg, pid)
}

Expand Down
44 changes: 27 additions & 17 deletions network/notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"time"

lp2pcore "github.com/libp2p/go-libp2p/core"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -16,61 +17,70 @@ type NotifeeService struct {
eventChannel chan<- Event
logger *logger.SubLogger
protocolID protocol.ID
peerMgr *peerMgr
bootstrapper bool
}

func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event,
log *logger.SubLogger, protocolID protocol.ID, bootstrapper bool,
func newNotifeeService(host lp2phost.Host, eventChannel chan<- Event, peerMgr *peerMgr,
protocolID protocol.ID, bootstrapper bool, log *logger.SubLogger,
) *NotifeeService {
notifee := &NotifeeService{
host: host,
eventChannel: eventChannel,
logger: log,
protocolID: protocolID,
bootstrapper: bootstrapper,
peerMgr: peerMgr,
logger: log,
}
host.Network().Notify(notifee)
return notifee
}

func (n *NotifeeService) Connected(lp2pn lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", peerID)
pid := conn.RemotePeer()
n.logger.Info("connected to peer", "pid", pid, "direction", conn.Stat().Direction)

var protocols []lp2pcore.ProtocolID
go func() {
for i := 0; i < 10; i++ {
// TODO: better way?
// Wait to complete libp2p identify
time.Sleep(1 * time.Second)

peerStore := lp2pn.Peerstore()
protocols, _ := peerStore.GetProtocols(peerID)
protocols, _ = peerStore.GetProtocols(pid)
if len(protocols) > 0 {
if slices.Contains(protocols, n.protocolID) {
n.logger.Debug("peer supports the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)

n.eventChannel <- &ConnectEvent{PeerID: peerID}
n.eventChannel <- &ConnectEvent{PeerID: pid}
} else {
n.logger.Debug("peer doesn't support the stream protocol",
"pid", peerID, "protocols", protocols)
"pid", pid, "protocols", protocols)
}
return
break
}
}

n.logger.Info("unable to get supported protocols", "pid", peerID)
if !n.bootstrapper {
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(peerID)
if len(protocols) == 0 {
n.logger.Info("unable to get supported protocols", "pid", pid)
if !n.bootstrapper {
// Close this connection since we can't send a direct message to this peer.
_ = n.host.Network().ClosePeer(pid)
}
}

n.peerMgr.AddPeer(pid, conn.RemoteMultiaddr(), conn.Stat().Direction, protocols)
}()
}

func (n *NotifeeService) Disconnected(_ lp2pnetwork.Network, conn lp2pnetwork.Conn) {
peerID := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", peerID)
n.eventChannel <- &DisconnectEvent{PeerID: peerID}
pid := conn.RemotePeer()
n.logger.Info("disconnected from peer", "pid", pid)
n.eventChannel <- &DisconnectEvent{PeerID: pid}

n.peerMgr.RemovePeer(pid)
}

func (n *NotifeeService) Listen(_ lp2pnetwork.Network, ma multiaddr.Multiaddr) {
Expand Down
116 changes: 80 additions & 36 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,64 @@ package network

import (
"context"
"sync"
"time"

lp2pdht "github.com/libp2p/go-libp2p-kad-dht"
lp2pcore "github.com/libp2p/go-libp2p/core"
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnet "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/logger"
"golang.org/x/exp/slices"
)

// peerMgr attempts to keep the p2p host connected to the network
// by keeping a minimum threshold of connections. If the threshold isn't met it
// connects to a random subset of the peerMgr peers. It does not use peer routing
// to discover new peers. To stop a peerMgr cancel the context passed in Start()
// or call Stop().
type peerMgr struct {
ctx context.Context
bootstrapAddrs []lp2ppeer.AddrInfo
minConns int
maxConns int

// Dependencies
host lp2phost.Host
dialer lp2pnet.Dialer
dht *lp2pdht.IpfsDHT

logger *logger.SubLogger
type peerInfo struct {
MultiAddress multiaddr.Multiaddr
Direction lp2pnet.Direction
Protocols []lp2pcore.ProtocolID
}

// newPeerMgr creates a new Peer Manager instance.
// Peer Manager attempts to establish connections with other nodes when the
// number of connections falls below the minimum threshold.
type peerMgr struct {
lk sync.RWMutex

ctx context.Context
bootstrapAddrs []lp2ppeer.AddrInfo
minConns int
maxConns int
host lp2phost.Host
dht *lp2pdht.IpfsDHT
peers map[lp2ppeer.ID]*peerInfo
streamProtocolID lp2pcore.ProtocolID
logger *logger.SubLogger
}

// newPeerMgr creates a new Peer Manager instance.
func newPeerMgr(ctx context.Context, h lp2phost.Host, dht *lp2pdht.IpfsDHT,
conf *Config, log *logger.SubLogger,
streamProtocolID lp2pcore.ProtocolID, conf *Config, log *logger.SubLogger,
) *peerMgr {
b := &peerMgr{
ctx: ctx,
bootstrapAddrs: conf.BootstrapAddrInfos(),
minConns: conf.MinConns,
maxConns: conf.MaxConns,
host: h,
dialer: h.Network(),
dht: dht,
logger: log,
ctx: ctx,
bootstrapAddrs: conf.BootstrapAddrInfos(),
minConns: conf.MinConns,
maxConns: conf.MaxConns,
streamProtocolID: streamProtocolID,
peers: make(map[lp2ppeer.ID]*peerInfo),
host: h,
dht: dht,
logger: log,
}

return b
}

// Start starts the Peer Manager.
func (mgr *peerMgr) Start() {
mgr.checkConnectivity()
mgr.CheckConnectivity()

go func() {
ticker := time.NewTicker(1 * time.Minute)
Expand All @@ -64,7 +70,7 @@ func (mgr *peerMgr) Start() {
case <-mgr.ctx.Done():
return
case <-ticker.C:
mgr.checkConnectivity()
mgr.CheckConnectivity()
}
}
}()
Expand All @@ -75,20 +81,58 @@ func (mgr *peerMgr) Stop() {
// TODO: complete me
}

func (mgr *peerMgr) AddPeer(pid lp2ppeer.ID, ma multiaddr.Multiaddr,
direction lp2pnet.Direction, protocols []lp2pcore.ProtocolID,
) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

mgr.peers[pid] = &peerInfo{
MultiAddress: ma,
Direction: direction,
Protocols: protocols,
}
}

func (mgr *peerMgr) RemovePeer(pid lp2ppeer.ID) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

delete(mgr.peers, pid)
}

// checkConnectivity performs the actual work of maintaining connections.
// It ensures that the number of connections stays within the minimum and maximum thresholds.
func (mgr *peerMgr) checkConnectivity() {
currentPeers := mgr.dialer.Peers()
mgr.logger.Debug("check connectivity", "peers", len(currentPeers))
func (mgr *peerMgr) CheckConnectivity() {
mgr.lk.Lock()
defer mgr.lk.Unlock()

mgr.logger.Debug("check connectivity", "peers", len(mgr.peers))

// Let's check if some peers are disconnected
var connectedPeers []lp2ppeer.ID
for _, p := range currentPeers {
connectedness := mgr.dialer.Connectedness(p)
var streamPeers []lp2ppeer.ID
net := mgr.host.Network()
for pid, pi := range mgr.peers {
connectedness := net.Connectedness(pid)
if connectedness == lp2pnet.Connected {
connectedPeers = append(connectedPeers, p)
connectedPeers = append(connectedPeers, pid)
} else {
mgr.logger.Debug("peer is not connected to us", "peer", p)
mgr.logger.Debug("peer is not connected to us", "peer", pid)
delete(mgr.peers, pid)
}

if slices.Contains(pi.Protocols, mgr.streamProtocolID) {
streamPeers = append(streamPeers, pid)
}
}

// Make sure we have connected to at least one peer that supports the stream protocol.
if len(streamPeers) == 0 {
mgr.logger.Warn("no stream connection") // TODO: is it possible?

for pid := range mgr.peers {
_ = net.ClosePeer(pid)
}
}

Expand Down
Loading

0 comments on commit 9a7adcd

Please sign in to comment.