Skip to content

Commit

Permalink
Add event processing histogram metric (#1134)
Browse files Browse the repository at this point in the history
Problem: NGF does not measure how long it takes to process an event batch.

Solution: Add a new histogram metric event_batch_processing_milliseconds, 
that measures the time it takes to process an event batch. Also adds a debug 
log statement with the same information so we can debug spikes in processing time.
kate-osborn authored Oct 12, 2023
1 parent 4f40fca commit 567f27e
Showing 17 changed files with 234 additions and 152 deletions.
3 changes: 3 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
@@ -92,6 +92,9 @@ NGINX Gateway Fabric exports the following metrics:
- nginx_stale_config. 1 means NGF failed to configure NGINX with the latest version of the configuration, which means
NGINX is running with a stale version.
- nginx_last_reload_milliseconds. Duration in milliseconds of NGINX reloads (histogram).
- event_batch_processing_milliseconds: Duration in milliseconds of event batch processing (histogram), which is the
time it takes NGF to process batches of Kubernetes events (changes to cluster resources). Note that NGF processes
events in batches, and while processing the current batch, it accumulates events for the next batch.
- These metrics have the namespace `nginx_gateway_fabric`, and include the label `class` which is set to the
Gateway class of NGF. For example, `nginx_gateway_fabric_nginx_reloads_total{class="nginx"}`.

23 changes: 13 additions & 10 deletions internal/framework/events/eventsfakes/fake_event_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/framework/events/handler.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ package events

import (
"context"

"github.com/go-logr/logr"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler
@@ -10,5 +12,5 @@ import (
type EventHandler interface {
// HandleEventBatch handles a batch of events.
// EventBatch can include duplicated events.
HandleEventBatch(ctx context.Context, batch EventBatch)
HandleEventBatch(ctx context.Context, logger logr.Logger, batch EventBatch)
}
12 changes: 9 additions & 3 deletions internal/framework/events/loop.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,9 @@ type EventLoop struct {
// The batches are swapped before starting the handler goroutine.
currentBatch EventBatch
nextBatch EventBatch

// the ID of the current batch
currentBatchID int
}

// NewEventLoop creates a new EventLoop.
@@ -63,11 +66,14 @@ func (el *EventLoop) Start(ctx context.Context) error {

handleBatch := func() {
go func(batch EventBatch) {
el.logger.Info("Handling events from the batch", "total", len(batch))
el.currentBatchID++
batchLogger := el.logger.WithName("eventHandler").WithValues("batchID", el.currentBatchID)

batchLogger.Info("Handling events from the batch", "total", len(batch))

el.handler.HandleEventBatch(ctx, batch)
el.handler.HandleEventBatch(ctx, batchLogger, batch)

el.logger.Info("Finished handling the batch")
batchLogger.Info("Finished handling the batch")
handlingDone <- struct{}{}
}(el.currentBatch)
}
11 changes: 6 additions & 5 deletions internal/framework/events/loop_test.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -47,7 +48,7 @@ var _ = Describe("EventLoop", func() {

// Ensure the first batch is handled
Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1))
_, batch = fakeHandler.HandleEventBatchArgsForCall(0)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(0)

var expectedBatch events.EventBatch = []interface{}{"event0"}
Expect(batch).Should(Equal(expectedBatch))
@@ -70,7 +71,7 @@ var _ = Describe("EventLoop", func() {
eventCh <- e

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e}
Expect(batch).Should(Equal(expectedBatch))
@@ -82,7 +83,7 @@ var _ = Describe("EventLoop", func() {

// The func below will pause the handler goroutine while it is processing the batch with e1 until
// sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime.
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) {
fakeHandler.HandleEventBatchCalls(func(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
close(firstHandleEventBatchCallInProgress)
<-sentSecondAndThirdEvents
})
@@ -106,14 +107,14 @@ var _ = Describe("EventLoop", func() {
close(sentSecondAndThirdEvents)

Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3))
_, batch := fakeHandler.HandleEventBatchArgsForCall(1)
_, _, batch := fakeHandler.HandleEventBatchArgsForCall(1)

var expectedBatch events.EventBatch = []interface{}{e1}

// the first HandleEventBatch() call must have handled a batch with e1
Expect(batch).Should(Equal(expectedBatch))

_, batch = fakeHandler.HandleEventBatchArgsForCall(2)
_, _, batch = fakeHandler.HandleEventBatchArgsForCall(2)

expectedBatch = []interface{}{e2, e3}
// the second HandleEventBatch() call must have handled a batch with e2 and e3
13 changes: 5 additions & 8 deletions internal/mode/provisioner/handler.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@ type eventHandler struct {

statusUpdater status.Updater
k8sClient client.Client
logger logr.Logger

staticModeDeploymentYAML []byte

@@ -38,7 +37,6 @@ func newEventHandler(
gcName string,
statusUpdater status.Updater,
k8sClient client.Client,
logger logr.Logger,
staticModeDeploymentYAML []byte,
) *eventHandler {
return &eventHandler{
@@ -47,7 +45,6 @@ func newEventHandler(
statusUpdater: statusUpdater,
gcName: gcName,
k8sClient: k8sClient,
logger: logger,
staticModeDeploymentYAML: staticModeDeploymentYAML,
gatewayNextID: 1,
}
@@ -80,7 +77,7 @@ func (h *eventHandler) setGatewayClassStatuses(ctx context.Context) {
h.statusUpdater.Update(ctx, statuses)
}

func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {
func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context, logger logr.Logger) {
var gwsWithoutDeps, removedGwsWithDeps []types.NamespacedName

for nsname, gw := range h.store.gateways {
@@ -116,7 +113,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

h.provisions[nsname] = deployment

h.logger.Info(
logger.Info(
"Created deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
@@ -134,18 +131,18 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) {

delete(h.provisions, nsname)

h.logger.Info(
logger.Info(
"Deleted deployment",
"deployment", client.ObjectKeyFromObject(deployment),
"gateway", nsname,
)
}
}

func (h *eventHandler) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
h.store.update(batch)
h.setGatewayClassStatuses(ctx)
h.ensureDeploymentsMatchGateways(ctx)
h.ensureDeploymentsMatchGateways(ctx, logger)
}

func (h *eventHandler) generateDeploymentID() string {
23 changes: 10 additions & 13 deletions internal/mode/provisioner/handler_test.go
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ var _ = Describe("handler", func() {
Resource: gc,
},
}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

// Ensure GatewayClass is accepted

@@ -126,7 +126,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

depNsName := types.NamespacedName{
Namespace: "nginx-gateway",
@@ -156,7 +156,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -179,7 +179,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
@@ -217,7 +216,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
deps := &v1.DeploymentList{}

err := k8sclient.List(context.Background(), deps)
@@ -237,7 +236,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}

@@ -266,7 +265,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

deps := &v1.DeploymentList{}
err := k8sclient.List(context.Background(), deps)
@@ -295,7 +294,7 @@ var _ = Describe("handler", func() {
},
}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)

unknownGC := &v1beta1.GatewayClass{}
err = k8sclient.Get(context.Background(), client.ObjectKeyFromObject(gc), unknownGC)
@@ -330,7 +329,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
embeddedfiles.StaticModeDeploymentYAML,
)
})
@@ -340,7 +338,7 @@ var _ = Describe("handler", func() {
batch := []interface{}{e}

handle := func() {
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -408,7 +406,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -429,7 +427,7 @@ var _ = Describe("handler", func() {
}

handle := func() {
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), zap.New(), batch)
}

Expect(handle).Should(Panic())
@@ -442,7 +440,6 @@ var _ = Describe("handler", func() {
gcName,
statusUpdater,
k8sclient,
zap.New(),
[]byte("broken YAML"),
)

1 change: 0 additions & 1 deletion internal/mode/provisioner/manager.go
Original file line number Diff line number Diff line change
@@ -107,7 +107,6 @@ func StartManager(cfg Config) error {
cfg.GatewayClassName,
statusUpdater,
mgr.GetClient(),
cfg.Logger.WithName("eventHandler"),
embeddedfiles.StaticModeDeploymentYAML,
)

45 changes: 33 additions & 12 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package static
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
apiv1 "k8s.io/api/core/v1"
@@ -23,6 +24,10 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver"
)

type handlerMetricsCollector interface {
ObserveLastEventBatchProcessTime(time.Duration)
}

// eventHandlerConfig holds configuration parameters for eventHandlerImpl.
type eventHandlerConfig struct {
// processor is the state ChangeProcessor.
@@ -45,8 +50,8 @@ type eventHandlerConfig struct {
healthChecker *healthChecker
// controlConfigNSName is the NamespacedName of the NginxGateway config for this controller.
controlConfigNSName types.NamespacedName
// logger is the logger to be used by the EventHandler.
logger logr.Logger
// metricsCollector collects metrics for this controller.
metricsCollector handlerMetricsCollector
// version is the current version number of the nginx config.
version int
}
@@ -67,18 +72,30 @@ func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl {
}
}

func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.EventBatch) {
func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) {
start := time.Now()
logger.V(1).Info("Started processing event batch")

defer func() {
duration := time.Since(start)
logger.V(1).Info(
"Finished processing event batch",
"duration", duration.String(),
)
h.cfg.metricsCollector.ObserveLastEventBatchProcessTime(duration)
}()

for _, event := range batch {
switch e := event.(type) {
case *events.UpsertEvent:
if cfg, ok := e.Resource.(*ngfAPI.NginxGateway); ok {
h.updateControlPlaneAndSetStatus(ctx, cfg)
h.updateControlPlaneAndSetStatus(ctx, logger, cfg)
} else {
h.cfg.processor.CaptureUpsertChange(e.Resource)
}
case *events.DeleteEvent:
if _, ok := e.Type.(*ngfAPI.NginxGateway); ok {
h.updateControlPlaneAndSetStatus(ctx, nil)
h.updateControlPlaneAndSetStatus(ctx, logger, nil)
} else {
h.cfg.processor.CaptureDeleteChange(e.Type, e.NamespacedName)
}
@@ -89,7 +106,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev

changed, graph := h.cfg.processor.Process()
if !changed {
h.cfg.logger.Info("Handling events didn't result into NGINX configuration changes")
logger.Info("Handling events didn't result into NGINX configuration changes")
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
}
@@ -102,13 +119,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
); err != nil {
h.cfg.logger.Error(err, "Failed to update NGINX configuration")
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.firstBatchError = err
}
} else {
h.cfg.logger.Info("NGINX configuration was successfully updated")
logger.Info("NGINX configuration was successfully updated")
if !h.cfg.healthChecker.ready {
h.cfg.healthChecker.setAsReady()
}
@@ -133,17 +150,21 @@ func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Confi

// updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status
// based on the outcome
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, cfg *ngfAPI.NginxGateway) {
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(
ctx context.Context,
logger logr.Logger,
cfg *ngfAPI.NginxGateway,
) {
var cond []conditions.Condition
if err := updateControlPlane(
cfg,
h.cfg.logger,
logger,
h.cfg.eventRecorder,
h.cfg.controlConfigNSName,
h.cfg.logLevelSetter,
); err != nil {
msg := "Failed to update control plane configuration"
h.cfg.logger.Error(err, msg)
logger.Error(err, msg)
h.cfg.eventRecorder.Eventf(
cfg,
apiv1.EventTypeWarning,
@@ -164,6 +185,6 @@ func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, c
}

h.cfg.statusUpdater.Update(ctx, nginxGatewayStatus)
h.cfg.logger.Info("Reconfigured control plane.")
logger.Info("Reconfigured control plane.")
}
}
29 changes: 15 additions & 14 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status/statusfakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/configfakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file/filefakes"
@@ -68,14 +69,14 @@ var _ = Describe("eventHandler", func() {
handler = newEventHandlerImpl(eventHandlerConfig{
processor: fakeProcessor,
generator: fakeGenerator,
logger: ctlrZap.New(),
logLevelSetter: newZapLogLevelSetter(zap.NewAtomicLevel()),
nginxFileMgr: fakeNginxFileMgr,
nginxRuntimeMgr: fakeNginxRuntimeMgr,
statusUpdater: fakeStatusUpdater,
eventRecorder: fakeEventRecorder,
healthChecker: &healthChecker{},
controlConfigNSName: types.NamespacedName{Namespace: namespace, Name: configName},
metricsCollector: collectors.NewControllerNoopCollector(),
})
Expect(handler.cfg.healthChecker.ready).To(BeFalse())
})
@@ -115,7 +116,7 @@ var _ = Describe("eventHandler", func() {
e := &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}
batch := []interface{}{e}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkUpsertEventExpectations(e)
expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles)
@@ -128,7 +129,7 @@ var _ = Describe("eventHandler", func() {
}
batch := []interface{}{e}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkDeleteEventExpectations(e)
expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles)
@@ -144,12 +145,12 @@ var _ = Describe("eventHandler", func() {
}
batch := []interface{}{upsertEvent, deleteEvent}

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

checkUpsertEventExpectations(upsertEvent)
checkDeleteEventExpectations(deleteEvent)

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
})
})
})
@@ -182,7 +183,7 @@ var _ = Describe("eventHandler", func() {

It("handles a valid config", func() {
batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevelError)}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1))
_, statuses := fakeStatusUpdater.UpdateArgsForCall(0)
@@ -193,7 +194,7 @@ var _ = Describe("eventHandler", func() {

It("handles an invalid config", func() {
batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevel("invalid"))}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1))
_, statuses := fakeStatusUpdater.UpdateArgsForCall(0)
@@ -212,7 +213,7 @@ var _ = Describe("eventHandler", func() {

It("handles a deleted config", func() {
batch := []interface{}{&events.DeleteEvent{Type: &ngfAPI.NginxGateway{}}}
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(len(fakeEventRecorder.Events)).To(Equal(1))
event := <-fakeEventRecorder.Events
Expect(event).To(Equal("Warning ResourceDeleted NginxGateway configuration was deleted; using defaults"))
@@ -227,7 +228,7 @@ var _ = Describe("eventHandler", func() {
fakeProcessor.ProcessReturns(true, &graph.Graph{})

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})

@@ -236,7 +237,7 @@ var _ = Describe("eventHandler", func() {
batch := []interface{}{e}

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})

@@ -247,22 +248,22 @@ var _ = Describe("eventHandler", func() {
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error"))

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// now send an update with no changes; should still return an error
fakeProcessor.ProcessReturns(false, &graph.Graph{})

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// error goes away
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(nil)

handler.HandleEventBatch(context.Background(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed())
})
@@ -272,7 +273,7 @@ var _ = Describe("eventHandler", func() {

handle := func() {
batch := []interface{}{e}
handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
}

Expect(handle).Should(Panic())
55 changes: 24 additions & 31 deletions internal/mode/static/manager.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -29,7 +30,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/events"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config"
ngfmetrics "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
ngxvalidation "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/validation"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file"
@@ -130,9 +131,26 @@ func StartManager(cfg config.Config) error {
return fmt.Errorf("NGINX is not running: %w", err)
}

mgrCollector, err := createAndRegisterMetricsCollectors(cfg.MetricsConfig.Enabled, cfg.GatewayClassName)
if err != nil {
return fmt.Errorf("cannot create and register metrics collectors: %w", err)
var (
ngxruntimeCollector ngxruntime.MetricsCollector = collectors.NewManagerNoopCollector()
// nolint:ineffassign // not an ineffectual assignment. Will be used if metrics are disabled.
handlerCollector handlerMetricsCollector = collectors.NewControllerNoopCollector()
)

if cfg.MetricsConfig.Enabled {
constLabels := map[string]string{"class": cfg.GatewayClassName}
ngxCollector, err := collectors.NewNginxMetricsCollector(constLabels)
if err != nil {
return fmt.Errorf("cannot create nginx metrics collector: %w", err)
}

ngxruntimeCollector = collectors.NewManagerMetricsCollector(constLabels)
handlerCollector = collectors.NewControllerCollector(constLabels)
metrics.Registry.MustRegister(
ngxCollector,
ngxruntimeCollector.(prometheus.Collector),
handlerCollector.(prometheus.Collector),
)
}

statusUpdater := status.NewUpdater(status.UpdaterConfig{
@@ -150,17 +168,17 @@ func StartManager(cfg config.Config) error {
processor: processor,
serviceResolver: resolver.NewServiceResolverImpl(mgr.GetClient()),
generator: ngxcfg.NewGeneratorImpl(),
logger: cfg.Logger.WithName("eventHandler"),
logLevelSetter: logLevelSetter,
nginxFileMgr: file.NewManagerImpl(
cfg.Logger.WithName("nginxFileManager"),
file.NewStdLibOSFileManager(),
),
nginxRuntimeMgr: ngxruntime.NewManagerImpl(mgrCollector),
nginxRuntimeMgr: ngxruntime.NewManagerImpl(ngxruntimeCollector),
statusUpdater: statusUpdater,
eventRecorder: recorder,
healthChecker: hc,
controlConfigNSName: controlConfigNSName,
metricsCollector: handlerCollector,
})

objects, objectLists := prepareFirstEventBatchPreparerArgs(cfg.GatewayClassName, cfg.GatewayNsName)
@@ -352,31 +370,6 @@ func setInitialConfig(
return updateControlPlane(&config, logger, eventRecorder, configName, logLevelSetter)
}

// createAndRegisterMetricsCollectors creates the NGINX status and NGINX runtime manager collectors, registers them,
// and returns the runtime manager collector to be used in the nginxRuntimeMgr.
func createAndRegisterMetricsCollectors(metricsEnabled bool, gwClassName string) (ngxruntime.ManagerCollector, error) {
if !metricsEnabled {
// return a no-op collector to avoid nil pointer errors when metrics are disabled
return ngfmetrics.NewManagerNoopCollector(), nil
}
constLabels := map[string]string{"class": gwClassName}

ngxCollector, err := ngfmetrics.NewNginxMetricsCollector(constLabels)
if err != nil {
return nil, fmt.Errorf("cannot create NGINX status metrics collector: %w", err)
}
if err := metrics.Registry.Register(ngxCollector); err != nil {
return nil, fmt.Errorf("failed to register NGINX status metrics collector: %w", err)
}

mgrCollector := ngfmetrics.NewManagerMetricsCollector(constLabels)
if err := metrics.Registry.Register(mgrCollector); err != nil {
return nil, fmt.Errorf("failed to register NGINX manager runtime metrics collector: %w", err)
}

return mgrCollector, nil
}

func getMetricsOptions(cfg config.MetricsConfig) metricsserver.Options {
metricsOptions := metricsserver.Options{BindAddress: "0"}

58 changes: 58 additions & 0 deletions internal/mode/static/metrics/collectors/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package collectors

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics"
)

// ControllerCollector collects metrics for the NGF controller.
// Implements the prometheus.Collector interface.
type ControllerCollector struct {
// Metrics
eventBatchProcessDuration prometheus.Histogram
}

// NewControllerCollector creates a new ControllerCollector
func NewControllerCollector(constLabels map[string]string) *ControllerCollector {
nc := &ControllerCollector{
eventBatchProcessDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "event_batch_processing_milliseconds",
Namespace: metrics.Namespace,
Help: "Duration in milliseconds of event batch processing",
ConstLabels: constLabels,
Buckets: []float64{500, 1000, 5000, 10000, 30000},
},
),
}
return nc
}

// ObserveLastEventBatchProcessTime adds the last event batch processing time to the histogram.
func (c *ControllerCollector) ObserveLastEventBatchProcessTime(duration time.Duration) {
c.eventBatchProcessDuration.Observe(float64(duration / time.Millisecond))
}

// Describe implements prometheus.Collector interface Describe method.
func (c *ControllerCollector) Describe(ch chan<- *prometheus.Desc) {
c.eventBatchProcessDuration.Describe(ch)
}

// Collect implements the prometheus.Collector interface Collect method.
func (c *ControllerCollector) Collect(ch chan<- prometheus.Metric) {
c.eventBatchProcessDuration.Collect(ch)
}

// ControllerNoopCollector used to initialize the ControllerCollector when metrics are disabled to avoid nil pointer
// errors.
type ControllerNoopCollector struct{}

// NewControllerNoopCollector returns an instance of the ControllerNoopCollector.
func NewControllerNoopCollector() *ControllerNoopCollector {
return &ControllerNoopCollector{}
}

func (c *ControllerNoopCollector) ObserveLastEventBatchProcessTime(_ time.Duration) {}
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package metrics
package collectors

import (
"context"
"net"
"net/http"
"time"

prometheusClient "github.com/nginxinc/nginx-prometheus-exporter/client"
nginxCollector "github.com/nginxinc/nginx-prometheus-exporter/collector"
"github.com/prometheus/client_golang/prometheus"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics"
)

const (
nginxStatusSock = "/var/run/nginx/nginx-status.sock"
nginxStatusURI = "http://config-status/stub_status"
nginxStatusSockTimeout = 10 * time.Second
nginxStatusSock = "/var/run/nginx/nginx-status.sock"
nginxStatusURI = "http://config-status/stub_status"
)

// NewNginxMetricsCollector creates an NginxCollector which fetches stats from NGINX over a unix socket
@@ -24,7 +24,7 @@ func NewNginxMetricsCollector(constLabels map[string]string) (prometheus.Collect
if err != nil {
return nil, err
}
return nginxCollector.NewNginxCollector(client, metricsNamespace, constLabels), nil
return nginxCollector.NewNginxCollector(client, metrics.Namespace, constLabels), nil
}

// getSocketClient gets an http.Client with a unix socket transport.
Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
package metrics
package collectors

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics"
)

// ManagerMetricsCollector implements ManagerCollector interface and prometheus.Collector interface
type ManagerMetricsCollector struct {
// NginxRuntimeCollector implements runtime.Collector interface and prometheus.Collector interface
type NginxRuntimeCollector struct {
// Metrics
reloadsTotal prometheus.Counter
reloadsError prometheus.Counter
configStale prometheus.Gauge
reloadsDuration prometheus.Histogram
}

// NewManagerMetricsCollector creates a new ManagerMetricsCollector
func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCollector {
nc := &ManagerMetricsCollector{
// NewManagerMetricsCollector creates a new NginxRuntimeCollector
func NewManagerMetricsCollector(constLabels map[string]string) *NginxRuntimeCollector {
nc := &NginxRuntimeCollector{
reloadsTotal: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "nginx_reloads_total",
Namespace: metricsNamespace,
Namespace: metrics.Namespace,
Help: "Number of successful NGINX reloads",
ConstLabels: constLabels,
}),
reloadsError: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "nginx_reload_errors_total",
Namespace: metricsNamespace,
Namespace: metrics.Namespace,
Help: "Number of unsuccessful NGINX reloads",
ConstLabels: constLabels,
},
),
configStale: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "nginx_stale_config",
Namespace: metricsNamespace,
Namespace: metrics.Namespace,
Help: "Indicates if NGINX is not serving the latest configuration.",
ConstLabels: constLabels,
},
),
reloadsDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "nginx_reloads_milliseconds",
Namespace: metricsNamespace,
Namespace: metrics.Namespace,
Help: "Duration in milliseconds of NGINX reloads",
ConstLabels: constLabels,
Buckets: []float64{500, 1000, 5000, 10000, 30000},
@@ -54,50 +56,49 @@ func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCo
return nc
}

// IncNginxReloadCount increments the counter of successful NGINX reloads and sets the stale config status to false.
func (mc *ManagerMetricsCollector) IncReloadCount() {
mc.reloadsTotal.Inc()
mc.updateConfigStaleStatus(false)
// IncReloadCount increments the counter of successful NGINX reloads and sets the stale config status to false.
func (c *NginxRuntimeCollector) IncReloadCount() {
c.reloadsTotal.Inc()
c.updateConfigStaleStatus(false)
}

// IncNginxReloadErrors increments the counter of NGINX reload errors and sets the stale config status to true.
func (mc *ManagerMetricsCollector) IncReloadErrors() {
mc.reloadsError.Inc()
mc.updateConfigStaleStatus(true)
// IncReloadErrors increments the counter of NGINX reload errors and sets the stale config status to true.
func (c *NginxRuntimeCollector) IncReloadErrors() {
c.reloadsError.Inc()
c.updateConfigStaleStatus(true)
}

// updateConfigStaleStatus updates the last NGINX reload status metric.
func (mc *ManagerMetricsCollector) updateConfigStaleStatus(stale bool) {
func (c *NginxRuntimeCollector) updateConfigStaleStatus(stale bool) {
var status float64
if stale {
status = 1.0
}
mc.configStale.Set(status)
c.configStale.Set(status)
}

// ObserveLastReloadTime adds the last NGINX reload time to the histogram.
func (mc *ManagerMetricsCollector) ObserveLastReloadTime(duration time.Duration) {
mc.reloadsDuration.Observe(float64(duration / time.Millisecond))
func (c *NginxRuntimeCollector) ObserveLastReloadTime(duration time.Duration) {
c.reloadsDuration.Observe(float64(duration / time.Millisecond))
}

// Describe implements prometheus.Collector interface Describe method.
func (mc *ManagerMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
mc.reloadsTotal.Describe(ch)
mc.reloadsError.Describe(ch)
mc.configStale.Describe(ch)
mc.reloadsDuration.Describe(ch)
func (c *NginxRuntimeCollector) Describe(ch chan<- *prometheus.Desc) {
c.reloadsTotal.Describe(ch)
c.reloadsError.Describe(ch)
c.configStale.Describe(ch)
c.reloadsDuration.Describe(ch)
}

// Collect implements the prometheus.Collector interface Collect method.
func (mc *ManagerMetricsCollector) Collect(ch chan<- prometheus.Metric) {
mc.reloadsTotal.Collect(ch)
mc.reloadsError.Collect(ch)
mc.configStale.Collect(ch)
mc.reloadsDuration.Collect(ch)
func (c *NginxRuntimeCollector) Collect(ch chan<- prometheus.Metric) {
c.reloadsTotal.Collect(ch)
c.reloadsError.Collect(ch)
c.configStale.Collect(ch)
c.reloadsDuration.Collect(ch)
}

// ManagerNoopCollector is a no-op collector that will implement ManagerCollector interface.
// Used to initialize the ManagerCollector when metrics are disabled to avoid nil pointer errors.
// ManagerNoopCollector used to initialize the ManagerCollector when metrics are disabled to avoid nil pointer errors.
type ManagerNoopCollector struct{}

// NewManagerNoopCollector creates a no-op collector that implements ManagerCollector interface.
@@ -106,10 +107,10 @@ func NewManagerNoopCollector() *ManagerNoopCollector {
}

// IncReloadCount implements a no-op IncReloadCount.
func (mc *ManagerNoopCollector) IncReloadCount() {}
func (c *ManagerNoopCollector) IncReloadCount() {}

// IncReloadErrors implements a no-op IncReloadErrors.
func (mc *ManagerNoopCollector) IncReloadErrors() {}
func (c *ManagerNoopCollector) IncReloadErrors() {}

// ObserveLastReloadTime implements a no-op ObserveLastReloadTime.
func (mc *ManagerNoopCollector) ObserveLastReloadTime(_ time.Duration) {}
func (c *ManagerNoopCollector) ObserveLastReloadTime(_ time.Duration) {}
2 changes: 1 addition & 1 deletion internal/mode/static/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics

// nolint:gosec // flagged as potential hardcoded credentials, but is not sensitive
const metricsNamespace = "nginx_gateway_fabric"
const Namespace = "nginx_gateway_fabric"
18 changes: 9 additions & 9 deletions internal/mode/static/nginx/runtime/manager.go
Original file line number Diff line number Diff line change
@@ -35,8 +35,8 @@ type Manager interface {
Reload(ctx context.Context, configVersion int) error
}

// ManagerCollector is an interface for the metrics of the NGINX runtime manager.
type ManagerCollector interface {
// MetricsCollector is an interface for the metrics of the NGINX runtime manager.
type MetricsCollector interface {
IncReloadCount()
IncReloadErrors()
ObserveLastReloadTime(ms time.Duration)
@@ -45,14 +45,14 @@ type ManagerCollector interface {
// ManagerImpl implements Manager.
type ManagerImpl struct {
verifyClient *verifyClient
managerCollector ManagerCollector
metricsCollector MetricsCollector
}

// NewManagerImpl creates a new ManagerImpl.
func NewManagerImpl(managerCollector ManagerCollector) *ManagerImpl {
func NewManagerImpl(collector MetricsCollector) *ManagerImpl {
return &ManagerImpl{
verifyClient: newVerifyClient(nginxReloadTimeout),
managerCollector: managerCollector,
metricsCollector: collector,
}
}

@@ -73,7 +73,7 @@ func (m *ManagerImpl) Reload(ctx context.Context, configVersion int) error {
// send HUP signal to the NGINX main process reload configuration
// See https://nginx.org/en/docs/control.html
if err := syscall.Kill(pid, syscall.SIGHUP); err != nil {
m.managerCollector.IncReloadErrors()
m.metricsCollector.IncReloadErrors()
return fmt.Errorf("failed to send the HUP signal to NGINX main: %w", err)
}

@@ -84,13 +84,13 @@ func (m *ManagerImpl) Reload(ctx context.Context, configVersion int) error {
previousChildProcesses,
os.ReadFile,
); err != nil {
m.managerCollector.IncReloadErrors()
m.metricsCollector.IncReloadErrors()
return err
}
m.managerCollector.IncReloadCount()
m.metricsCollector.IncReloadCount()

finish := time.Now()
m.managerCollector.ObserveLastReloadTime(finish.Sub(start))
m.metricsCollector.ObserveLastReloadTime(finish.Sub(start))
return nil
}

2 changes: 1 addition & 1 deletion tests/scale/scale.md
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ are listed in the [Scale Upstream Servers](#scale-upstream-servers) test steps.
- Install Prometheus:

```console
kubectl apply -f manifets/prom-clusterrole.yaml
kubectl apply -f manifests/prom-clusterrole.yaml
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install prom prometheus-community/prometheus --set useExistingClusterRoleName=prometheus -n prom

0 comments on commit 567f27e

Please sign in to comment.