Skip to content

Commit

Permalink
Add remaining unit tests for the switch
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Oct 27, 2024
1 parent a11726e commit e7a987b
Show file tree
Hide file tree
Showing 3 changed files with 441 additions and 25 deletions.
2 changes: 2 additions & 0 deletions tm2/pkg/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (r *Reactor) handleDiscoveryRequest(peer p2p.Peer) error {
peers = make([]*types.NetAddress, 0, len(localPeers))
)

// TODO exclude private peers

// Shuffle and limit the peers shared
shufflePeers(localPeers)

Expand Down
65 changes: 41 additions & 24 deletions tm2/pkg/p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,43 +323,60 @@ func (sw *MultiplexSwitch) runRedialLoop(ctx context.Context) {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
sw.Logger.Debug("redial crawl context canceled")
// redialFn goes through the persistent peer list
// and dials missing peers
redialFn := func() {
var (
peers = sw.Peers()
peersToDial = make([]*types.NetAddress, 0)
)

return
case <-ticker.C:
peers := sw.Peers()
sw.persistentPeers.Range(func(key, value any) bool {
var (
id = key.(types.ID)
addr = value.(*types.NetAddress)
)

peersToDial := make([]*types.NetAddress, 0)
// Check if the peer is part of the peer set
// or is scheduled for dialing
if peers.Has(id) || sw.dialQueue.Has(addr) {
return true
}

sw.persistentPeers.Range(func(key, value any) bool {
var (
id = key.(types.ID)
addr = value.(*types.NetAddress)
)
peersToDial = append(peersToDial, addr)

// Check if the peer is part of the peer set
// or is scheduled for dialing
if peers.Has(id) || sw.dialQueue.Has(addr) {
return true
}
return true
})

peersToDial = append(peersToDial, addr)
if len(peersToDial) == 0 {
// No persistent peers are missing
return
}

return true
})
// Add the peers to the dial queue
sw.DialPeers(peersToDial...)
}

// Run the initial redial loop on start,
// in case persistent peer connections are not
// active
redialFn()

for {
select {
case <-ctx.Done():
sw.Logger.Debug("redial crawl context canceled")

// Add the peers to the dial queue
sw.DialPeers(peersToDial...)
return
case <-ticker.C:
redialFn()
}
}
}

// DialPeers adds the peers to the dial queue for async dialing.
// To monitor dial progress, subscribe to adequate p2p MultiplexSwitch events
func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress) {
func (sw *MultiplexSwitch) DialPeers(peerAddrs ...*types.NetAddress) { // TODO remove pointer
for _, peerAddr := range peerAddrs {
// Check if this is our address
if peerAddr.Same(sw.transport.NetAddress()) {
Expand Down
Loading

0 comments on commit e7a987b

Please sign in to comment.