Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: kademlia higher peer count in lower bins #4953

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const (
pingTimeout = time.Second * 15
workers = 8
workers = 32
retryAfterDuration = time.Minute * 5
)

Expand All @@ -32,8 +32,8 @@ type reacher struct {
mu sync.Mutex
peers map[string]*peer

work chan struct{}
quit chan struct{}
newPeer chan struct{}
quit chan struct{}

pinger p2p.Pinger
notifier p2p.ReachableNotifier
Expand All @@ -53,7 +53,7 @@ type Options struct {
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher {

r := &reacher{
work: make(chan struct{}, 1),
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peers: make(map[string]*peer),
Expand Down Expand Up @@ -93,17 +93,15 @@ func (r *reacher) manage() {

for {

p, tryAfter := r.tryAcquirePeer()
p, tryAfter := r.nextPeer()

// if no peer is returned,
// wait until either more work or the closest retry-after time.

// wait for work and tryAfter
if tryAfter > 0 {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
case <-time.After(tryAfter):
continue
Expand All @@ -115,12 +113,12 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
}
}

// send p to channel
// ping peer
select {
case <-r.quit:
return
Expand All @@ -135,10 +133,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {

for p := range c {

r.mu.Lock()
overlay := p.overlay
r.mu.Unlock()

now := time.Now()

ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout)
Expand All @@ -149,30 +143,25 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
if err == nil {
r.metrics.Pings.WithLabelValues("success").Inc()
r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic)
} else {
r.metrics.Pings.WithLabelValues("failure").Inc()
r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
}

r.notifyManage()
}
}

func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
func (r *reacher) nextPeer() (*peer, time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()

var (
now = time.Now()
nextClosest time.Time
)
var nextClosest time.Time

for _, p := range r.peers {

// retry after has expired, retry
if now.After(p.retryAfter) {
if time.Now().After(p.retryAfter) {
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
return p, 0
}
Expand All @@ -193,7 +182,7 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {

func (r *reacher) notifyManage() {
select {
case r.work <- struct{}{}:
case r.newPeer <- struct{}{}:
default:
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/salud/salud.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,

s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)

// sort peers by duration, highest first
sort.Slice(peers, func(i, j int) bool {
return peers[i].dur > peers[j].dur // descending
})

for _, peer := range peers {

var healthy bool
Expand Down
89 changes: 53 additions & 36 deletions pkg/topology/kademlia/binprefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,76 @@ import (

// generateConstCommonBinPrefixes generates the common bin prefixes
// used by the bin balancer, using the same suffix length for every bin.
func generateCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address {
func generateConstCommonBinPrefixes(base swarm.Address, suffixLength int) [][]swarm.Address {
	// One entry per bin; every bin is generated with the same suffixLength.
	prefixesPerBin := make([][]swarm.Address, int(swarm.MaxBins))

	for bin := range prefixesPerBin {
		prefixesPerBin[bin] = generateBinPrefixes(base, bin, suffixLength)
	}

	return prefixesPerBin
}

// generateCommonBinPrefixes generates the common bin prefixes used by the
// bin balancer, with a separate suffix length for each bin.
//
// suffixLengths[bin] is passed to generateBinPrefixes as the suffix length
// for that bin. The slice is indexed for every bin in [0, swarm.MaxBins), so
// it must hold at least swarm.MaxBins entries; a shorter slice panics with an
// index-out-of-range error.
func generateCommonBinPrefixes(base swarm.Address, suffixLengths []int) [][]swarm.Address {
	maxBins := int(swarm.MaxBins)
	prefixesPerBin := make([][]swarm.Address, maxBins)

	for bin := range prefixesPerBin {
		prefixesPerBin[bin] = generateBinPrefixes(base, bin, suffixLengths[bin])
	}

	return prefixesPerBin
}

func generateBinPrefixes(base swarm.Address, bin, suffixLength int) []swarm.Address {
bitCombinationsCount := int(math.Pow(2, float64(suffixLength)))
bitSuffixes := make([]uint8, bitCombinationsCount)

for i := 0; i < bitCombinationsCount; i++ {
bitSuffixes[i] = uint8(i)
}

binPrefixes := make([][]swarm.Address, int(swarm.MaxBins))
binPrefixes := make([]swarm.Address, bitCombinationsCount)

// copy base address
for i := range binPrefixes {
binPrefixes[i] = make([]swarm.Address, bitCombinationsCount)
for j := range binPrefixes[i] {
binPrefixes[i][j] = base.Clone()
}
for j := range binPrefixes {
binPrefixes[j] = base.Clone()
}

for i := range binPrefixes {
for j := range binPrefixes[i] {
pseudoAddrBytes := binPrefixes[i][j].Bytes()

if len(pseudoAddrBytes) < 1 {
continue
}

// flip first bit for bin
indexByte, posBit := i/8, i%8
if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) {
pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
} else {
pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
}
for j := range binPrefixes {
pseudoAddrBytes := binPrefixes[j].Bytes()

// set pseudo suffix
bitSuffixPos := suffixLength - 1
for l := i + 1; l < i+suffixLength+1; l++ {
index, pos := l/8, l%8
if len(pseudoAddrBytes) < 1 {
continue
}

if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
} else {
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
// flip first bit for bin
indexByte, posBit := bin/8, bin%8
if hasBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)) {
pseudoAddrBytes[indexByte] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
} else {
pseudoAddrBytes[indexByte] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[indexByte]), uint8(posBit)))
}

bitSuffixPos--
}
// set pseudo suffix
bitSuffixPos := suffixLength - 1
for l := bin + 1; l < bin+suffixLength+1; l++ {
index, pos := l/8, l%8

// clear rest of the bits
for l := i + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
index, pos := l/8, l%8
if hasBit(bitSuffixes[j], uint8(bitSuffixPos)) {
pseudoAddrBytes[index] = bits.Reverse8(setBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
} else {
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}

bitSuffixPos--
}

// clear rest of the bits
for l := bin + suffixLength + 1; l < len(pseudoAddrBytes)*8; l++ {
index, pos := l/8, l%8
pseudoAddrBytes[index] = bits.Reverse8(clearBit(bits.Reverse8(pseudoAddrBytes[index]), uint8(pos)))
}
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/topology/kademlia/binprefix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kademlia

import (
"testing"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

// TestGenerateCommonBinPrefixes checks that generateCommonBinPrefixes, given
// the same suffix length for every bin, returns one slice per bin holding
// 2^suffixLength addresses, each as long as the base address and each at a
// proximity to the base equal to its bin index.
func TestGenerateCommonBinPrefixes(t *testing.T) {
	const suffixLength = 3

	base := swarm.MustParseHexAddress("abcdef1234567890")

	// One suffix length per bin. The slice is sized from swarm.MaxBins rather
	// than a literal because generateCommonBinPrefixes indexes it for every bin.
	suffixLengths := make([]int, int(swarm.MaxBins))
	for i := range suffixLengths {
		suffixLengths[i] = suffixLength
	}

	prefixes := generateCommonBinPrefixes(base, suffixLengths)

	if len(prefixes) != int(swarm.MaxBins) {
		t.Fatalf("expected %d bins, got %d", swarm.MaxBins, len(prefixes))
	}

	bitCombinationsCount := 1 << suffixLength
	for i := 0; i < int(swarm.MaxBins); i++ {
		if len(prefixes[i]) != bitCombinationsCount {
			t.Fatalf("expected %d addresses in bin %d, got %d", bitCombinationsCount, i, len(prefixes[i]))
		}

		for j := range prefixes[i] {
			if len(prefixes[i][j].Bytes()) != len(base.Bytes()) {
				t.Fatalf("expected address length %d, got %d", len(base.Bytes()), len(prefixes[i][j].Bytes()))
			}

			// Every generated address must fall into the bin it was generated for.
			proximity := swarm.Proximity(base.Bytes(), prefixes[i][j].Bytes())
			if want := uint8(i); want != proximity {
				t.Fatalf("expected proximity %d for bin %d, got %d", want, i, proximity)
			}
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/topology/kademlia/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
PruneOversaturatedBinsFunc = func(k *Kad) func(uint8) {
return k.pruneOversaturatedBins
}
GenerateCommonBinPrefixes = generateCommonBinPrefixes
GenerateCommonBinPrefixes = generateConstCommonBinPrefixes
)

const (
Expand Down Expand Up @@ -57,7 +57,7 @@
}

closestConnectedPO := swarm.ExtendedProximity(closestConnectedPeer.Bytes(), pseudoAddr.Bytes())
if int(closestConnectedPO) < int(bin)+k.opt.BitSuffixLength+1 {

Check failure on line 60 in pkg/topology/kademlia/export_test.go

View workflow job for this annotation

GitHub Actions / Lint

k.opt.BitSuffixLength undefined (type kadOptions has no field or method BitSuffixLength) (typecheck)

Check failure on line 60 in pkg/topology/kademlia/export_test.go

View workflow job for this annotation

GitHub Actions / Lint

k.opt.BitSuffixLength undefined (type kadOptions has no field or method BitSuffixLength) (typecheck)

Check failure on line 60 in pkg/topology/kademlia/export_test.go

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

k.opt.BitSuffixLength undefined (type kadOptions has no field or method BitSuffixLength)

Check failure on line 60 in pkg/topology/kademlia/export_test.go

View workflow job for this annotation

GitHub Actions / Test (flaky)

k.opt.BitSuffixLength undefined (type kadOptions has no field or method BitSuffixLength)

Check failure on line 60 in pkg/topology/kademlia/export_test.go

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

k.opt.BitSuffixLength undefined (type kadOptions has no field or method BitSuffixLength)
return false
}
}
Expand Down
Loading
Loading