Skip to content

Commit

Permalink
chore: use fetcher controller (#87)
Browse files Browse the repository at this point in the history
* feat: use fetcher controller

* chore: moved state to a separate module

* chore: moved controller to a separate module

* chore: add controller tests and fetcher stubs

* chore: fixed typo

* chore: use mutex in state

* chore: add state test
  • Loading branch information
freak12techno authored Jan 7, 2025
1 parent aa542a9 commit c262e48
Show file tree
Hide file tree
Showing 48 changed files with 455 additions and 119 deletions.
39 changes: 7 additions & 32 deletions pkg/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package pkg

import (
"context"
controllerPkg "main/pkg/controller"
fetchersPkg "main/pkg/fetchers"
"main/pkg/fs"
generatorsPkg "main/pkg/generators"
statePkg "main/pkg/state"
"main/pkg/tendermint"
"main/pkg/tracing"
"main/pkg/types"
"net/http"
"sync"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
Expand Down Expand Up @@ -44,6 +42,8 @@ type App struct {
// Example: ActiveSetTokenGenerator generates a metric
// based on ValidatorsFetcher and StakingParamsFetcher.
Generators []generatorsPkg.Generator

Controller *controllerPkg.Controller
}

func NewApp(configPath string, filesystem fs.FS, version string) *App {
Expand Down Expand Up @@ -122,6 +122,8 @@ func NewApp(configPath string, filesystem fs.FS, version string) *App {
generatorsPkg.NewSupplyGenerator(appConfig.Chains),
}

controller := controllerPkg.NewController(fetchers, logger)

server := &http.Server{Addr: appConfig.ListenAddress, Handler: nil}

return &App{
Expand All @@ -132,6 +134,7 @@ func NewApp(configPath string, filesystem fs.FS, version string) *App {
Fetchers: fetchers,
Generators: generators,
Server: server,
Controller: controller,
}
}

Expand Down Expand Up @@ -174,35 +177,7 @@ func (a *App) Handler(w http.ResponseWriter, r *http.Request) {

registry := prometheus.NewRegistry()

var wg sync.WaitGroup
var mutex sync.Mutex

var queryInfos []*types.QueryInfo

state := statePkg.NewState()

for _, fetchersExt := range a.Fetchers {
wg.Add(1)

go func(fetcher fetchersPkg.Fetcher) {
childQuerierCtx, fetcherSpan := a.Tracer.Start(
rootSpanCtx,
"Fetcher "+string(fetcher.Name()),
trace.WithAttributes(attribute.String("fetcher", string(fetcher.Name()))),
)
defer fetcherSpan.End()

defer wg.Done()
data, fetcherQueryInfos := fetcher.Fetch(childQuerierCtx)

mutex.Lock()
state.Set(fetcher.Name(), data)
queryInfos = append(queryInfos, fetcherQueryInfos...)
mutex.Unlock()
}(fetchersExt)
}

wg.Wait()
state, queryInfos := a.Controller.Fetch(rootSpanCtx)

queriesMetrics := NewQueriesMetrics(a.Config.Chains, queryInfos)
registry.MustRegister(queriesMetrics.GetMetrics(rootSpanCtx)...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
FetcherNameNodeInfo FetcherName = "node_info"
FetcherNameInflation FetcherName = "inflation"
FetcherNameSupply FetcherName = "supply"
FetcherNameStub1 FetcherName = "stub1"
FetcherNameStub2 FetcherName = "stub2"

MetricsPrefix string = "cosmos_validators_exporter_"

Expand Down
118 changes: 118 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package controller

import (
"context"
"main/pkg/constants"
fetchersPkg "main/pkg/fetchers"
statePkg "main/pkg/state"
"main/pkg/types"
"sync"

"github.com/rs/zerolog"
)

type FetchersStatuses map[constants.FetcherName]bool

func (s FetchersStatuses) IsAllDone(fetcherNames []constants.FetcherName) bool {
for _, fetcherName := range fetcherNames {
if _, ok := s[fetcherName]; !ok {
return false
}
}

return true
}

type Controller struct {
Fetchers fetchersPkg.Fetchers
Logger zerolog.Logger
}

func NewController(
fetchers fetchersPkg.Fetchers,
logger *zerolog.Logger,
) *Controller {
return &Controller{
Logger: logger.With().
Str("component", "controller").
Logger(),
Fetchers: fetchers,
}
}

func (c *Controller) Fetch(ctx context.Context) (
*statePkg.State,
[]*types.QueryInfo,
) {
data := statePkg.NewState()
queries := []*types.QueryInfo{}
fetchersStatus := FetchersStatuses{}

var mutex sync.Mutex
var wg sync.WaitGroup

processFetcher := func(fetcher fetchersPkg.Fetcher) {
defer wg.Done()

c.Logger.Trace().Str("name", string(fetcher.Name())).Msg("Processing fetcher...")

mutex.Lock()
fetcherDependenciesData := data.GetData(fetcher.Dependencies())
mutex.Unlock()

fetcherData, fetcherQueries := fetcher.Fetch(ctx, fetcherDependenciesData...)

mutex.Lock()
data.Set(fetcher.Name(), fetcherData)
queries = append(queries, fetcherQueries...)
fetchersStatus[fetcher.Name()] = true
mutex.Unlock()

c.Logger.Trace().
Str("name", string(fetcher.Name())).
Msg("Processed fetcher")
}

for {
c.Logger.Trace().Msg("Processing all pending fetchers...")

if fetchersStatus.IsAllDone(c.Fetchers.GetNames()) {
c.Logger.Trace().Msg("All fetchers are fetched.")
break
}

fetchersToStart := fetchersPkg.Fetchers{}

for _, fetcher := range c.Fetchers {
if _, ok := fetchersStatus[fetcher.Name()]; ok {
c.Logger.Trace().
Str("name", string(fetcher.Name())).
Msg("Fetcher is already being processed or is processed, skipping.")
continue
}

if !fetchersStatus.IsAllDone(fetcher.Dependencies()) {
c.Logger.Trace().
Str("name", string(fetcher.Name())).
Msg("Fetcher's dependencies are not yet processed, skipping for now.")
continue
}

fetchersToStart = append(fetchersToStart, fetcher)
}

c.Logger.Trace().
Strs("names", fetchersToStart.GetNamesAsString()).
Msg("Starting the following fetchers")

wg.Add(len(fetchersToStart))

for _, fetcher := range fetchersToStart {
go processFetcher(fetcher)
}

wg.Wait()
}

return data, queries
}
24 changes: 24 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package controller

import (
"context"
fetchersPkg "main/pkg/fetchers"
loggerPkg "main/pkg/logger"
"testing"

"github.com/stretchr/testify/assert"
)

func TestControllerFetcherEnabled(t *testing.T) {
t.Parallel()

logger := loggerPkg.GetNopLogger()
controller := NewController(fetchersPkg.Fetchers{
&fetchersPkg.StubFetcher1{},
&fetchersPkg.StubFetcher2{},
}, logger)

data, queryInfos := controller.Fetch(context.Background())
assert.Empty(t, queryInfos)
assert.Equal(t, 2, data.Length())
}
5 changes: 5 additions & 0 deletions pkg/fetchers/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ func NewBalanceFetcher(
}
}

func (q *BalanceFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (q *BalanceFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
q.queryInfos = []*types.QueryInfo{}
q.allBalances = map[string]map[string][]types.Amount{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/fetchers/commission.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ func NewCommissionFetcher(
}
}

func (q *CommissionFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (q *CommissionFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
var queryInfos []*types.QueryInfo

Expand Down
4 changes: 4 additions & 0 deletions pkg/fetchers/consumer_commission.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ func NewConsumerCommissionFetcher(
}
}

func (f *ConsumerCommissionFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}
func (f *ConsumerCommissionFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
f.queryInfos = []*types.QueryInfo{}
f.data = map[string]map[string]*types.ConsumerCommissionResponse{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/fetchers/consumer_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ func NewConsumerInfoFetcher(
}
}

func (f *ConsumerInfoFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}
func (f *ConsumerInfoFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
f.queryInfos = []*types.QueryInfo{}
f.allInfos = map[string]map[string]types.ConsumerChainInfo{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/fetchers/consumer_validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ func NewConsumerValidatorsFetcher(
}
}

func (f *ConsumerValidatorsFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (f *ConsumerValidatorsFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
f.queryInfos = []*types.QueryInfo{}
f.allValidators = map[string]*types.ConsumerValidatorsResponse{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/fetchers/delegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ func NewDelegationsFetcher(
}
}

func (q *DelegationsFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (q *DelegationsFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
var queryInfos []*types.QueryInfo

Expand Down
25 changes: 24 additions & 1 deletion pkg/fetchers/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,29 @@ import (
)

type Fetcher interface {
Fetch(ctx context.Context) (interface{}, []*types.QueryInfo)
Fetch(ctx context.Context, data ...interface{}) (interface{}, []*types.QueryInfo)
Dependencies() []constants.FetcherName
Name() constants.FetcherName
}

type Fetchers []Fetcher

func (f Fetchers) GetNames() []constants.FetcherName {
names := make([]constants.FetcherName, len(f))

for index, fetcher := range f {
names[index] = fetcher.Name()
}

return names
}

func (f Fetchers) GetNamesAsString() []string {
names := make([]string, len(f))

for index, fetcher := range f {
names[index] = string(fetcher.Name())
}

return names
}
5 changes: 5 additions & 0 deletions pkg/fetchers/has_to_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ func NewValidatorConsumersFetcher(
}
}

func (q *ValidatorConsumersFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (q *ValidatorConsumersFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
q.queryInfos = []*types.QueryInfo{}
q.allValidatorsConsumers = map[string]map[string]map[string]bool{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/fetchers/inflation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ func NewInflationFetcher(
}
}

func (q *InflationFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}
func (q *InflationFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
var queryInfos []*types.QueryInfo

Expand Down
5 changes: 5 additions & 0 deletions pkg/fetchers/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ func NewNodeInfoFetcher(
}
}

func (q *NodeInfoFetcher) Dependencies() []constants.FetcherName {
return []constants.FetcherName{}
}

func (q *NodeInfoFetcher) Fetch(
ctx context.Context,
data ...interface{},
) (interface{}, []*types.QueryInfo) {
q.queryInfos = []*types.QueryInfo{}
q.allNodeInfos = map[string]*types.NodeInfoResponse{}
Expand Down
Loading

0 comments on commit c262e48

Please sign in to comment.