Skip to content

Commit

Permalink
feat: remove tracking error count
Browse files Browse the repository at this point in the history
  • Loading branch information
rustcandy committed Nov 11, 2024
1 parent 6eb75cc commit 44fe002
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 2 deletions.
16 changes: 15 additions & 1 deletion .docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Let's see what each section refers to:
- [`database`](#database)
- [`pruning`](#pruning)
- [`logging`](#logging)
- [`telemetry`](#telemetry)

## `chain`
This section contains the details of the chain configuration regarding the Cosmos SDK.
Expand All @@ -62,6 +63,7 @@ This section contains the details of the chain configuration regarding the Cosmo
Currently, we support the following modules:

- `pruning` to periodically prune the old database data
- `telemetry` to support a telemetry service

## `node`
This section contains the details of the node to which Juno will connect.
Expand Down Expand Up @@ -142,4 +144,16 @@ if you add the `"pruning"` entry to the `modules` field of the [`chain` config](
|:-------------:|:---------:|:-------------------------------------------------------------------------------------------------------|:--------|
| `interval` | `integer` | Number of blocks that should pass between one pruning and the other (default: prune every `10` blocks) | `100` |
| `keep_every` | `integer` | Keep the state every `nth` block, even if it should have been pruned | `500` |
| `keep_recent` | `integer` | Do not prune this amount of recent states | `100` |
| `keep_recent` | `integer` | Do not prune this amount of recent states | `100` |

## `telemetry`
This section allows to configure the telemetry details of Juno. Note that this will have effect only if you add
the `"telemetry"` entry to the `modules` field of the [`chain` config](#chain).

| Attribute | Type | Description | Example |
|:---------:|:------:|:-----------------------------------------------|:--------|
| `port` | `uint` | Port on which the telemetry server will listen | `8000` |

**Note**
If the telemetry server is enabled, a new endpoint at the provided port and path `/metrics` will
expose [Prometheus](https://prometheus.io/) data.
13 changes: 13 additions & 0 deletions cmd/start/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/forbole/juno/v5/modules"
"github.com/forbole/juno/v5/parser"
"github.com/forbole/juno/v5/prometheus"
"github.com/forbole/juno/v5/types"
cmdtypes "github.com/forbole/juno/v5/types/cmd"
"github.com/forbole/juno/v5/utils"
Expand Down Expand Up @@ -52,6 +53,12 @@ func NewStartCmd() *cobra.Command {
func startParsing(ctx *parser.Context) error {
// Get the config
cfg := ctx.Config.Parser
prometheus.StartHeight.Add(float64(cfg.StartHeight))

// Start the prometheus monitoring
if ctx.Prometheus != nil {
ctx.Prometheus.Start()
}

// Start periodic operations
scheduler := gocron.NewScheduler(time.UTC)
Expand Down Expand Up @@ -123,6 +130,7 @@ func enqueueMissingBlocks(exportQueue types.HeightQueue, ctx *parser.Context) {
lastDBBlockHeight, err := ctx.Database.GetLastBlockHeight()
if err != nil {
ctx.Logger.Error("failed to get last block height from database", "error", err)
prometheus.SignalDBOperationError()
}

// Get the start height, default to the config's height
Expand Down Expand Up @@ -195,6 +203,8 @@ func mustGetLatestHeight(ctx *parser.Context) int64 {
}

ctx.Logger.Error("failed to get last block from rpc client", "err", err, "retry count", retryCount)
prometheus.SignalRPCRequestError()

time.Sleep(ctx.Config.GetAvgBlockTime() * time.Duration(retryCount))
}

Expand All @@ -215,5 +225,8 @@ func trapSignal(ctx *parser.Context) {
defer ctx.Node.Stop()
defer ctx.Database.Close()
defer waitGroup.Done()
if ctx.Prometheus != nil {
defer ctx.Prometheus.Stop()
}
}()
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
github.com/golangci/golangci-lint v1.52.2
github.com/gorilla/mux v1.8.1
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.32.0
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
Expand Down Expand Up @@ -167,7 +169,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polyfloyd/go-errorlint v1.4.5 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.2 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gordonklaus/ineffassign v0.0.0-20230107090616-13ace0543b28 h1:9alfqbrhuD+9fLZ4iaAVwhlp5PEhmnBt7yvK2Oy5C1U=
github.com/gordonklaus/ineffassign v0.0.0-20230107090616-13ace0543b28/go.mod h1:Qcp2HIAYhR7mNUVSIxZww3Guk4it82ghYcEXIAk+QT0=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gostaticanalysis/analysisutil v0.7.1 h1:ZMCjoue3DtDWQ5WyU16YbjbQEQ3VuzwxALrpYd+HeKk=
Expand Down
2 changes: 2 additions & 0 deletions modules/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/forbole/juno/v5/logging"
"github.com/forbole/juno/v5/modules"
"github.com/forbole/juno/v5/modules/pruning"
"github.com/forbole/juno/v5/modules/telemetry"
"github.com/forbole/juno/v5/node"
"github.com/forbole/juno/v5/types"
"github.com/forbole/juno/v5/types/config"
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewDefaultRegistrar() *DefaultRegistrar {
func (r *DefaultRegistrar) BuildModules(ctx Context) modules.Modules {
return modules.Modules{
pruning.NewModule(ctx.JunoConfig, ctx.Database, ctx.Logger),
telemetry.NewModule(ctx.JunoConfig),
}
}

Expand Down
25 changes: 25 additions & 0 deletions modules/telemetry/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package telemetry

import "gopkg.in/yaml.v3"

// Config represents the configuration for the telemetry module
type Config struct {
Port uint `yaml:"port"`
}

// NewConfig allows to build a new Config instance
func NewConfig(port uint) *Config {
return &Config{
Port: port,
}
}

// ParseConfig allows to parse a byte array as a Config instance
func ParseConfig(bytes []byte) (*Config, error) {
type T struct {
Telemetry *Config `yaml:"telemetry"`
}
var cfg T
err := yaml.Unmarshal(bytes, &cfg)
return cfg.Telemetry, err
}
50 changes: 50 additions & 0 deletions modules/telemetry/handle_additional_operations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package telemetry

import (
"fmt"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// RunAdditionalOperations runs the module additional operations
func RunAdditionalOperations(cfg *Config) error {
err := checkConfig(cfg)
if err != nil {
return err
}

go startPrometheus(cfg)

return nil
}

// checkConfig checks if the given config is valid
func checkConfig(cfg *Config) error {
if cfg == nil {
return fmt.Errorf("no telemetry config found")
}

return nil
}

// startPrometheus starts a Prometheus server using the given configuration
func startPrometheus(cfg *Config) {
router := mux.NewRouter()
router.Handle("/metrics", promhttp.Handler())

// Create a new server
server := http.Server{
Addr: fmt.Sprintf(":%d", cfg.Port),
Handler: router,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}

err := server.ListenAndServe()
if err != nil {
panic(err)
}
}
47 changes: 47 additions & 0 deletions modules/telemetry/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package telemetry

import (
"github.com/forbole/juno/v5/modules"
"github.com/forbole/juno/v5/types/config"
)

const (
ModuleName = "telemetry"
)

var (
_ modules.Module = &Module{}
_ modules.AdditionalOperationsModule = &Module{}
)

// Module represents the telemetry module
type Module struct {
cfg *Config
}

// NewModule returns a new Module implementation
func NewModule(cfg config.Config) *Module {
bz, err := cfg.GetBytes()
if err != nil {
panic(err)
}

telemetryCfg, err := ParseConfig(bz)
if err != nil {
panic(err)
}

return &Module{
cfg: telemetryCfg,
}
}

// Name implements modules.Module
func (m *Module) Name() string {
return ModuleName
}

// RunAdditionalOperations implements modules.AdditionalOperationsModule
func (m *Module) RunAdditionalOperations() error {
return RunAdditionalOperations(m.cfg)
}
3 changes: 3 additions & 0 deletions parser/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/forbole/juno/v5/logging"
"github.com/forbole/juno/v5/modules"
"github.com/forbole/juno/v5/node"
"github.com/forbole/juno/v5/prometheus"
"github.com/forbole/juno/v5/types"
"github.com/forbole/juno/v5/types/config"
)
Expand All @@ -17,6 +18,7 @@ type Context struct {
Database database.Database
Logger logging.Logger
Modules []modules.Module
Prometheus *prometheus.Server
}

// NewContext builds a new Context instance
Expand All @@ -35,5 +37,6 @@ func NewContext(
Database: db,
Modules: modules,
Logger: logger,
Prometheus: prometheus.NewServer(config.Monitoring),
}
}
8 changes: 8 additions & 0 deletions parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/forbole/juno/v5/prometheus"
"github.com/forbole/juno/v5/utils"

"github.com/forbole/juno/v5/database"
Expand Down Expand Up @@ -62,6 +63,8 @@ func (w Worker) shouldReEnqueueWhenFailed() bool {
// Start starts a worker by listening for new jobs (block heights) from the
// given worker queue. Any failed job is logged and re-enqueued.
func (w Worker) Start() {
prometheus.WorkersCount.Inc()

for i := range w.queue {
// Make sure we did not reach the max retries yet
if i.HasReachedMaxRetries(w.cfg.Parser.GetMaxRetries()) {
Expand All @@ -73,6 +76,9 @@ func (w Worker) Start() {
err := w.ProcessIfNotExists(i.Height)
if err != nil {
go func() {
// Signal that an error occurred while processing this block
prometheus.SignalBlockError(i.Height)

// Build the block with the updated retry count and log the error
newBlock := i.IncrementRetryCount(err)
w.logger.Debug("re-enqueuing failed block", "height", i.Height, "err", err, "count", newBlock.RetryCount)
Expand All @@ -82,6 +88,8 @@ func (w Worker) Start() {
w.queue <- newBlock
}()
}

prometheus.LatestIndexedHeightByWorker.WithLabelValues(fmt.Sprintf("%d", w.index)).Set(float64(i.Height))
}
}

Expand Down
Loading

0 comments on commit 44fe002

Please sign in to comment.