Skip to content

Commit

Permalink
Add client side metrics to the default registry
Browse files Browse the repository at this point in the history
With this, every application must expose their own prometheus HTTP handler for the
metrics endpoint; after that is done, the EG client metrics will be available on the same
registry.
  • Loading branch information
lmsilva-wls committed Feb 7, 2024
1 parent c2c67e7 commit 24c0d94
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 14 deletions.
3 changes: 3 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (a *App) loadConfigurationDefaults() {
a.config.SetDefault("server.maxConnectionAgeGrace", "5s")
a.config.SetDefault("server.Time", "10s")
a.config.SetDefault("server.Timeout", "500ms")
a.config.SetDefault("prometheus.enabled", "true") // always true on the API side
a.config.SetDefault("prometheus.port", ":9091")
}

func (a *App) configure() error {
Expand Down Expand Up @@ -245,6 +247,7 @@ func (a *App) Run() {
}
log.Infof("events gateway listening on %s:%d", a.host, a.port)

metrics.StartServer(a.config)
var opts []grpc.ServerOption

otelPropagator := otelgrpc.WithPropagators(otel.GetTextMapPropagator())
Expand Down
30 changes: 30 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ package client
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/topfreegames/eventsgateway/v4/metrics"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -94,12 +96,40 @@ func (c *Client) newGRPCClient(
"serverAddress": c.serverAddress,
"async": async,
})

c.registerMetrics(configPrefix)
if async {
return newGRPCClientAsync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...)
}
return newGRPCClientSync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...)
}

func (c *Client) registerMetrics(configPrefix string) {
latencyBucketsConf := fmt.Sprintf("%sclient.prometheus.buckets.latency", configPrefix)
c.config.SetDefault(latencyBucketsConf, []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000})
latencyBuckets := c.config.Get(latencyBucketsConf).([]float64)

metrics.ClientRequestsResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "client",
Name: "response_time_ms",
Help: "the response time in ms of calls to server",
Buckets: latencyBuckets,
},
[]string{"route", "topic", "retry"},
)

collectors := []prometheus.Collector{
metrics.ClientRequestsResponseTime,
metrics.ClientRequestsSuccessCounter,
metrics.ClientRequestsFailureCounter,
metrics.ClientRequestsDroppedCounter,
}
err := metrics.RegisterMetrics(collectors)
c.logger.WithError(err).Error("failed to register metric")
}

// Send sends an event to another server via grpc using the client's configured topic
func (c *Client) Send(
ctx context.Context,
Expand Down
9 changes: 0 additions & 9 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/spf13/cobra"
"github.com/topfreegames/eventsgateway/v4/app"
logruswrapper "github.com/topfreegames/eventsgateway/v4/logger/logrus"
"github.com/topfreegames/eventsgateway/v4/metrics"
)

var host string
Expand All @@ -54,7 +53,6 @@ var startCmd = &cobra.Command{
log.Panic(err)
}
launchPProf()
launchMetricsServer()
a.Run()
},
}
Expand All @@ -68,13 +66,6 @@ func launchPProf() {
}
}

func launchMetricsServer() {
config.SetDefault("prometheus.enabled", "true") // always true on the API side
config.SetDefault("prometheus.port", ":9091")

metrics.StartServer(config)
}

func init() {
startCmd.Flags().StringVarP(&host, "host", "b", "0.0.0.0", "the address of the interface to bind")
startCmd.Flags().IntVarP(&port, "port", "p", 5000, "the port to bind")
Expand Down
30 changes: 27 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package metrics

import (
"errors"
"github.com/spf13/viper"
"net/http"
"time"
Expand Down Expand Up @@ -123,9 +124,26 @@ func defaultPayloadSizeBuckets(config *viper.Viper) []float64 {
return config.Get(configKey).([]float64)
}

// RegisterMetrics is a wrapper to handle prometheus.AlreadyRegisteredError;
// it only returns an error if the metric wasn't already registered and there was an
// actual error registering it.
func RegisterMetrics(collectors []prometheus.Collector) error {
for _, collector := range collectors {
err := prometheus.Register(collector)
if err != nil {
var alreadyRegisteredError prometheus.AlreadyRegisteredError
if !errors.As(err, &alreadyRegisteredError) {
return err
}
}
}

return nil
}

// StartServer runs a metrics server inside a goroutine
// that reports default application metrics in prometheus format.
// Any errors that may occur will stop the server add log.Fatal the error.
// Any errors that may occur will stop the server and log.Fatal the error.
func StartServer(config *viper.Viper) {
APIPayloadSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -149,13 +167,19 @@ func StartServer(config *viper.Viper) {
[]string{"route", "topic", "retry"},
)

prometheus.MustRegister(
collectors := []prometheus.Collector{
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
)
}

err := RegisterMetrics(collectors)
if err != nil {
log.Fatal(err)
}

go func() {
envEnabled := config.GetString("prometheus.enabled")
if envEnabled != "true" {
Expand Down
5 changes: 3 additions & 2 deletions testing/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
package testing

import (
"strings"

"github.com/spf13/viper"
"strings"
)

// GetDefaultConfig returns the configuration at ./config/test.yaml
Expand All @@ -30,5 +29,7 @@ func GetDefaultConfig() (*viper.Viper, error) {
return nil, err
}

cfg.Set("prometheus.enabled", "false")

return cfg, nil
}

0 comments on commit 24c0d94

Please sign in to comment.