Skip to content

Commit

Permalink
Add payload size metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lmsilva-wls committed Feb 6, 2024
1 parent 198a339 commit a3166ce
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 11 deletions.
19 changes: 15 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package app
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/topfreegames/eventsgateway/v4/forwarder"
"go.opentelemetry.io/otel/propagation"
"net"
Expand Down Expand Up @@ -159,21 +160,31 @@ func (a *App) metricsReporterInterceptor(
) (interface{}, error) {
events := []*pb.Event{}
retry := "0"
payloadSize := 0
switch t := req.(type) {
case *pb.Event:
events = append(events, req.(*pb.Event))
event := req.(*pb.Event)
events = append(events, event)
payloadSize = proto.Size(event)
case *pb.SendEventsRequest:
events = append(events, req.(*pb.SendEventsRequest).Events...)
retry = fmt.Sprintf("%d", req.(*pb.SendEventsRequest).Retry)
request := req.(*pb.SendEventsRequest)
events = append(events, request.Events...)
retry = fmt.Sprintf("%d", request.Retry)
payloadSize = proto.Size(request)
default:
a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t)
}

topic := events[0].Topic
l := a.log.
WithField("route", info.FullMethod).
WithField("topic", topic)

metrics.APIPayloadSize.WithLabelValues(
info.FullMethod,
topic,
).Observe(float64(payloadSize))

defer func(startTime time.Time) {
elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000))
for _, e := range events {
Expand Down
9 changes: 5 additions & 4 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
package cmd

import (
"fmt"
"log"

"github.com/mmcloughlin/professor"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,7 +60,7 @@ var startCmd = &cobra.Command{
}

func launchPProf() {
fmt.Println("Starting PProf HTTP server")
log.Println("Starting PProf HTTP server")
config.SetDefault("pprof.enabled", true)
config.SetDefault("pprof.address", "localhost:6060")
if config.GetBool("pprof.enabled") {
Expand All @@ -69,9 +69,10 @@ func launchPProf() {
}

func launchMetricsServer() {
fmt.Println("Starting Metrics HTTP server")
config.SetDefault("prometheus.port", ":9091")
metrics.StartServer(config.GetString("prometheus.port"))
httpPort := config.GetString("prometheus.port")
log.Printf("Starting Metrics HTTP server at %s\n", httpPort)
metrics.StartServer(httpPort)
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/Shopify/sarama v1.35.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
Expand Down
28 changes: 26 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,23 @@ var (
Subsystem: "api",
Name: "response_time_ms",
Help: "the response time in ms of api routes",
Buckets: []float64{1, 5, 10, 30, 90, 160, 240},
Buckets: defaultLatencyBuckets(),
},
[]string{"route", "topic", "retry"},
)

// APIPayloadSize summary
APIPayloadSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "payload_size",
Help: "payload size of API routes, in bytes",
Buckets: defaultPayloadSizeBuckets(),
},
[]string{"route", "topic"},
)

// APIRequestsSuccessCounter counter
APIRequestsSuccessCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -76,7 +88,7 @@ var (
Subsystem: "client",
Name: "response_time_ms",
Help: "the response time in ms of calls to server",
Buckets: []float64{1, 3, 5, 10, 25, 50, 100, 150, 200, 250, 300},
Buckets: defaultLatencyBuckets(),
},
[]string{"route", "topic", "retry"},
)
Expand Down Expand Up @@ -122,12 +134,23 @@ var (
)
)

func defaultLatencyBuckets() []float64 {
// in milliseconds
return []float64{1, 3, 5, 10, 25, 50, 100, 200, 300, 500, 1000}
}

func defaultPayloadSizeBuckets() []float64 {
// in bytes
return []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000}
}

// StartServer runs a metrics server inside a goroutine
// that reports default application metrics in prometheus format.
// Any errors that may occur will stop the server add log.Fatal the error.
func StartServer(port string) {
prometheus.MustRegister(
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
Expand All @@ -139,6 +162,7 @@ func StartServer(port string) {
go func() {
envEnabled, _ := os.LookupEnv("EVENTSGATEWAY_PROMETHEUS_ENABLED")
if envEnabled != "true" {
log.Warn("Prometheus web server disabled")
return
}

Expand Down

0 comments on commit a3166ce

Please sign in to comment.