Skip to content

Commit

Permalink
Add payload size metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lmsilva-wls committed Feb 5, 2024
1 parent 198a339 commit 6c77f5c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
17 changes: 14 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package app

import (
"context"
"encoding/json"
"fmt"
"github.com/topfreegames/eventsgateway/v4/forwarder"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -163,17 +164,27 @@ func (a *App) metricsReporterInterceptor(
case *pb.Event:
events = append(events, req.(*pb.Event))
case *pb.SendEventsRequest:
events = append(events, req.(*pb.SendEventsRequest).Events...)
retry = fmt.Sprintf("%d", req.(*pb.SendEventsRequest).Retry)
request := req.(*pb.SendEventsRequest)
events = append(events, request.Events...)
retry = fmt.Sprintf("%d", request.Retry)
default:
a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t)
}

topic := events[0].Topic
l := a.log.
WithField("route", info.FullMethod).
WithField("topic", topic)

// using JSON to calculate the payload size introduces a small, invariant/constant error related when compared to
// the gRPC payload, but should be enough for observability
marshalledEvents, _ := json.Marshal(events)
payloadSize := len(marshalledEvents)
metrics.APIPayloadSize.WithLabelValues(
info.FullMethod,
topic,
).Observe(float64(payloadSize))

defer func(startTime time.Time) {
elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000))
for _, e := range events {
Expand Down
9 changes: 5 additions & 4 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
package cmd

import (
"fmt"
"log"

"github.com/mmcloughlin/professor"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -60,7 +60,7 @@ var startCmd = &cobra.Command{
}

func launchPProf() {
fmt.Println("Starting PProf HTTP server")
log.Println("Starting PProf HTTP server")
config.SetDefault("pprof.enabled", true)
config.SetDefault("pprof.address", "localhost:6060")
if config.GetBool("pprof.enabled") {
Expand All @@ -69,9 +69,10 @@ func launchPProf() {
}

func launchMetricsServer() {
fmt.Println("Starting Metrics HTTP server")
config.SetDefault("prometheus.port", ":9091")
metrics.StartServer(config.GetString("prometheus.port"))
httpPort := config.GetString("prometheus.port")
log.Printf("Starting Metrics HTTP server at %s\n", httpPort)
metrics.StartServer(httpPort)
}

func init() {
Expand Down
28 changes: 26 additions & 2 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,23 @@ var (
Subsystem: "api",
Name: "response_time_ms",
Help: "the response time in ms of api routes",
Buckets: []float64{1, 5, 10, 30, 90, 160, 240},
Buckets: defaultLatencyBuckets(),
},
[]string{"route", "topic", "retry"},
)

// APIPayloadSize summary
APIPayloadSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "payload_size",
Help: "payload size of API routes, in bytes",
Buckets: defaultPayloadSizeBuckets(),
},
[]string{"route", "topic"},
)

// APIRequestsSuccessCounter counter
APIRequestsSuccessCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -76,7 +88,7 @@ var (
Subsystem: "client",
Name: "response_time_ms",
Help: "the response time in ms of calls to server",
Buckets: []float64{1, 3, 5, 10, 25, 50, 100, 150, 200, 250, 300},
Buckets: defaultLatencyBuckets(),
},
[]string{"route", "topic", "retry"},
)
Expand Down Expand Up @@ -122,12 +134,23 @@ var (
)
)

func defaultLatencyBuckets() []float64 {
// in milliseconds
return []float64{1, 3, 5, 10, 25, 50, 100, 200, 300, 500, 1000}
}

func defaultPayloadSizeBuckets() []float64 {
// in bytes
return []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000}
}

// StartServer runs a metrics server inside a goroutine
// that reports default application metrics in prometheus format.
// Any errors that may occur will stop the server add log.Fatal the error.
func StartServer(port string) {
prometheus.MustRegister(
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
Expand All @@ -139,6 +162,7 @@ func StartServer(port string) {
go func() {
envEnabled, _ := os.LookupEnv("EVENTSGATEWAY_PROMETHEUS_ENABLED")
if envEnabled != "true" {
log.Warn("Prometheus web server disabled")
return
}

Expand Down

0 comments on commit 6c77f5c

Please sign in to comment.