diff --git a/app/app.go b/app/app.go index 7903539..d1532af 100644 --- a/app/app.go +++ b/app/app.go @@ -25,6 +25,7 @@ package app import ( "context" "fmt" + "github.com/golang/protobuf/proto" "github.com/topfreegames/eventsgateway/v4/forwarder" "go.opentelemetry.io/otel/propagation" "net" @@ -95,6 +96,8 @@ func (a *App) loadConfigurationDefaults() { a.config.SetDefault("server.maxConnectionAgeGrace", "5s") a.config.SetDefault("server.Time", "10s") a.config.SetDefault("server.Timeout", "500ms") + a.config.SetDefault("prometheus.enabled", "true") // always true on the API side + a.config.SetDefault("prometheus.port", ":9091") } func (a *App) configure() error { @@ -159,21 +162,31 @@ func (a *App) metricsReporterInterceptor( ) (interface{}, error) { events := []*pb.Event{} retry := "0" + payloadSize := 0 switch t := req.(type) { case *pb.Event: - events = append(events, req.(*pb.Event)) + event := req.(*pb.Event) + events = append(events, event) + payloadSize = proto.Size(event) case *pb.SendEventsRequest: - events = append(events, req.(*pb.SendEventsRequest).Events...) - retry = fmt.Sprintf("%d", req.(*pb.SendEventsRequest).Retry) + request := req.(*pb.SendEventsRequest) + events = append(events, request.Events...) + retry = fmt.Sprintf("%d", request.Retry) + payloadSize = proto.Size(request) default: a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t) } - + topic := events[0].Topic l := a.log. WithField("route", info.FullMethod). WithField("topic", topic) + metrics.APIPayloadSize.WithLabelValues( + info.FullMethod, + topic, + ).Observe(float64(payloadSize)) + defer func(startTime time.Time) { elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000)) for _, e := range events { @@ -234,6 +247,7 @@ func (a *App) Run() { } log.Infof("events gateway listening on %s:%d", a.host, a.port) + metrics.StartServer(a.config) var opts []grpc.ServerOption otelPropagator := otelgrpc.WithPropagators(otel.GetTextMapPropagator()) diff --git a/client/client.go b/client/client.go index bef1f81..e09105a 100644 --- a/client/client.go +++ b/client/client.go @@ -10,6 +10,8 @@ package client import ( "context" "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/topfreegames/eventsgateway/v4/metrics" "strings" "sync" "time" @@ -94,12 +96,40 @@ func (c *Client) newGRPCClient( "serverAddress": c.serverAddress, "async": async, }) + + c.registerMetrics(configPrefix) if async { return newGRPCClientAsync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...) } return newGRPCClientSync(configPrefix, c.config, c.logger, c.serverAddress, client, opts...) } +func (c *Client) registerMetrics(configPrefix string) { + latencyBucketsConf := fmt.Sprintf("%sclient.prometheus.buckets.latency", configPrefix) + c.config.SetDefault(latencyBucketsConf, []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000}) + latencyBuckets := c.config.Get(latencyBucketsConf).([]float64) + + metrics.ClientRequestsResponseTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "eventsgateway", + Subsystem: "client", + Name: "response_time_ms", + Help: "the response time in ms of calls to server", + Buckets: latencyBuckets, + }, + []string{"route", "topic", "retry"}, + ) + + collectors := []prometheus.Collector{ + metrics.ClientRequestsResponseTime, + metrics.ClientRequestsSuccessCounter, + metrics.ClientRequestsFailureCounter, + metrics.ClientRequestsDroppedCounter, + } + err := metrics.RegisterMetrics(collectors) + c.logger.WithError(err).Error("failed to register metric") +} + // Send sends an event to another server via grpc using the client's configured topic func (c *Client) Send( ctx context.Context, diff --git a/cmd/start.go b/cmd/start.go index ab7872b..786ce6e 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -23,14 +23,13 @@ package cmd import ( - "fmt" + "log" "github.com/mmcloughlin/professor" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/topfreegames/eventsgateway/v4/app" logruswrapper "github.com/topfreegames/eventsgateway/v4/logger/logrus" - "github.com/topfreegames/eventsgateway/v4/metrics" ) var host string @@ -54,13 +53,12 @@ var startCmd = &cobra.Command{ log.Panic(err) } launchPProf() - launchMetricsServer() a.Run() }, } func launchPProf() { - fmt.Println("Starting PProf HTTP server") + log.Println("Starting PProf HTTP server") config.SetDefault("pprof.enabled", true) config.SetDefault("pprof.address", "localhost:6060") if config.GetBool("pprof.enabled") { @@ -68,12 +66,6 @@ func launchPProf() { } } -func launchMetricsServer() { - fmt.Println("Starting Metrics HTTP server") - config.SetDefault("prometheus.port", ":9091") - metrics.StartServer(config.GetString("prometheus.port")) -} - func init() { startCmd.Flags().StringVarP(&host, "host", "b", "0.0.0.0", "the address of the interface to bind") startCmd.Flags().IntVarP(&port, "port", "p", 5000, "the port to bind") diff --git a/docker/docker-compose-metrics.yml b/docker/docker-compose-metrics.yml new file mode 100644 index 0000000..3be446b --- /dev/null +++ b/docker/docker-compose-metrics.yml @@ -0,0 +1,35 @@ +version: '3.8' + +volumes: + prometheus_data: {} + grafana_data: {} + +services: + prometheus: + image: prom/prometheus + restart: always + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus_data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/usr/share/prometheus/console_libraries' + - '--web.console.templates=/usr/share/prometheus/consoles' + ports: + - "9090:9090" + + grafana: + image: grafana/grafana + restart: always + environment: + GF_INSTALL_PLUGINS: 'grafana-clock-panel,grafana-simple-json-datasource' + GF_SECURITY_ADMIN_USER: 'admin' + GF_SECURITY_ADMIN_PASSWORD: 'foobar' + GF_USERS_ALLOW_SIGN_UP: 'false' + volumes: + - grafana_data:/var/lib/grafana + ports: + - "3000:3000" + depends_on: + - prometheus diff --git a/docker/prometheus.yml b/docker/prometheus.yml new file mode 100644 index 0000000..3e39078 --- /dev/null +++ b/docker/prometheus.yml @@ -0,0 +1,11 @@ +scrape_configs: + - job_name: app + scrape_interval: 5s + static_configs: + - targets: ['host.docker.internal:9091'] + +# prometheus metrics +# - job_name: 'prometheus' +# scrape_interval: 5s +# static_configs: +# - targets: ['localhost:9090'] diff --git a/go.mod b/go.mod index 5b3690e..0465cd9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.17 require ( github.com/Shopify/sarama v1.35.0 github.com/golang/mock v1.4.4 + github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 @@ -39,7 +40,6 @@ require ( github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect diff --git a/metrics/metrics.go b/metrics/metrics.go index 0daf5a2..c63b239 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -23,8 +23,9 @@ package metrics import ( + "errors" + "github.com/spf13/viper" "net/http" - "os" "time" log "github.com/sirupsen/logrus" @@ -35,17 +36,11 @@ import ( ) var ( - // APIResponseTime summary - APIResponseTime = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "eventsgateway", - Subsystem: "api", - Name: "response_time_ms", - Help: "the response time in ms of api routes", - Buckets: []float64{1, 5, 10, 30, 90, 160, 240}, - }, - []string{"route", "topic", "retry"}, - ) + // APIResponseTime summary, observes the API response time as perceived by the server + APIResponseTime *prometheus.HistogramVec + + // APIPayloadSize summary, observes the payload size of requests arriving at the server + APIPayloadSize *prometheus.HistogramVec // APIRequestsSuccessCounter counter APIRequestsSuccessCounter = prometheus.NewCounterVec( @@ -69,17 +64,8 @@ var ( []string{"route", "topic", "retry", "reason"}, ) - // ClientRequestsResponseTime is the time the client take to talk to the server - ClientRequestsResponseTime = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "eventsgateway", - Subsystem: "client", - Name: "response_time_ms", - Help: "the response time in ms of calls to server", - Buckets: []float64{1, 3, 5, 10, 25, 50, 100, 150, 200, 250, 300}, - }, - []string{"route", "topic", "retry"}, - ) + // ClientRequestsResponseTime summary, observes the API response time as perceived by the client + ClientRequestsResponseTime *prometheus.HistogramVec // ClientRequestsSuccessCounter is the count of successfull calls to the server ClientRequestsSuccessCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -122,23 +108,82 @@ var ( ) ) +func defaultLatencyBuckets(config *viper.Viper) []float64 { + // in milliseconds + const configKey = "prometheus.buckets.latency" + config.SetDefault(configKey, []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000}) + + return config.Get(configKey).([]float64) +} + +func defaultPayloadSizeBuckets(config *viper.Viper) []float64 { + // in bytes + configKey := "prometheus.buckets.payloadSize" + config.SetDefault(configKey, []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000}) + + return config.Get(configKey).([]float64) +} + +// RegisterMetrics is a wrapper to handle prometheus.AlreadyRegisteredError; +// it only returns an error if the metric wasn't already registered and there was an +// actual error registering it. +func RegisterMetrics(collectors []prometheus.Collector) error { + for _, collector := range collectors { + err := prometheus.Register(collector) + if err != nil { + var alreadyRegisteredError prometheus.AlreadyRegisteredError + if !errors.As(err, &alreadyRegisteredError) { + return err + } + } + } + + return nil +} + // StartServer runs a metrics server inside a goroutine // that reports default application metrics in prometheus format. -// Any errors that may occur will stop the server add log.Fatal the error. -func StartServer(port string) { - prometheus.MustRegister( +// Any errors that may occur will stop the server and log.Fatal the error. +func StartServer(config *viper.Viper) { + APIPayloadSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "eventsgateway", + Subsystem: "api", + Name: "payload_size", + Help: "payload size of API routes, in bytes", + Buckets: defaultPayloadSizeBuckets(config), + }, + []string{"route", "topic"}, + ) + + APIResponseTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "eventsgateway", + Subsystem: "api", + Name: "response_time_ms", + Help: "the response time in ms of api routes", + Buckets: defaultLatencyBuckets(config), + }, + []string{"route", "topic", "retry"}, + ) + + collectors := []prometheus.Collector{ APIResponseTime, + APIPayloadSize, APIRequestsFailureCounter, APIRequestsSuccessCounter, APITopicsSubmission, - ClientRequestsResponseTime, - ClientRequestsSuccessCounter, - ClientRequestsFailureCounter, - ClientRequestsDroppedCounter, - ) + } + + err := RegisterMetrics(collectors) + if err != nil { + log.Fatal(err) + } + go func() { - envEnabled, _ := os.LookupEnv("EVENTSGATEWAY_PROMETHEUS_ENABLED") + envEnabled := config.GetString("prometheus.enabled") if envEnabled != "true" { + log.Warn("Prometheus web server disabled") return } @@ -146,7 +191,7 @@ func StartServer(port string) { r.Handle("/metrics", promhttp.Handler()) s := &http.Server{ - Addr: port, + Addr: config.GetString("prometheus.port"), ReadTimeout: 8 * time.Second, WriteTimeout: 8 * time.Second, MaxHeaderBytes: 1 << 20, diff --git a/testing/config.go b/testing/config.go index 47337a4..95e2377 100644 --- a/testing/config.go +++ b/testing/config.go @@ -8,9 +8,8 @@ package testing import ( - "strings" - "github.com/spf13/viper" + "strings" ) // GetDefaultConfig returns the configuration at ./config/test.yaml @@ -30,5 +29,7 @@ func GetDefaultConfig() (*viper.Viper, error) { return nil, err } + cfg.Set("prometheus.enabled", "false") + return cfg, nil }