diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index ad888df4d0f..ee070755fe9 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -18,7 +18,7 @@ import ( const ( pingTimeout = time.Second * 15 - workers = 8 + workers = 32 retryAfterDuration = time.Minute * 5 ) @@ -32,8 +32,8 @@ type reacher struct { mu sync.Mutex peers map[string]*peer - work chan struct{} - quit chan struct{} + newPeer chan struct{} + quit chan struct{} pinger p2p.Pinger notifier p2p.ReachableNotifier @@ -53,7 +53,7 @@ type Options struct { func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher { r := &reacher{ - work: make(chan struct{}, 1), + newPeer: make(chan struct{}, 1), quit: make(chan struct{}), pinger: streamer, peers: make(map[string]*peer), @@ -93,17 +93,15 @@ func (r *reacher) manage() { for { - p, tryAfter := r.tryAcquirePeer() + p, tryAfter := r.nextPeer() // if no peer is returned, // wait until either more work or the closest retry-after time. 
- - // wait for work and tryAfter if tryAfter > 0 { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue case <-time.After(tryAfter): continue @@ -115,12 +113,12 @@ func (r *reacher) manage() { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue } } - // send p to channel + // ping peer select { case <-r.quit: return @@ -135,10 +133,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { for p := range c { - r.mu.Lock() - overlay := p.overlay - r.mu.Unlock() - now := time.Now() ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout) @@ -149,30 +143,25 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { if err == nil { r.metrics.Pings.WithLabelValues("success").Inc() r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic) + r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic) } else { r.metrics.Pings.WithLabelValues("failure").Inc() r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate) + r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate) } - - r.notifyManage() } } -func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { +func (r *reacher) nextPeer() (*peer, time.Duration) { r.mu.Lock() defer r.mu.Unlock() - var ( - now = time.Now() - nextClosest time.Time - ) + var nextClosest time.Time for _, p := range r.peers { // retry after has expired, retry - if now.After(p.retryAfter) { + if time.Now().After(p.retryAfter) { p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) return p, 0 } @@ -193,7 +182,7 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { func (r *reacher) notifyManage() { select { - case r.work <- struct{}{}: + case r.newPeer <- struct{}{}: default: } } diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d47abf3fd76..38484189dae 100644 --- 
a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -184,6 +184,11 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment) + // sort peers by duration, highest first + sort.Slice(peers, func(i, j int) bool { + return peers[i].dur > peers[j].dur // descending + }) + for _, peer := range peers { var healthy bool diff --git a/pkg/topology/kademlia/binprefix.go b/pkg/topology/kademlia/binprefix.go index 94e2550ea2d..5785e86971c 100644 --- a/pkg/topology/kademlia/binprefix.go +++ b/pkg/topology/kademlia/binprefix.go @@ -13,7 +13,29 @@ import ( // generateCommonBinPrefixes generates the common bin prefixes // used by the bin balancer. -func generateCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address { +func generateConstCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address { + binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + + for i := 0; i < int(swarm.MaxBins); i++ { + binPrefixes[i] = generateBinPrefixes(base, i, suffixLength) + } + + return binPrefixes +} + +// generateCommonBinPrefixes generates the common bin prefixes +// used by the bin balancer. 
+func generateCommonBinPrefixes(base swarm.Address, suffixLengths []int) [][]swarm.Address { + binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + + for i := 0; i < int(swarm.MaxBins); i++ { + binPrefixes[i] = generateBinPrefixes(base, i, suffixLengths[i]) + } + + return binPrefixes +} + +func generateBinPrefixes(base swarm.Address, bin, suffixLength int) []swarm.Address { bitCombinationsCount := int(math.Pow(2, float64(suffixLength))) bitSuffixes := make([]uint8, bitCombinationsCount) @@ -21,51 +43,46 @@ func generateCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.A bitSuffixes[i] = uint8(i) } - binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + binPrefixes := make([]swarm.Address, bitCombinationsCount) // copy base address - for i := range binPrefixes { - binPrefixes[i] = make([]swarm.Address, bitCombinationsCount) - for j := range binPrefixes[i] { - binPrefixes[i][j] = base.Clone() - } + for j := range binPrefixes { + binPrefixes[j] = base.Clone() } - for i := range binPrefixes { - for j := range binPrefixes[i] { - pseudoAddrBytes := binPrefixes[i][j].Bytes() - - if len(pseudoAddrBytes) < 1 { - continue - } - - // flip first bit for bin - indexByte, posBit := i/8, i%8 - if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) { - pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) - } else { - pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) - } + for j := range binPrefixes { + pseudoAddrBytes := binPrefixes[j].Bytes() - // set pseudo suffix - bitSuffixPos := suffixLength - 1 - for l := i + 1; l < i+suffixLength+1; l++ { - index, pos := l/8, l%8 + if len(pseudoAddrBytes) < 1 { + continue + } - if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) { - pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) - } else { - pseudoAddrBytes[index] = 
bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) - } + // flip first bit for bin + indexByte, posBit := bin/8, bin%8 + if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) { + pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) + } else { + pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) + } - bitSuffixPos-- - } + // set pseudo suffix + bitSuffixPos := suffixLength - 1 + for l := bin + 1; l < bin+suffixLength+1; l++ { + index, pos := l/8, l%8 - // clear rest of the bits - for l := i + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ { - index, pos := l/8, l%8 + if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) { + pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) + } else { pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) } + + bitSuffixPos-- + } + + // clear rest of the bits + for l := bin + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ { + index, pos := l/8, l%8 + pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) } } diff --git a/pkg/topology/kademlia/binprefix_test.go b/pkg/topology/kademlia/binprefix_test.go new file mode 100644 index 00000000000..d44cd2e08b1 --- /dev/null +++ b/pkg/topology/kademlia/binprefix_test.go @@ -0,0 +1,45 @@ +package kademlia + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestGenerateCommonBinPrefixes(t *testing.T) { + base := swarm.MustParseHexAddress("abcdef1234567890") + suffixLength := 3 + + suffixLengths := make([]int, 32) + for i := range suffixLengths { + suffixLengths[i] = 3 + } + + prefixes := generateCommonBinPrefixes(base, suffixLengths) + + if len(prefixes) != int(swarm.MaxBins) { + t.Fatalf("expected %d bins, got %d", swarm.MaxBins, len(prefixes)) + } + + for i := 0; i < 
int(swarm.MaxBins); i++ { + for j := 0; j < len(prefixes[i]); j++ { + proximity := swarm.Proximity(base.Bytes(), prefixes[i][j].Bytes()) + if want := uint8(i); want != proximity { + t.Fatalf("expected proximity %d for bin %d, got %d", want, i, proximity) + } + } + } + + bitCombinationsCount := 1 << suffixLength + for i := 0; i < int(swarm.MaxBins); i++ { + if len(prefixes[i]) != bitCombinationsCount { + t.Fatalf("expected %d addresses in bin %d, got %d", bitCombinationsCount, i, len(prefixes[i])) + } + + for j := 0; j < bitCombinationsCount; j++ { + if len(prefixes[i][j].Bytes()) != len(base.Bytes()) { + t.Fatalf("expected address length %d, got %d", len(base.Bytes()), len(prefixes[i][j].Bytes())) + } + } + } +} diff --git a/pkg/topology/kademlia/export_test.go b/pkg/topology/kademlia/export_test.go index 3c5ad3e1390..93ab87f064d 100644 --- a/pkg/topology/kademlia/export_test.go +++ b/pkg/topology/kademlia/export_test.go @@ -14,7 +14,7 @@ var ( PruneOversaturatedBinsFunc = func(k *Kad) func(uint8) { return k.pruneOversaturatedBins } - GenerateCommonBinPrefixes = generateCommonBinPrefixes + GenerateCommonBinPrefixes = generateConstCommonBinPrefixes ) const ( diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index a310e47b6c6..1530d6629b1 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -49,7 +49,7 @@ const ( // Default option values const ( - defaultBitSuffixLength = 4 // the number of bits used to create pseudo addresses for balancing, 2^4, 16 addresses + defaultBitSuffixLength = 4 // the number of bits used to create pseudo addresses for balancing, 2^4, 16 addresses defaultLowWaterMark = 3 // the number of peers in consecutive deepest bins that constitute as nearest neighbours defaultSaturationPeers = 8 defaultOverSaturationPeers = 18 @@ -111,10 +111,10 @@ type kadOptions struct { StaticNodes []swarm.Address ExcludeFunc excludeFunc - TimeToRetry time.Duration - ShortRetry time.Duration - 
PruneWakeup time.Duration - BitSuffixLength int // additional depth of common prefix for bin + TimeToRetry time.Duration + ShortRetry time.Duration + PruneWakeup time.Duration + // BitSuffixLength int // additional depth of common prefix for bin SaturationPeers int OverSaturationPeers int BootnodeOverSaturationPeers int @@ -122,6 +122,11 @@ type kadOptions struct { LowWaterMark int } +var ( + oversaturationCounts = [swarm.MaxBins]int{36, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18} + suffixBits = [swarm.MaxBins]int{5, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4} +) + func newKadOptions(o Options) kadOptions { ko := kadOptions{ // copy values @@ -132,12 +137,10 @@ func newKadOptions(o Options) kadOptions { StaticNodes: o.StaticNodes, ExcludeFunc: o.ExcludeFunc, // copy or use default - TimeToRetry: defaultValDuration(o.TimeToRetry, defaultTimeToRetry), - ShortRetry: defaultValDuration(o.ShortRetry, defaultShortRetry), - PruneWakeup: defaultValDuration(o.PruneWakeup, defaultPruneWakeup), - BitSuffixLength: defaultValInt(o.BitSuffixLength, defaultBitSuffixLength), - SaturationPeers: defaultValInt(o.SaturationPeers, defaultSaturationPeers), - OverSaturationPeers: defaultValInt(o.OverSaturationPeers, defaultOverSaturationPeers), + TimeToRetry: defaultValDuration(o.TimeToRetry, defaultTimeToRetry), + ShortRetry: defaultValDuration(o.ShortRetry, defaultShortRetry), + PruneWakeup: defaultValDuration(o.PruneWakeup, defaultPruneWakeup), + // BitSuffixLength: defaultValInt(o.BitSuffixLength, defaultBitSuffixLength), BootnodeOverSaturationPeers: defaultValInt(o.BootnodeOverSaturationPeers, defaultBootNodeOverSaturationPeers), BroadcastBinSize: defaultValInt(o.BroadcastBinSize, defaultBroadcastBinSize), LowWaterMark: defaultValInt(o.LowWaterMark, defaultLowWaterMark), @@ -165,11 +168,7 @@ func defaultValDuration(v *time.Duration, d time.Duration) 
time.Duration { } func makeSaturationFunc(o kadOptions) binSaturationFunc { - os := o.OverSaturationPeers - if o.BootnodeMode { - os = o.BootnodeOverSaturationPeers - } - return binSaturated(os, isStaticPeer(o.StaticNodes)) + return binSaturated(isStaticPeer(o.StaticNodes)) } // Kad is the Swarm forwarding kademlia implementation. @@ -256,11 +255,7 @@ func New( k.opt.PruneFunc = k.pruneOversaturatedBins } - os := k.opt.OverSaturationPeers - if k.opt.BootnodeMode { - os = k.opt.BootnodeOverSaturationPeers - } - k.opt.PruneCountFunc = binPruneCount(os, isStaticPeer(k.opt.StaticNodes)) + k.opt.PruneCountFunc = binPruneCount(isStaticPeer(k.opt.StaticNodes)) if k.opt.ExcludeFunc == nil { k.opt.ExcludeFunc = func(f ...im.ExcludeOp) peerExcludeFunc { @@ -270,9 +265,7 @@ func New( } } - if k.opt.BitSuffixLength > 0 { - k.commonBinPrefixes = generateCommonBinPrefixes(k.base, k.opt.BitSuffixLength) - } + k.commonBinPrefixes = generateCommonBinPrefixes(k.base, suffixBits[:]) k.bgBroadcastCtx, k.bgBroadcastCancel = context.WithCancel(context.Background()) @@ -297,30 +290,29 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI depth := k.neighborhoodDepth() - for i := range k.commonBinPrefixes { + for bin := range k.commonBinPrefixes { - binPeersLength := k.knownPeers.BinSize(uint8(i)) + binPeersLength := k.knownPeers.BinSize(uint8(bin)) // balancer should skip on bins where neighborhood connector would connect to peers anyway // and there are not enough peers in known addresses to properly balance the bin - if i >= int(depth) && binPeersLength < len(k.commonBinPrefixes[i]) { + if bin >= int(depth) && binPeersLength < len(k.commonBinPrefixes[bin]) { continue } - binPeers := k.knownPeers.BinPeers(uint8(i)) - binConnectedPeers := k.connectedPeers.BinPeers(uint8(i)) + binPeers := k.knownPeers.BinPeers(uint8(bin)) + binConnectedPeers := k.connectedPeers.BinPeers(uint8(bin)) - for j := range k.commonBinPrefixes[i] { - pseudoAddr := 
k.commonBinPrefixes[i][j] + for j := range k.commonBinPrefixes[bin] { + pseudoAddr := k.commonBinPrefixes[bin][j] // Connect to closest known peer which we haven't tried connecting to recently. - - _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+k.opt.BitSuffixLength+1)) + _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(bin+suffixBits[bin]+1)) if exists { continue } - closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+k.opt.BitSuffixLength+1)) + closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(bin+suffixBits[bin]+1)) if !exists { continue } @@ -736,7 +728,7 @@ func (k *Kad) balancedSlotPeers(pseudoAddr swarm.Address, peers []swarm.Address, var ret []swarm.Address for _, peer := range peers { - if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+k.opt.BitSuffixLength+1 { + if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+suffixBits[po]+1 { ret = append(ret, peer) } } @@ -864,7 +856,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) { // binSaturated indicates whether a certain bin is saturated or not. // when a bin is not saturated it means we would like to proactively // initiate connections to other peers in the bin. -func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSaturationFunc { +func binSaturated(staticNode staticPeerFunc) binSaturationFunc { return func(bin uint8, connected *pslice.PSlice, exclude peerExcludeFunc) bool { size := 0 _ = connected.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { @@ -874,12 +866,12 @@ func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSatura return false, false, nil }) - return size >= oversaturationAmount + return size >= oversaturationCounts[bin] } } // binPruneCount counts how many peers should be pruned from a bin. 
-func binPruneCount(oversaturationAmount int, staticNode staticPeerFunc) pruneCountFunc { +func binPruneCount(staticNode staticPeerFunc) pruneCountFunc { return func(bin uint8, connected *pslice.PSlice, exclude peerExcludeFunc) (int, int) { size := 0 _ = connected.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { @@ -889,7 +881,7 @@ func binPruneCount(oversaturationAmount int, staticNode staticPeerFunc) pruneCou return false, false, nil }) - return size, size - oversaturationAmount + return size, size - oversaturationCounts[bin] } } diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 674b2c77d6b..8bd6792985e 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1898,6 +1898,7 @@ func mineBin(t *testing.T, base swarm.Address, bin, count int, isBalanced bool) } if isBalanced { + prefixes := kademlia.GenerateCommonBinPrefixes(base, kademlia.DefaultBitSuffixLength) for i := 0; i < int(math.Pow(2, float64(kademlia.DefaultBitSuffixLength))); i++ { rndAddrs[i] = prefixes[bin][i]