Skip to content

Commit

Permalink
app: eth2wrap best selector refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
pinebit committed Jan 19, 2024
1 parent 226bdfc commit 0fc9d08
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 86 deletions.
89 changes: 38 additions & 51 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewMultiHTTP(timeout time.Duration, addresses ...string) (Client, error) {
func newMulti(clients []Client) Client {
return multi{
clients: clients,
selector: newBestSelector(len(clients), bestPeriod),
selector: newBestSelector(bestPeriod),
}
}

Expand All @@ -114,17 +114,17 @@ type multi struct {
selector *bestSelector
}

// bestIdx increments the selector with the best client index.
func (m multi) bestIdx(i int) {
m.selector.Increment(i)
}

func (multi) Name() string {
return "eth2wrap.multi"
}

func (m multi) Address() string {
return m.clients[m.selector.Best()].Address()
address, ok := m.selector.BestAddress()
if !ok {
return m.clients[0].Address()
}

return address
}

func (m multi) SetValidatorCache(valCache func(context.Context) (ActiveValidators, error)) {
Expand Down Expand Up @@ -159,7 +159,7 @@ func (m multi) ProposerConfig(ctx context.Context) (*eth2exp.ProposerConfigRespo
func(ctx context.Context, cl Client) (*eth2exp.ProposerConfigResponse, error) {
return cl.ProposerConfig(ctx)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -177,7 +177,7 @@ func (m multi) AggregateBeaconCommitteeSelections(ctx context.Context, selection
func(ctx context.Context, cl Client) ([]*eth2exp.BeaconCommitteeSelection, error) {
return cl.AggregateBeaconCommitteeSelections(ctx, selections)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -195,7 +195,7 @@ func (m multi) AggregateSyncCommitteeSelections(ctx context.Context, selections
func(ctx context.Context, cl Client) ([]*eth2exp.SyncCommitteeSelection, error) {
return cl.AggregateSyncCommitteeSelections(ctx, selections)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -213,7 +213,7 @@ func (m multi) BlockAttestations(ctx context.Context, stateID string) ([]*eth2p0
func(ctx context.Context, cl Client) ([]*eth2p0.Attestation, error) {
return cl.BlockAttestations(ctx, stateID)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -231,7 +231,7 @@ func (m multi) NodePeerCount(ctx context.Context) (int, error) {
func(ctx context.Context, cl Client) (int, error) {
return cl.NodePeerCount(ctx)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -245,14 +245,11 @@ func (m multi) NodePeerCount(ctx context.Context) (int, error) {
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestIdxFunc func(int),
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
}
if bestIdxFunc == nil {
bestIdxFunc = func(int) {}
}

fork, join, cancel := forkjoin.New(ctx, work,
forkjoin.WithoutFailFast(),
Expand All @@ -272,13 +269,9 @@ func provide[O any](ctx context.Context, clients []Client,
if ctx.Err() != nil {
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
// TODO(corver): Find a better way to get the index of successful client.
for i, client := range clients {
if client.Address() == res.Input.Address() {
bestIdxFunc(i)
}
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
}

return res.Output, nil

Check failure on line 275 in app/eth2wrap/eth2wrap.go

View workflow job for this annotation

GitHub Actions / golangci

return with no blank line before (nlreturn)
} else {
nokResp = res
Expand All @@ -298,14 +291,12 @@ func provide[O any](ctx context.Context, clients []Client,
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error,
bestIdxFunc func(int),
) error {
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
},
nil, bestIdxFunc,
nil, selector,
)

return err
Expand Down Expand Up @@ -368,52 +359,48 @@ func wrapError(ctx context.Context, err error, label string, fields ...z.Field)
}

// newBestSelector returns a new bestSelector.
func newBestSelector(n int, period time.Duration) *bestSelector {
func newBestSelector(period time.Duration) *bestSelector {
return &bestSelector{
n: n,
counts: make(map[string]int),
start: time.Now(),
period: period,
counts: make([]int, n),
}
}

// bestSelector calculates the "best client index" per period.
type bestSelector struct {
n int
mu sync.Mutex
mu sync.RWMutex
counts map[string]int
start time.Time
period time.Duration
counts []int
}

// Best returns the best index or 0 if no counts.
func (s *bestSelector) Best() int {
s.mu.Lock()
defer s.mu.Unlock()

var resp, count int
for i, c := range s.counts {
if c > count {
resp = i
count = c
// BestAddress returns the best client address when ok is true.
func (s *bestSelector) BestAddress() (address string, ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()

var maxCount int
for addr, count := range s.counts {
if count > maxCount {
ok = true
address = addr
maxCount = count
}
}

return resp
return address, ok
}

// Increment increments the counter for the given index.
func (s *bestSelector) Increment(i int) {
// Increment increments the counter for the given address.
func (s *bestSelector) Increment(address string) {
s.mu.Lock()
defer s.mu.Unlock()

if i < 0 || i >= s.n {
panic("invalid index") // This should never happen
}

if time.Since(s.start) > s.period { // Reset counters after period.
s.counts = make([]int, s.n)
s.counts = make(map[string]int)

Check warning on line 401 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L401

Added line #L401 was not covered by tests
s.start = time.Now()
}

s.counts[i]++
s.counts[address]++
}
Loading

0 comments on commit 0fc9d08

Please sign in to comment.