Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry Job #4896

Merged
merged 29 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
838c94a
Add base telemetry job
shaun-nx Jan 5, 2024
ca9b9f7
Ensure only leader pod reports data
shaun-nx Jan 9, 2024
5e3d349
Allow deployments to opt-out of telemetry collection
shaun-nx Jan 10, 2024
a9224db
Add log line for when telemetry is collected
shaun-nx Jan 10, 2024
54b896b
gofumpt files
shaun-nx Jan 10, 2024
8011a08
Revert deployment yaml and fake manager
shaun-nx Jan 10, 2024
079a906
Fix nginx version assignment
shaun-nx Jan 10, 2024
3a69f51
Merge branch 'main' into feat/telemetry
shaun-nx Jan 10, 2024
82a91a6
Resolve lint issues
shaun-nx Jan 10, 2024
ff7515e
Placeholder for telemetry collector
jjngx Jan 22, 2024
9448ae9
Simplify telemetry reporting flags
jjngx Jan 23, 2024
dc634ac
Limit reporting period to min 1m
jjngx Jan 23, 2024
3fa1756
Set min reporting period to 1h
jjngx Jan 24, 2024
c78e3fd
Merge branch 'main' into feat/telemetry
jjngx Jan 24, 2024
922bbe6
Use temp exporter for sending data
jjngx Jan 24, 2024
da52eed
Merge branch 'main' into feat/telemetry
jjngx Jan 24, 2024
2b8e4ad
Merge branch 'main' into feat/telemetry
jjngx Jan 25, 2024
301ed1e
Merge branch 'main' into feat/telemetry
jjngx Jan 26, 2024
cf31f97
Return fake nginx version
jjngx Jan 29, 2024
e249b56
Revert nginx version check changes
shaun-nx Jan 31, 2024
22ca1e8
Merge branch 'main' into feat/telemetry
shaun-nx Jan 31, 2024
37bf9fe
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
e248aa9
Set min reporting time to 1m
jjngx Feb 6, 2024
ec8678d
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
a56bc49
Merge branch 'main' into feat/telemetry
jjngx Feb 6, 2024
2cae174
Set default reporting period and add unit test for new telemetry coll…
shaun-nx Feb 6, 2024
d401d49
Add telemetry reporting flag to helm values
shaun-nx Feb 6, 2024
1d04236
Fix telemetry unit test
shaun-nx Feb 6, 2024
cebd70c
Merge branch 'main' into feat/telemetry
shaun-nx Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/nginx-ingress/flags.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package main

import (
"errors"
"flag"
"fmt"
"net"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/golang/glog"
api_v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -201,6 +203,9 @@

enableDynamicSSLReload = flag.Bool(dynamicSSLReloadParam, true, "Enable reloading of SSL Certificates without restarting the NGINX process.")

enableTelemetryReporting = flag.Bool("enable-telemetry-reporting", true, "Enable gathering and reporting of product related telemetry.")
oseoin marked this conversation as resolved.
Show resolved Hide resolved
telemetryReportingPeriod = flag.String("telemetry-reporting-period", "24h", "Period at which product telemetry is reported.")

startupCheckFn func() error
)

Expand Down Expand Up @@ -384,6 +389,12 @@
glog.Fatalf("Invalid value for app-protect-log-level: %v", *appProtectLogLevel)
}
}

if telemetryReportingPeriod != nil {
if err := validateReportingPeriod(*telemetryReportingPeriod); err != nil {
glog.Fatalf("Invalid value for telemetry-reporting-period: %v", err)
}

Check warning on line 396 in cmd/nginx-ingress/flags.go

View check run for this annotation

Codecov / codecov/patch

cmd/nginx-ingress/flags.go#L393-L396

Added lines #L393 - L396 were not covered by tests
}
}

// validateNamespaceNames validates the namespaces are in the correct format
Expand Down Expand Up @@ -489,3 +500,15 @@
}
return nil
}

// validateReportingPeriod checks if the reporting period parameter can be parsed.
func validateReportingPeriod(period string) error {
duration, err := time.ParseDuration(period)
if err != nil {
return err
}
if duration.Minutes() < 60 {
return errors.New("invalid reporting period, expected minimum 1h")
}
return nil
}
15 changes: 11 additions & 4 deletions cmd/nginx-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
)

// Injected during build
var version string
var (
version string
)

const (
nginxVersionLabel = "app.nginx.org/version"
Expand Down Expand Up @@ -199,6 +201,8 @@
ExternalDNSEnabled: *enableExternalDNS,
IsIPV6Disabled: *disableIPV6,
WatchNamespaceLabel: *watchNamespaceLabel,
TelemetryReportPeriod: *telemetryReportingPeriod,
EnableTelemetryReporting: *enableTelemetryReporting,

Check warning on line 205 in cmd/nginx-ingress/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/nginx-ingress/main.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}

lbc := k8s.NewLoadBalancerController(lbcInput)
Expand Down Expand Up @@ -790,9 +794,12 @@
}

func updateSelfWithVersionInfo(kubeClient *kubernetes.Clientset, version, nginxVersion, appProtectVersion string, maxRetries int, waitTime time.Duration) {
nginxVer := strings.TrimSuffix(strings.Split(nginxVersion, "/")[1], "\n")
replacer := strings.NewReplacer(" ", "-", "(", "", ")", "")
nginxVer = replacer.Replace(nginxVer)
var nginxVer string
if nginxVersion != "" {
nginxVer = strings.TrimSuffix(strings.Split(nginxVersion, "/")[1], "\n")
replacer := strings.NewReplacer(" ", "-", "(", "", ")", "")
nginxVer = replacer.Replace(nginxVer)
}

Check warning on line 802 in cmd/nginx-ingress/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/nginx-ingress/main.go#L797-L802

Added lines #L797 - L802 were not covered by tests
jjngx marked this conversation as resolved.
Show resolved Hide resolved
podUpdated := false

for i := 0; (i < maxRetries || maxRetries == 0) && !podUpdated; i++ {
Expand Down
24 changes: 24 additions & 0 deletions cmd/nginx-ingress/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,27 @@ func TestValidateNamespaces(t *testing.T) {
}
}
}

func TestValidateReportingPeriodWithInvalidInput(t *testing.T) {
t.Parallel()

periods := []string{"", "-1", "1x", "abc", "-", "30s", "10ms", "30m", "59m", "0h"}
for _, p := range periods {
err := validateReportingPeriod(p)
if err == nil {
t.Errorf("want error on invalid period %s, got nil", p)
}
}
}

func TestValidateReportingPeriodWithValidInput(t *testing.T) {
t.Parallel()

periods := []string{"1h", "24h"}
for _, p := range periods {
err := validateReportingPeriod(p)
if err != nil {
t.Error(err)
}
}
}
30 changes: 30 additions & 0 deletions internal/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"sync"
"time"

"github.com/nginxinc/kubernetes-ingress/internal/telemetry"

"github.com/nginxinc/kubernetes-ingress/pkg/apis/dos/v1beta1"
"golang.org/x/exp/maps"

Expand Down Expand Up @@ -161,6 +163,8 @@
enableBatchReload bool
isIPV6Disabled bool
namespaceWatcherController cache.Controller
telemetryCollector *telemetry.Collector
telemetryChan chan struct{}
}

var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
Expand Down Expand Up @@ -206,6 +210,8 @@
ExternalDNSEnabled bool
IsIPV6Disabled bool
WatchNamespaceLabel string
TelemetryReportPeriod string
EnableTelemetryReporting bool
}

// NewLoadBalancerController creates a controller
Expand Down Expand Up @@ -271,6 +277,18 @@
lbc.externalDNSController = ed_controller.NewController(ed_controller.BuildOpts(context.TODO(), lbc.namespaceList, lbc.recorder, lbc.confClient, input.ResyncPeriod, isDynamicNs))
}

// NIC Telemetry Reporting
if input.EnableTelemetryReporting {
lbc.telemetryChan = make(chan struct{})
collector, err := telemetry.NewCollector(
telemetry.WithTimePeriod(input.TelemetryReportPeriod),
)
if err != nil {
glog.Fatalf("failed to initialize telemetry collector: %v", err)
}
lbc.telemetryCollector = collector

Check warning on line 289 in internal/k8s/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/controller.go#L281-L289

Added lines #L281 - L289 were not covered by tests
}

glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass)

lbc.namespacedInformers = make(map[string]*namespacedInformer)
Expand Down Expand Up @@ -683,10 +701,22 @@
if lbc.externalDNSController != nil {
go lbc.externalDNSController.Run(lbc.ctx.Done())
}

if lbc.leaderElector != nil {
go lbc.leaderElector.Run(lbc.ctx)
}

if lbc.telemetryCollector != nil {
go func(ctx context.Context) {
select {
case <-lbc.telemetryChan:
lbc.telemetryCollector.Start(lbc.ctx)
case <-ctx.Done():
return

Check warning on line 715 in internal/k8s/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/controller.go#L709-L715

Added lines #L709 - L715 were not covered by tests
}
}(lbc.ctx)
}

for _, nif := range lbc.namespacedInformers {
nif.start()
}
Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
return leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(3).Info("started leading")
// Closing this channel allows the leader to start the telemetry reporting process
if lbc.telemetryChan != nil {
close(lbc.telemetryChan)
}

Check warning on line 64 in internal/k8s/leader.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/leader.go#L61-L64

Added lines #L61 - L64 were not covered by tests
if lbc.reportIngressStatus {
ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true})

Expand Down
2 changes: 1 addition & 1 deletion internal/nginx/fake_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
// Version provides a fake implementation of Version.
func (*FakeManager) Version() Version {
glog.V(3).Info("Printing nginx version")
return Version{}
return NewVersion("nginx version: nginx/1.25.3 (nginx-plus-r31)")

Check warning on line 106 in internal/nginx/fake_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/nginx/fake_manager.go#L106

Added line #L106 was not covered by tests
jjngx marked this conversation as resolved.
Show resolved Hide resolved
}

// Start provides a fake implementation of Start.
Expand Down
135 changes: 135 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Package telemetry provides functionality for collecting and exporting NIC telemetry data.
package telemetry

import (
"context"
"fmt"
"io"
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
)

// DiscardExporter is a temporary exporter
// for discarding collected telemetry data.
var DiscardExporter = Exporter{Endpoint: io.Discard}

// Exporter represents a temporary telemetry data exporter.
type Exporter struct {
Endpoint io.Writer
}

// Export takes context and trace data and writes to the endpoint.
func (e *Exporter) Export(_ context.Context, td TraceData) error {
// Note: exporting functionality will be implemented in a separate module.
fmt.Fprintf(e.Endpoint, "%+v", td)
return nil
}

// TraceData holds collected NIC telemetry data.
type TraceData struct {
// Numer of VirtualServers
VSCount int
// Number of TransportServers
TSCount int

// TODO
// Add more fields for NIC data points
}

// Option is a functional option used for configuring TraceReporter.
type Option func(*Collector) error

// WithTimePeriod configures reporting time on TraceReporter.
func WithTimePeriod(period string) Option {
return func(c *Collector) error {
d, err := time.ParseDuration(period)
if err != nil {
return err
}

Check warning on line 50 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L49-L50

Added lines #L49 - L50 were not covered by tests
c.Period = d
return nil
}
}

// WithExporter configures telemetry collector to use given exporter.
//
// This may change in the future when we use exporter implemented
// in the external module.
func WithExporter(e Exporter) Option {
return func(c *Collector) error {
c.Exporter = e
return nil
}
}

// Collector is NIC telemetry data collector.
type Collector struct {
Period time.Duration

// Exporter is a temp exporter for exporting telemetry data.
// The concrete implementation will be implemented in a separate module.
Exporter Exporter
}

// NewCollector takes 0 or more options and creates a new TraceReporter.
// If no options are provided, NewReporter returns TraceReporter
// configured to gather data every 24h.
func NewCollector(opts ...Option) (*Collector, error) {
c := Collector{
Period: 24 * time.Hour,
Exporter: DiscardExporter, // Use DiscardExporter until the real exporter is available.
}
for _, o := range opts {
if err := o(&c); err != nil {
return nil, err
}

Check warning on line 87 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L86-L87

Added lines #L86 - L87 were not covered by tests
}
return &c, nil
}

// BuildReport takes context and builds report from gathered telemetry data.
func (c *Collector) BuildReport(context.Context) (TraceData, error) {
dt := TraceData{}

// TODO: Implement handling and logging errors for each collected data point

return dt, nil
}

// Collect collects and exports telemetry data.
// It exports data using provided exporter.
func (c *Collector) Collect(ctx context.Context) {
glog.V(3).Info("Collecting telemetry data")
traceData, err := c.BuildReport(ctx)
if err != nil {
glog.Errorf("Error collecting telemetry data: %v", err)
}

Check warning on line 108 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L107-L108

Added lines #L107 - L108 were not covered by tests
err = c.Exporter.Export(ctx, traceData)
if err != nil {
glog.Errorf("Error exporting telemetry data: %v", err)
}

Check warning on line 112 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L111-L112

Added lines #L111 - L112 were not covered by tests
glog.V(3).Info("Exported telemetry data")
}

// Start starts running NIC Telemetry Collector.
func (c *Collector) Start(ctx context.Context) {
wait.JitterUntilWithContext(ctx, c.Collect, c.Period, 0.1, true)

Check warning on line 118 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

// GetVSCount returns number of VirtualServers in watched namespaces.
//
// Note: this is a placeholder function.
func (c *Collector) GetVSCount() int {
// Placeholder function
return 0

Check warning on line 126 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L124-L126

Added lines #L124 - L126 were not covered by tests
}

// GetTSCount returns number of TransportServers in watched namespaces.
//
// Note: this is a placeholder function.
func (c *Collector) GetTSCount() int {
// Placeholder function
return 0

Check warning on line 134 in internal/telemetry/telemetry.go

View check run for this annotation

Codecov / codecov/patch

internal/telemetry/telemetry.go#L132-L134

Added lines #L132 - L134 were not covered by tests
}
Loading
Loading