From cbefced36884c2324633301374a59fef48daab68 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 21 Jan 2025 01:21:02 +0300 Subject: [PATCH 1/8] fix: debug log --- split.sh | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 split.sh diff --git a/split.sh b/split.sh new file mode 100644 index 00000000000..930c25a5ccc --- /dev/null +++ b/split.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# Base URL +base_url="http:/localhost:1633/chunks" + +# File containing paths +file_path="refs.txt" + + +success_count=0 + +# Read each line from the file +while IFS= read -r line +do + # Construct the full URL + url="${base_url}/${line}" + + # Fetch the URL + echo "$url" + + # response=$(curl "$url" -s -o /dev/null -w "%{http_code}") + + # if [ "$response" -eq 200 ]; then + # success_count=$((success_count + 1)) + # fi + +done < "$file_path" + +# Output the count of successful requests +echo "Number of successful requests: $success_count" From 59d7871c54a9e1359c4bdd96bdf296a90654e9e1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 21 Jan 2025 01:21:26 +0300 Subject: [PATCH 2/8] fix: removed file --- split.sh | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 split.sh diff --git a/split.sh b/split.sh deleted file mode 100644 index 930c25a5ccc..00000000000 --- a/split.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash - -# Base URL -base_url="http:/localhost:1633/chunks" - -# File containing paths -file_path="refs.txt" - - -success_count=0 - -# Read each line from the file -while IFS= read -r line -do - # Construct the full URL - url="${base_url}/${line}" - - # Fetch the URL - echo "$url" - - # response=$(curl "$url" -s -o /dev/null -w "%{http_code}") - - # if [ "$response" -eq 200 ]; then - # success_count=$((success_count + 1)) - # fi - -done < "$file_path" - -# Output the count of successful requests -echo "Number of successful requests: $success_count" From cdc0f491caeff3973a5c875e2a8e3b340df54ff5 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:33:42 +0300 Subject: [PATCH 3/8] perf: kademlia higher peer count in lower bins --- pkg/topology/kademlia/kademlia.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index a310e47b6c6..726b75463bb 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -49,7 +49,7 @@ const ( // Default option values const ( - defaultBitSuffixLength = 4 // the number of bits used to create pseudo addresses for balancing, 2^4, 16 addresses + defaultBitSuffixLength = 2 // the number of bits used to create pseudo addresses for balancing, 2^2, 8 addresses defaultLowWaterMark = 3 // the number of peers in consecutive deepest bins that constitute as nearest neighbours defaultSaturationPeers = 8 defaultOverSaturationPeers = 18 @@ -122,6 +122,8 @@ type kadOptions struct { LowWaterMark int } +var saturationCounts = [swarm.MaxBins]int{64, 32, 16, 12, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} + func newKadOptions(o Options) kadOptions { ko := kadOptions{ // copy values @@ -874,7 +876,7 @@ func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSatura return false, false, nil }) - return size >= oversaturationAmount + return size >= saturationCounts[bin] } } @@ -889,7 +891,7 @@ func binPruneCount(oversaturationAmount int, staticNode staticPeerFunc) pruneCou return false, false, nil }) - return size, size - oversaturationAmount + return size, size - saturationCounts[bin] } } From c19974eff203adc369debabfa83eac39ebf8df7d Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 21 Jan 2025 19:14:30 +0300 Subject: [PATCH 4/8] fix: prefix --- pkg/topology/kademlia/binprefix.go | 89 +++++++++++++++---------- pkg/topology/kademlia/binprefix_test.go | 45 +++++++++++++ pkg/topology/kademlia/export_test.go | 2 +- pkg/topology/kademlia/kademlia.go | 51 ++++++-------- pkg/topology/kademlia/kademlia_test.go | 1 + 5 files changed, 121 insertions(+), 67 deletions(-) create mode 100644 pkg/topology/kademlia/binprefix_test.go diff --git a/pkg/topology/kademlia/binprefix.go b/pkg/topology/kademlia/binprefix.go index 94e2550ea2d..5785e86971c 100644 --- a/pkg/topology/kademlia/binprefix.go +++ b/pkg/topology/kademlia/binprefix.go @@ -13,7 +13,29 @@ import ( // generateCommonBinPrefixes generates the common bin prefixes // used by the bin balancer. -func generateCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address { +func generateConstCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address { + binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + + for i := 0; i < int(swarm.MaxBins); i++ { + binPrefixes[i] = generateBinPrefixes(base, i, suffixLength) + } + + return binPrefixes +} + +// generateCommonBinPrefixes generates the common bin prefixes +// used by the bin balancer. +func generateCommonBinPrefixes(base swarm.Address, suffixLengths []int) [][]swarm.Address { + binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + + for i := 0; i < int(swarm.MaxBins); i++ { + binPrefixes[i] = generateBinPrefixes(base, i, suffixLengths[i]) + } + + return binPrefixes +} + +func generateBinPrefixes(base swarm.Address, bin, suffixLength int) []swarm.Address { bitCombinationsCount := int(math.Pow(2, float64(suffixLength))) bitSuffixes := make([]uint8, bitCombinationsCount) @@ -21,51 +43,46 @@ func generateCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.A bitSuffixes[i] = uint8(i) } - binPrefixes := make([][]swarm.Address, int(swarm.MaxBins)) + binPrefixes := make([]swarm.Address, bitCombinationsCount) // copy base address - for i := range binPrefixes { - binPrefixes[i] = make([]swarm.Address, bitCombinationsCount) - for j := range binPrefixes[i] { - binPrefixes[i][j] = base.Clone() - } + for j := range binPrefixes { + binPrefixes[j] = base.Clone() } - for i := range binPrefixes { - for j := range binPrefixes[i] { - pseudoAddrBytes := binPrefixes[i][j].Bytes() - - if len(pseudoAddrBytes) < 1 { - continue - } - - // flip first bit for bin - indexByte, posBit := i/8, i%8 - if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) { - pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) - } else { - pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) - } + for j := range binPrefixes { + pseudoAddrBytes := binPrefixes[j].Bytes() - // set pseudo suffix - bitSuffixPos := suffixLength - 1 - for l := i + 1; l < i+suffixLength+1; l++ { - index, pos := l/8, l%8 + if len(pseudoAddrBytes) < 1 { + continue + } - if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) { - pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) - } else { - pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) - } + // flip first bit for bin + indexByte, posBit := bin/8, bin%8 + if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) { + pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) + } else { + pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit))) + } - bitSuffixPos-- - } + // set pseudo suffix + bitSuffixPos := suffixLength - 1 + for l := bin + 1; l < bin+suffixLength+1; l++ { + index, pos := l/8, l%8 - // clear rest of the bits - for l := i + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ { - index, pos := l/8, l%8 + if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) { + pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) + } else { pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) } + + bitSuffixPos-- + } + + // clear rest of the bits + for l := bin + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ { + index, pos := l/8, l%8 + pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos))) } } diff --git a/pkg/topology/kademlia/binprefix_test.go b/pkg/topology/kademlia/binprefix_test.go new file mode 100644 index 00000000000..d44cd2e08b1 --- /dev/null +++ b/pkg/topology/kademlia/binprefix_test.go @@ -0,0 +1,45 @@ +package kademlia + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestGenerateCommonBinPrefixes(t *testing.T) { + base := swarm.MustParseHexAddress("abcdef1234567890") + suffixLength := 3 + + suffixLengths := make([]int, 32) + for i := range suffixLengths { + suffixLengths[i] = 3 + } + + prefixes := generateCommonBinPrefixes(base, suffixLengths) + + if len(prefixes) != int(swarm.MaxBins) { + t.Fatalf("expected %d bins, got %d", swarm.MaxBins, len(prefixes)) + } + + for i := 0; i < int(swarm.MaxBins); i++ { + for j := 0; j < len(prefixes[i]); j++ { + proximity := swarm.Proximity(base.Bytes(), prefixes[i][j].Bytes()) + if want := uint8(i); want != proximity { + t.Fatalf("expected proximity %d for bin %d, got %d", want, i, proximity) + } + } + } + + bitCombinationsCount := 1 << suffixLength + for i := 0; i < int(swarm.MaxBins); i++ { + if len(prefixes[i]) != bitCombinationsCount { + t.Fatalf("expected %d addresses in bin %d, got %d", bitCombinationsCount, i, len(prefixes[i])) + } + + for j := 0; j < bitCombinationsCount; j++ { + if len(prefixes[i][j].Bytes()) != len(base.Bytes()) { + t.Fatalf("expected address length %d, got %d", len(base.Bytes()), len(prefixes[i][j].Bytes())) + } + } + } +} diff --git a/pkg/topology/kademlia/export_test.go b/pkg/topology/kademlia/export_test.go index 3c5ad3e1390..93ab87f064d 100644 --- a/pkg/topology/kademlia/export_test.go +++ b/pkg/topology/kademlia/export_test.go @@ -14,7 +14,7 @@ var ( PruneOversaturatedBinsFunc = func(k *Kad) func(uint8) { return k.pruneOversaturatedBins } - GenerateCommonBinPrefixes = generateCommonBinPrefixes + GenerateCommonBinPrefixes = generateConstCommonBinPrefixes ) const ( diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 726b75463bb..9d6e6f0f3e1 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -49,7 +49,7 @@ const ( // Default option values const ( - defaultBitSuffixLength = 2 // the number of bits used to create pseudo addresses for balancing, 2^2, 8 addresses + defaultBitSuffixLength = 4 // the number of bits used to create pseudo addresses for balancing, 2^2, 8 addresses defaultLowWaterMark = 3 // the number of peers in consecutive deepest bins that constitute as nearest neighbours defaultSaturationPeers = 8 defaultOverSaturationPeers = 18 @@ -111,10 +111,10 @@ type kadOptions struct { StaticNodes []swarm.Address ExcludeFunc excludeFunc - TimeToRetry time.Duration - ShortRetry time.Duration - PruneWakeup time.Duration - BitSuffixLength int // additional depth of common prefix for bin + TimeToRetry time.Duration + ShortRetry time.Duration + PruneWakeup time.Duration + // BitSuffixLength int // additional depth of common prefix for bin SaturationPeers int OverSaturationPeers int BootnodeOverSaturationPeers int @@ -122,7 +122,10 @@ type kadOptions struct { LowWaterMark int } -var saturationCounts = [swarm.MaxBins]int{64, 32, 16, 12, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} +var ( + saturationCounts = [swarm.MaxBins]int{64, 32, 16, 12, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} + prefixBits = [swarm.MaxBins]int{6, 5, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} +) func newKadOptions(o Options) kadOptions { ko := kadOptions{ @@ -134,12 +137,10 @@ func newKadOptions(o Options) kadOptions { StaticNodes: o.StaticNodes, ExcludeFunc: o.ExcludeFunc, // copy or use default - TimeToRetry: defaultValDuration(o.TimeToRetry, defaultTimeToRetry), - ShortRetry: defaultValDuration(o.ShortRetry, defaultShortRetry), - PruneWakeup: defaultValDuration(o.PruneWakeup, defaultPruneWakeup), - BitSuffixLength: defaultValInt(o.BitSuffixLength, defaultBitSuffixLength), - SaturationPeers: defaultValInt(o.SaturationPeers, defaultSaturationPeers), - OverSaturationPeers: defaultValInt(o.OverSaturationPeers, defaultOverSaturationPeers), + TimeToRetry: defaultValDuration(o.TimeToRetry, defaultTimeToRetry), + ShortRetry: defaultValDuration(o.ShortRetry, defaultShortRetry), + PruneWakeup: defaultValDuration(o.PruneWakeup, defaultPruneWakeup), + // BitSuffixLength: defaultValInt(o.BitSuffixLength, defaultBitSuffixLength), BootnodeOverSaturationPeers: defaultValInt(o.BootnodeOverSaturationPeers, defaultBootNodeOverSaturationPeers), BroadcastBinSize: defaultValInt(o.BroadcastBinSize, defaultBroadcastBinSize), LowWaterMark: defaultValInt(o.LowWaterMark, defaultLowWaterMark), @@ -167,11 +168,7 @@ func defaultValDuration(v *time.Duration, d time.Duration) time.Duration { } func makeSaturationFunc(o kadOptions) binSaturationFunc { - os := o.OverSaturationPeers - if o.BootnodeMode { - os = o.BootnodeOverSaturationPeers - } - return binSaturated(os, isStaticPeer(o.StaticNodes)) + return binSaturated(isStaticPeer(o.StaticNodes)) } // Kad is the Swarm forwarding kademlia implementation. @@ -258,11 +255,7 @@ func New( k.opt.PruneFunc = k.pruneOversaturatedBins } - os := k.opt.OverSaturationPeers - if k.opt.BootnodeMode { - os = k.opt.BootnodeOverSaturationPeers - } - k.opt.PruneCountFunc = binPruneCount(os, isStaticPeer(k.opt.StaticNodes)) + k.opt.PruneCountFunc = binPruneCount(isStaticPeer(k.opt.StaticNodes)) if k.opt.ExcludeFunc == nil { k.opt.ExcludeFunc = func(f ...im.ExcludeOp) peerExcludeFunc { @@ -272,9 +265,7 @@ func New( } } - if k.opt.BitSuffixLength > 0 { - k.commonBinPrefixes = generateCommonBinPrefixes(k.base, k.opt.BitSuffixLength) - } + k.commonBinPrefixes = generateCommonBinPrefixes(k.base, prefixBits[:]) k.bgBroadcastCtx, k.bgBroadcastCancel = context.WithCancel(context.Background()) @@ -317,12 +308,12 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI // Connect to closest known peer which we haven't tried connecting to recently. - _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+k.opt.BitSuffixLength+1)) + _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+prefixBits[i]+1)) if exists { continue } - closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+k.opt.BitSuffixLength+1)) + closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+prefixBits[i]+1)) if !exists { continue } @@ -738,7 +729,7 @@ func (k *Kad) balancedSlotPeers(pseudoAddr swarm.Address, peers []swarm.Address, var ret []swarm.Address for _, peer := range peers { - if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+k.opt.BitSuffixLength+1 { + if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+prefixBits[po]+1 { ret = append(ret, peer) } } @@ -866,7 +857,7 @@ func (k *Kad) connectBootNodes(ctx context.Context) { // binSaturated indicates whether a certain bin is saturated or not. // when a bin is not saturated it means we would like to proactively // initiate connections to other peers in the bin. -func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSaturationFunc { +func binSaturated(staticNode staticPeerFunc) binSaturationFunc { return func(bin uint8, connected *pslice.PSlice, exclude peerExcludeFunc) bool { size := 0 _ = connected.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { @@ -881,7 +872,7 @@ func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSatura } // binPruneCount counts how many peers should be pruned from a bin. -func binPruneCount(oversaturationAmount int, staticNode staticPeerFunc) pruneCountFunc { +func binPruneCount(staticNode staticPeerFunc) pruneCountFunc { return func(bin uint8, connected *pslice.PSlice, exclude peerExcludeFunc) (int, int) { size := 0 _ = connected.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 674b2c77d6b..8bd6792985e 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -1898,6 +1898,7 @@ func mineBin(t *testing.T, base swarm.Address, bin, count int, isBalanced bool) } if isBalanced { + prefixes := kademlia.GenerateCommonBinPrefixes(base, kademlia.DefaultBitSuffixLength) for i := 0; i < int(math.Pow(2, float64(kademlia.DefaultBitSuffixLength))); i++ { rndAddrs[i] = prefixes[bin][i] From 885a21bedc1f64229723cec6359eb6b07c1f1e40 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 21 Jan 2025 19:32:01 +0300 Subject: [PATCH 5/8] fix: asd --- pkg/topology/kademlia/kademlia.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 9d6e6f0f3e1..4ea0af9c37a 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -123,8 +123,9 @@ type kadOptions struct { } var ( - saturationCounts = [swarm.MaxBins]int{64, 32, 16, 12, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} - prefixBits = [swarm.MaxBins]int{6, 5, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + saturationCounts = [swarm.MaxBins]int{64, 32, 16, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} + prefixBits = [swarm.MaxBins]int{6, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + // prefixBits = [swarm.MaxBins]int{5, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2} ) func newKadOptions(o Options) kadOptions { From f6628dccf379c9f0ccb3a699e0567005fa74a447 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:17:34 +0300 Subject: [PATCH 6/8] fix: reduced suffix bit count --- pkg/topology/kademlia/kademlia.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 4ea0af9c37a..5f5c9e8316a 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -123,9 +123,9 @@ type kadOptions struct { } var ( - saturationCounts = [swarm.MaxBins]int{64, 32, 16, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} - prefixBits = [swarm.MaxBins]int{6, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} - // prefixBits = [swarm.MaxBins]int{5, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2} + oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} + suffixBits = [swarm.MaxBins]int{5, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2} + // prefixBits = [swarm.MaxBins]int{6, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} ) func newKadOptions(o Options) kadOptions { @@ -266,7 +266,7 @@ func New( } } - k.commonBinPrefixes = generateCommonBinPrefixes(k.base, prefixBits[:]) + k.commonBinPrefixes = generateCommonBinPrefixes(k.base, suffixBits[:]) k.bgBroadcastCtx, k.bgBroadcastCancel = context.WithCancel(context.Background()) @@ -309,12 +309,12 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI // Connect to closest known peer which we haven't tried connecting to recently. - _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+prefixBits[i]+1)) + _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+suffixBits[i]+1)) if exists { continue } - closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+prefixBits[i]+1)) + closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+suffixBits[i]+1)) if !exists { continue } @@ -730,7 +730,7 @@ func (k *Kad) balancedSlotPeers(pseudoAddr swarm.Address, peers []swarm.Address, var ret []swarm.Address for _, peer := range peers { - if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+prefixBits[po]+1 { + if int(swarm.ExtendedProximity(peer.Bytes(), pseudoAddr.Bytes())) >= po+suffixBits[po]+1 { ret = append(ret, peer) } } @@ -868,7 +868,7 @@ func binSaturated(staticNode staticPeerFunc) binSaturationFunc { return false, false, nil }) - return size >= saturationCounts[bin] + return size >= oversaturationCounts[bin] } } @@ -883,7 +883,7 @@ func binPruneCount(staticNode staticPeerFunc) pruneCountFunc { return false, false, nil }) - return size, size - saturationCounts[bin] + return size, size - oversaturationCounts[bin] } } From c31379b4c74fc9fd274f551d66912ded6f12c8ed Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:12:35 +0300 Subject: [PATCH 7/8] fix: reacher --- pkg/p2p/libp2p/internal/reacher/reacher.go | 39 ++++++++-------------- pkg/topology/kademlia/kademlia.go | 5 ++- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index ad888df4d0f..ee070755fe9 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -18,7 +18,7 @@ import ( const ( pingTimeout = time.Second * 15 - workers = 8 + workers = 32 retryAfterDuration = time.Minute * 5 ) @@ -32,8 +32,8 @@ type reacher struct { mu sync.Mutex peers map[string]*peer - work chan struct{} - quit chan struct{} + newPeer chan struct{} + quit chan struct{} pinger p2p.Pinger notifier p2p.ReachableNotifier @@ -53,7 +53,7 @@ type Options struct { func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher { r := &reacher{ - work: make(chan struct{}, 1), + newPeer: make(chan struct{}, 1), quit: make(chan struct{}), pinger: streamer, peers: make(map[string]*peer), @@ -93,17 +93,15 @@ func (r *reacher) manage() { for { - p, tryAfter := r.tryAcquirePeer() + p, tryAfter := r.nextPeer() // if no peer is returned, // wait until either more work or the closest retry-after time. - - // wait for work and tryAfter if tryAfter > 0 { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue case <-time.After(tryAfter): continue @@ -115,12 +113,12 @@ func (r *reacher) manage() { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue } } - // send p to channel + // ping peer select { case <-r.quit: return @@ -135,10 +133,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { for p := range c { - r.mu.Lock() - overlay := p.overlay - r.mu.Unlock() - now := time.Now() ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout) @@ -149,30 +143,25 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { if err == nil { r.metrics.Pings.WithLabelValues("success").Inc() r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic) + r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic) } else { r.metrics.Pings.WithLabelValues("failure").Inc() r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate) + r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate) } - - r.notifyManage() } } -func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { +func (r *reacher) nextPeer() (*peer, time.Duration) { r.mu.Lock() defer r.mu.Unlock() - var ( - now = time.Now() - nextClosest time.Time - ) + var nextClosest time.Time for _, p := range r.peers { // retry after has expired, retry - if now.After(p.retryAfter) { + if time.Now().After(p.retryAfter) { p.retryAfter = time.Now().Add(r.options.RetryAfterDuration) return p, 0 } @@ -193,7 +182,7 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) { func (r *reacher) notifyManage() { select { - case r.work <- struct{}{}: + case r.newPeer <- struct{}{}: default: } } diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 5f5c9e8316a..b9c201a7a8d 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -123,9 +123,8 @@ type kadOptions struct { } var ( - oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8} - suffixBits = [swarm.MaxBins]int{5, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2} - // prefixBits = [swarm.MaxBins]int{6, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16} + suffixBits = [swarm.MaxBins]int{5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} ) func newKadOptions(o Options) kadOptions { From 0d3ed1b562ec1c106e12ed9a0848e293f7e50c69 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 23 Jan 2025 02:07:37 +0300 Subject: [PATCH 8/8] fix: asd --- pkg/salud/salud.go | 5 +++++ pkg/topology/kademlia/kademlia.go | 23 +++++++++++------------ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d47abf3fd76..38484189dae 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -184,6 +184,11 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment) + // sort peers by duration, highest first + sort.Slice(peers, func(i, j int) bool { + return peers[i].dur > peers[j].dur // descending + }) + for _, peer := range peers { var healthy bool diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index b9c201a7a8d..1530d6629b1 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -123,8 +123,8 @@ type kadOptions struct { } var ( - oversaturationCounts = [swarm.MaxBins]int{64, 32, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16} - suffixBits = [swarm.MaxBins]int{5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + oversaturationCounts = [swarm.MaxBins]int{36, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18} + suffixBits = [swarm.MaxBins]int{5, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4} ) func newKadOptions(o Options) kadOptions { @@ -290,30 +290,29 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI depth := k.neighborhoodDepth() - for i := range k.commonBinPrefixes { + for bin := range k.commonBinPrefixes { - binPeersLength := k.knownPeers.BinSize(uint8(i)) + binPeersLength := k.knownPeers.BinSize(uint8(bin)) // balancer should skip on bins where neighborhood connector would connect to peers anyway // and there are not enough peers in known addresses to properly balance the bin - if i >= int(depth) && binPeersLength < len(k.commonBinPrefixes[i]) { + if bin >= int(depth) && binPeersLength < len(k.commonBinPrefixes[bin]) { continue } - binPeers := k.knownPeers.BinPeers(uint8(i)) - binConnectedPeers := k.connectedPeers.BinPeers(uint8(i)) + binPeers := k.knownPeers.BinPeers(uint8(bin)) + binConnectedPeers := k.connectedPeers.BinPeers(uint8(bin)) - for j := range k.commonBinPrefixes[i] { - pseudoAddr := k.commonBinPrefixes[i][j] + for j := range k.commonBinPrefixes[bin] { + pseudoAddr := k.commonBinPrefixes[bin][j] // Connect to closest known peer which we haven't tried connecting to recently. - - _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(i+suffixBits[i]+1)) + _, exists := nClosePeerInSlice(binConnectedPeers, pseudoAddr, noopSanctionedPeerFn, uint8(bin+suffixBits[bin]+1)) if exists { continue } - closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(i+suffixBits[i]+1)) + closestKnownPeer, exists := nClosePeerInSlice(binPeers, pseudoAddr, skipPeers, uint8(bin+suffixBits[bin]+1)) if !exists { continue }