Skip to content

Commit

Permalink
Add payload size metric (#27)
Browse files Browse the repository at this point in the history
* Add payload size metric

* Decrease bucket sizes

* Add configuration overrides for buckets on histogram metrics

* Add client side metrics to the default registry

With this, every application must expose its own Prometheus HTTP handler for the
metrics endpoint; once that is done, the EG client metrics will be available on the same
registry.

* Add docker compose setup for prometheus and grafana
  • Loading branch information
lmsilva-wls authored Feb 15, 2024
1 parent 1b7881d commit 131ffb3
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 50 deletions.
22 changes: 18 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package app
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/topfreegames/eventsgateway/v4/forwarder"
"go.opentelemetry.io/otel/propagation"
"net"
Expand Down Expand Up @@ -95,6 +96,8 @@ func (a *App) loadConfigurationDefaults() {
a.config.SetDefault("server.maxConnectionAgeGrace", "5s")
a.config.SetDefault("server.Time", "10s")
a.config.SetDefault("server.Timeout", "500ms")
a.config.SetDefault("prometheus.enabled", "true") // always true on the API side
a.config.SetDefault("prometheus.port", ":9091")
}

func (a *App) configure() error {
Expand Down Expand Up @@ -159,21 +162,31 @@ func (a *App) metricsReporterInterceptor(
) (interface{}, error) {
events := []*pb.Event{}
retry := "0"
payloadSize := 0
switch t := req.(type) {
case *pb.Event:
events = append(events, req.(*pb.Event))
event := req.(*pb.Event)
events = append(events, event)
payloadSize = proto.Size(event)
case *pb.SendEventsRequest:
events = append(events, req.(*pb.SendEventsRequest).Events...)
retry = fmt.Sprintf("%d", req.(*pb.SendEventsRequest).Retry)
request := req.(*pb.SendEventsRequest)
events = append(events, request.Events...)
retry = fmt.Sprintf("%d", request.Retry)
payloadSize = proto.Size(request)
default:
a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t)
}

topic := events[0].Topic
l := a.log.
WithField("route", info.FullMethod).
WithField("topic", topic)

metrics.APIPayloadSize.WithLabelValues(
info.FullMethod,
topic,
).Observe(float64(payloadSize))

defer func(startTime time.Time) {
elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000))
for _, e := range events {
Expand Down Expand Up @@ -234,6 +247,7 @@ func (a *App) Run() {
}
log.Infof("events gateway listening on %s:%d", a.host, a.port)

metrics.StartServer(a.config)
var opts []grpc.ServerOption

otelPropagator := otelgrpc.WithPropagators(otel.GetTextMapPropagator())
Expand Down
30 changes: 30 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ package client
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/topfreegames/eventsgateway/v4/metrics"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -94,12 +96,40 @@ func (c *Client) newGRPCClient(
"serverAddress": c.serverAddress,
"async": async,
})

c.registerMetrics(configPrefix)
if async {
return newGRPCClientAsync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...)
}
return newGRPCClientSync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...)
}

// registerMetrics makes sure the client-side collectors are registered through
// metrics.RegisterMetrics.
//
// The response-time histogram is built only once: if every call replaced
// metrics.ClientRequestsResponseTime, the second and later clients would
// observe into a histogram that never reaches the registry, because
// RegisterMetrics tolerates the "already registered" error and keeps the first
// collector.
//
// The latency buckets (in milliseconds) are read from
// "<configPrefix>client.prometheus.buckets.latency". When the stored value is
// not a []float64 the built-in defaults are used and the problem is logged,
// instead of panicking on the type assertion.
//
// Registration failures are logged, not returned.
func (c *Client) registerMetrics(configPrefix string) {
	if metrics.ClientRequestsResponseTime == nil {
		latencyBucketsConf := fmt.Sprintf("%sclient.prometheus.buckets.latency", configPrefix)
		defaultBuckets := []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000}
		c.config.SetDefault(latencyBucketsConf, defaultBuckets)

		latencyBuckets, ok := c.config.Get(latencyBucketsConf).([]float64)
		if !ok {
			c.logger.
				WithError(fmt.Errorf("%s is not a []float64", latencyBucketsConf)).
				Error("invalid latency buckets, using defaults")
			latencyBuckets = defaultBuckets
		}

		metrics.ClientRequestsResponseTime = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Namespace: "eventsgateway",
				Subsystem: "client",
				Name:      "response_time_ms",
				Help:      "the response time in ms of calls to server",
				Buckets:   latencyBuckets,
			},
			[]string{"route", "topic", "retry"},
		)
	}

	collectors := []prometheus.Collector{
		metrics.ClientRequestsResponseTime,
		metrics.ClientRequestsSuccessCounter,
		metrics.ClientRequestsFailureCounter,
		metrics.ClientRequestsDroppedCounter,
	}
	// Only report an error when there is one; logging unconditionally produced
	// a spurious "failed to register metric" entry on every successful call.
	if err := metrics.RegisterMetrics(collectors); err != nil {
		c.logger.WithError(err).Error("failed to register metric")
	}
}

// Send sends an event to another server via grpc using the client's configured topic
func (c *Client) Send(
ctx context.Context,
Expand Down
12 changes: 2 additions & 10 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
package cmd

import (
"fmt"
"log"

"github.com/mmcloughlin/professor"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/topfreegames/eventsgateway/v4/app"
logruswrapper "github.com/topfreegames/eventsgateway/v4/logger/logrus"
"github.com/topfreegames/eventsgateway/v4/metrics"
)

var host string
Expand All @@ -54,26 +53,19 @@ var startCmd = &cobra.Command{
log.Panic(err)
}
launchPProf()
launchMetricsServer()
a.Run()
},
}

func launchPProf() {
fmt.Println("Starting PProf HTTP server")
log.Println("Starting PProf HTTP server")
config.SetDefault("pprof.enabled", true)
config.SetDefault("pprof.address", "localhost:6060")
if config.GetBool("pprof.enabled") {
professor.Launch(config.GetString("pprof.address"))
}
}

// launchMetricsServer announces the metrics HTTP server on stdout and passes
// the "prometheus.port" setting (default ":9091") to metrics.StartServer.
func launchMetricsServer() {
	const portKey = "prometheus.port"

	fmt.Println("Starting Metrics HTTP server")
	config.SetDefault(portKey, ":9091")

	listenAddr := config.GetString(portKey)
	metrics.StartServer(listenAddr)
}

func init() {
startCmd.Flags().StringVarP(&host, "host", "b", "0.0.0.0", "the address of the interface to bind")
startCmd.Flags().IntVarP(&port, "port", "p", 5000, "the port to bind")
Expand Down
35 changes: 35 additions & 0 deletions docker/docker-compose-metrics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Compose setup that runs Prometheus and Grafana side by side.
version: '3.8'

# Named volumes used by the two services below for their data directories.
volumes:
prometheus_data: {}
grafana_data: {}

services:
# Prometheus: reads the prometheus.yml that sits next to this file and
# publishes its port 9090 on the host.
prometheus:
image: prom/prometheus
restart: always
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
ports:
- "9090:9090"

# Grafana: published on host port 3000, started after the prometheus service.
grafana:
image: grafana/grafana
restart: always
environment:
GF_INSTALL_PLUGINS: 'grafana-clock-panel,grafana-simple-json-datasource'
# NOTE(review): hard-coded admin credentials; presumably meant for local
# development only. Confirm before using this file anywhere shared.
GF_SECURITY_ADMIN_USER: 'admin'
GF_SECURITY_ADMIN_PASSWORD: 'foobar'
GF_USERS_ALLOW_SIGN_UP: 'false'
volumes:
- grafana_data:/var/lib/grafana
ports:
- "3000:3000"
depends_on:
- prometheus
11 changes: 11 additions & 0 deletions docker/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Scrape configuration for the Prometheus container.
scrape_configs:
# Scrapes port 9091 on the Docker host every 5 seconds.
# NOTE(review): presumably the events gateway metrics endpoint; verify that
# the port matches the application's `prometheus.port` setting.
- job_name: app
scrape_interval: 5s
static_configs:
- targets: ['host.docker.internal:9091']

# Optional job for Prometheus' own metrics; uncomment to enable.
# prometheus metrics
# - job_name: 'prometheus'
# scrape_interval: 5s
# static_configs:
# - targets: ['localhost:9090']
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/Shopify/sarama v1.35.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
Expand Down
111 changes: 78 additions & 33 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
package metrics

import (
"errors"
"github.com/spf13/viper"
"net/http"
"os"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -35,17 +36,11 @@ import (
)

var (
// APIResponseTime summary
APIResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "response_time_ms",
Help: "the response time in ms of api routes",
Buckets: []float64{1, 5, 10, 30, 90, 160, 240},
},
[]string{"route", "topic", "retry"},
)
// APIResponseTime is a histogram that observes the API response time as perceived by the server
APIResponseTime *prometheus.HistogramVec

// APIPayloadSize is a histogram that observes the payload size of requests arriving at the server
APIPayloadSize *prometheus.HistogramVec

// APIRequestsSuccessCounter counter
APIRequestsSuccessCounter = prometheus.NewCounterVec(
Expand All @@ -69,17 +64,8 @@ var (
[]string{"route", "topic", "retry", "reason"},
)

// ClientRequestsResponseTime is the time the client take to talk to the server
ClientRequestsResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "client",
Name: "response_time_ms",
Help: "the response time in ms of calls to server",
Buckets: []float64{1, 3, 5, 10, 25, 50, 100, 150, 200, 250, 300},
},
[]string{"route", "topic", "retry"},
)
// ClientRequestsResponseTime is a histogram that observes the API response time as perceived by the client
ClientRequestsResponseTime *prometheus.HistogramVec

// ClientRequestsSuccessCounter is the count of successful calls to the server
ClientRequestsSuccessCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -122,31 +108,90 @@ var (
)
)

// defaultLatencyBuckets returns the histogram buckets, in milliseconds, for
// latency metrics. The value is read from "prometheus.buckets.latency", which
// defaults to {3, 5, 10, 50, 100, 300, 500, 1000, 5000}.
//
// The stored value is accepted either as a []float64 (the registered default)
// or as a []interface{} of numbers, which is presumably how an override loaded
// from a configuration file arrives. Anything else is logged and replaced by
// the defaults instead of panicking on a type assertion.
func defaultLatencyBuckets(config *viper.Viper) []float64 {
	// in milliseconds
	const configKey = "prometheus.buckets.latency"
	defaults := []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000}
	config.SetDefault(configKey, defaults)

	switch raw := config.Get(configKey).(type) {
	case []float64:
		return raw
	case []interface{}:
		buckets := make([]float64, 0, len(raw))
		for _, item := range raw {
			switch v := item.(type) {
			case float64:
				buckets = append(buckets, v)
			case int:
				buckets = append(buckets, float64(v))
			case int64:
				buckets = append(buckets, float64(v))
			default:
				log.Printf("invalid bucket %v (%T) in %s, using default buckets", item, item, configKey)
				return defaults
			}
		}
		return buckets
	default:
		log.Printf("unexpected type %T for %s, using default buckets", raw, configKey)
		return defaults
	}
}

// defaultPayloadSizeBuckets returns the histogram buckets, in bytes, for
// payload-size metrics. The value is read from "prometheus.buckets.payloadSize",
// which defaults to a range from 100 B to 5 MB.
//
// The stored value is accepted either as a []float64 (the registered default)
// or as a []interface{} of numbers, which is presumably how an override loaded
// from a configuration file arrives. Anything else is logged and replaced by
// the defaults instead of panicking on a type assertion.
func defaultPayloadSizeBuckets(config *viper.Viper) []float64 {
	// in bytes
	const configKey = "prometheus.buckets.payloadSize"
	defaults := []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000}
	config.SetDefault(configKey, defaults)

	switch raw := config.Get(configKey).(type) {
	case []float64:
		return raw
	case []interface{}:
		buckets := make([]float64, 0, len(raw))
		for _, item := range raw {
			switch v := item.(type) {
			case float64:
				buckets = append(buckets, v)
			case int:
				buckets = append(buckets, float64(v))
			case int64:
				buckets = append(buckets, float64(v))
			default:
				log.Printf("invalid bucket %v (%T) in %s, using default buckets", item, item, configKey)
				return defaults
			}
		}
		return buckets
	default:
		log.Printf("unexpected type %T for %s, using default buckets", raw, configKey)
		return defaults
	}
}

// RegisterMetrics registers each collector with prometheus.Register and treats
// prometheus.AlreadyRegisteredError as success. Any other registration error
// is returned immediately, leaving the remaining collectors unregistered.
func RegisterMetrics(collectors []prometheus.Collector) error {
	for i := range collectors {
		regErr := prometheus.Register(collectors[i])
		if regErr == nil {
			continue
		}

		// A collector that is already present is not a failure.
		var duplicate prometheus.AlreadyRegisteredError
		if errors.As(regErr, &duplicate) {
			continue
		}
		return regErr
	}

	return nil
}

// StartServer runs a metrics server inside a goroutine
// that reports default application metrics in prometheus format.
// Any errors that may occur will stop the server add log.Fatal the error.
func StartServer(port string) {
prometheus.MustRegister(
// Any errors that may occur will stop the server and log.Fatal the error.
func StartServer(config *viper.Viper) {
APIPayloadSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "payload_size",
Help: "payload size of API routes, in bytes",
Buckets: defaultPayloadSizeBuckets(config),
},
[]string{"route", "topic"},
)

APIResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "response_time_ms",
Help: "the response time in ms of api routes",
Buckets: defaultLatencyBuckets(config),
},
[]string{"route", "topic", "retry"},
)

collectors := []prometheus.Collector{
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
ClientRequestsResponseTime,
ClientRequestsSuccessCounter,
ClientRequestsFailureCounter,
ClientRequestsDroppedCounter,
)
}

err := RegisterMetrics(collectors)
if err != nil {
log.Fatal(err)
}

go func() {
envEnabled, _ := os.LookupEnv("EVENTSGATEWAY_PROMETHEUS_ENABLED")
envEnabled := config.GetString("prometheus.enabled")
if envEnabled != "true" {
log.Warn("Prometheus web server disabled")
return
}

r := mux.NewRouter()
r.Handle("/metrics", promhttp.Handler())

s := &http.Server{
Addr: port,
Addr: config.GetString("prometheus.port"),
ReadTimeout: 8 * time.Second,
WriteTimeout: 8 * time.Second,
MaxHeaderBytes: 1 << 20,
Expand Down
5 changes: 3 additions & 2 deletions testing/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
package testing

import (
"strings"

"github.com/spf13/viper"
"strings"
)

// GetDefaultConfig returns the configuration at ./config/test.yaml
Expand All @@ -30,5 +29,7 @@ func GetDefaultConfig() (*viper.Viper, error) {
return nil, err
}

cfg.Set("prometheus.enabled", "false")

return cfg, nil
}

0 comments on commit 131ffb3

Please sign in to comment.