From a0a015d30c6bb0d1f65b155093fac1d140078448 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Wed, 29 May 2024 16:58:54 -0700 Subject: [PATCH 1/7] Split grpcReporter getSetting into get and update --- internal/reporter/reporter_grpc.go | 46 +++++++++++++++++++----------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index ff36c398..18ba3758 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -22,6 +22,18 @@ import ( "encoding/binary" "errors" "fmt" + "io" + "math" + "net" + "net/http" + "net/http/httputil" + "net/url" + "os" + "strings" + "sync" + "sync/atomic" + "time" + "github.com/google/uuid" "github.com/solarwinds/apm-go/internal/config" "github.com/solarwinds/apm-go/internal/constants" @@ -34,17 +46,6 @@ import ( "github.com/solarwinds/apm-go/internal/oboe" "github.com/solarwinds/apm-go/internal/uams" "github.com/solarwinds/apm-go/internal/utils" - "io" - "math" - "net" - "net/http" - "net/http/httputil" - "net/url" - "os" - "strings" - "sync" - "sync/atomic" - "time" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding/gzip" @@ -619,7 +620,7 @@ func (r *grpcReporter) periodicTasks() { select { case <-getSettingsReady: // only kick off a new goroutine if the previous one has terminated - go r.getSettings(getSettingsReady) + go r.getAndUpdateSettings(getSettingsReady) default: } case <-settingsTimeoutCheckTicker.C: // check for timed out settings @@ -853,25 +854,38 @@ func (r *grpcReporter) sendMetrics(msgs [][]byte) { // ================================ Settings Handling ==================================== -// retrieves the settings from the collector +// retrieves the settings from the collector and updates APM with them // ready a 'ready' channel to indicate if this routine has terminated -func (r *grpcReporter) getSettings(ready chan bool) { +func (r *grpcReporter) getAndUpdateSettings(ready chan bool) { // notify caller that this routine has terminated (defered to end of routine) defer func() { ready <- true }() + remoteSettings, err := r.getSettings() + if err == nil { + r.updateSettings(remoteSettings) + } else { + log.Error("Could not getAndUpdateSettings: %s", err) + } +} + +// retrieves settings from collector and returns them +func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { method := newGetSettingsMethod(r.serviceKey.Load()) - if err := r.conn.InvokeRPC(r.done, method); err == nil { + err := r.conn.InvokeRPC(r.done, method) + + if err == nil { logger := log.Info if method.Resp.Warning != "" { logger = log.Warning } logger(method.CallSummary()) - r.updateSettings(method.Resp) + return method.Resp, nil } else if errors.Is(err, errInvalidServiceKey) { r.ShutdownNow() } else { log.Infof("getSettings: %s", err) } + return nil, err } // updates the existing settings with the newly received From 59248a1cea2300176f0e0f04bc2da0c4ed1d8111 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 10:10:52 -0700 Subject: [PATCH 2/7] Rename more grpcReporter settings things --- internal/reporter/reporter_grpc.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index 18ba3758..cd568ccf 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -219,7 +219,7 @@ func (c *grpcConnection) Close() { type grpcReporter struct { conn *grpcConnection // used for all RPC calls collectMetricInterval int32 // metrics flush interval in seconds - getSettingsInterval int // settings retrieval interval in seconds + getAndUpdateSettingsInterval int // settings retrieval interval in seconds settingsTimeoutCheckInterval int // check interval for timed out settings in seconds serviceKey *uatomic.String // service key @@ -303,7 +303,7 @@ func newGRPCReporter(otelServiceName string, registry metrics.LegacyRegistry, o conn: grpcConn, collectMetricInterval: metrics.ReportingIntervalDefault, - getSettingsInterval: grpcGetSettingsIntervalDefault, + getAndUpdateSettingsInterval: grpcGetSettingsIntervalDefault, settingsTimeoutCheckInterval: grpcSettingsTimeoutCheckIntervalDefault, serviceKey: uatomic.NewString(config.GetServiceKey()), @@ -371,7 +371,7 @@ func (r *grpcReporter) start() { go r.statusSender() // start up long-running goroutine periodicTasks() which kicks off periodic tasks like - // collectMetrics() and getSettings() + // collectMetrics() and getAndUpdateSettings() if !periodicTasksDisabled { go r.periodicTasks() } @@ -558,28 +558,28 @@ func (c *grpcConnection) reconnect() error { return c.connect() } -// long-running goroutine that kicks off periodic tasks like collectMetrics() and getSettings() +// long-running goroutine that kicks off periodic tasks like collectMetrics() and getAndUpdateSettings() func (r *grpcReporter) periodicTasks() { defer log.Info("periodicTasks goroutine exiting.") // set up tickers collectMetricsTicker := time.NewTimer(r.collectMetricsNextInterval()) - getSettingsTicker := time.NewTimer(0) + getAndUpdateSettingsTicker := time.NewTimer(0) settingsTimeoutCheckTicker := time.NewTimer(time.Duration(r.settingsTimeoutCheckInterval) * time.Second) defer func() { collectMetricsTicker.Stop() - getSettingsTicker.Stop() + getAndUpdateSettingsTicker.Stop() settingsTimeoutCheckTicker.Stop() r.conn.pingTicker.Stop() }() // set up 'ready' channels to indicate if a goroutine has terminated collectMetricsReady := make(chan bool, 1) - getSettingsReady := make(chan bool, 1) + getAndUpdateSettingsReady := make(chan bool, 1) settingsTimeoutCheckReady := make(chan bool, 1) collectMetricsReady <- true - getSettingsReady <- true + getAndUpdateSettingsReady <- true settingsTimeoutCheckReady <- true for { @@ -614,13 +614,13 @@ func (r *grpcReporter) periodicTasks() { go r.collectMetrics(collectMetricsReady) default: } - case <-getSettingsTicker.C: // get settings from collector + case <-getAndUpdateSettingsTicker.C: // get settings from collector // set up ticker for next round - getSettingsTicker.Reset(time.Duration(r.getSettingsInterval) * time.Second) + getAndUpdateSettingsTicker.Reset(time.Duration(r.getAndUpdateSettingsInterval) * time.Second) select { - case <-getSettingsReady: + case <-getAndUpdateSettingsReady: // only kick off a new goroutine if the previous one has terminated - go r.getAndUpdateSettings(getSettingsReady) + go r.getAndUpdateSettings(getAndUpdateSettingsReady) default: } case <-settingsTimeoutCheckTicker.C: // check for timed out settings From 0cd1eb8b197d10fa9bc5570469d8b0e5c8bc04a2 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 10:11:01 -0700 Subject: [PATCH 3/7] Fix test --- internal/reporter/reporter_test.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/internal/reporter/reporter_test.go b/internal/reporter/reporter_test.go index b5aab9ea..40e32edd 100644 --- a/internal/reporter/reporter_test.go +++ b/internal/reporter/reporter_test.go @@ -16,6 +16,12 @@ package reporter import ( "context" + "io" + stdlog "log" + "os" + "testing" + "time" + "github.com/solarwinds/apm-go/internal/config" "github.com/solarwinds/apm-go/internal/constants" "github.com/solarwinds/apm-go/internal/host" @@ -29,11 +35,6 @@ import ( "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" - "io" - stdlog "log" - "os" - "testing" - "time" "strings" @@ -100,7 +101,7 @@ func TestGRPCReporter(t *testing.T) { // The reporter becomes ready after it has got the default setting. ready := make(chan bool, 1) - r.getSettings(ready) + r.getAndUpdateSettings(ready) ctxTm2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond) defer cancel2() require.True(t, r.WaitForReady(ctxTm2)) @@ -124,7 +125,7 @@ func TestGRPCReporter(t *testing.T) { require.Equal(t, TestServiceKey, r.serviceKey.Load()) require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval) - require.Equal(t, grpcGetSettingsIntervalDefault, r.getSettingsInterval) + require.Equal(t, grpcGetSettingsIntervalDefault, r.getAndUpdateSettingsInterval) require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval) time.Sleep(time.Second) @@ -505,7 +506,7 @@ func testProxy(t *testing.T, proxyUrl string) { // The reporter becomes ready after it has got the default setting. ready := make(chan bool, 1) - r.getSettings(ready) + r.getAndUpdateSettings(ready) ctxTm2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond) defer cancel2() require.True(t, r.WaitForReady(ctxTm2)) @@ -532,7 +533,7 @@ func testProxy(t *testing.T, proxyUrl string) { require.Equal(t, TestServiceKey, r.serviceKey.Load()) require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval) - require.Equal(t, grpcGetSettingsIntervalDefault, r.getSettingsInterval) + require.Equal(t, grpcGetSettingsIntervalDefault, r.getAndUpdateSettingsInterval) require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval) time.Sleep(time.Second) From 7441838e5e7accaedf39e1674f8a9e9c84581c8b Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 10:12:36 -0700 Subject: [PATCH 4/7] More renaming --- internal/reporter/reporter_grpc.go | 4 ++-- internal/reporter/reporter_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index cd568ccf..9207aecd 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -90,7 +90,7 @@ ftgwcxyEq5SkiR+6BCwdzAMqADV37TzXDHLjwSrMIrgLV5xZM20Kk6chxI5QAr/f // These are hard-coded parameters for the gRPC reporter. Any of them become // configurable in future versions will be moved to package config. // TODO: use time.Time - grpcGetSettingsIntervalDefault = 30 // default settings retrieval interval in seconds + grpcGetAndUpdateSettingsIntervalDefault = 30 // default settings retrieval interval in seconds grpcSettingsTimeoutCheckIntervalDefault = 10 // default check interval for timed out settings in seconds grpcPingIntervalDefault = 20 // default interval for keep alive pings in seconds grpcRetryDelayInitial = 500 // initial connection/send retry delay in milliseconds @@ -303,7 +303,7 @@ func newGRPCReporter(otelServiceName string, registry metrics.LegacyRegistry, o conn: grpcConn, collectMetricInterval: metrics.ReportingIntervalDefault, - getAndUpdateSettingsInterval: grpcGetSettingsIntervalDefault, + getAndUpdateSettingsInterval: grpcGetAndUpdateSettingsIntervalDefault, settingsTimeoutCheckInterval: grpcSettingsTimeoutCheckIntervalDefault, serviceKey: uatomic.NewString(config.GetServiceKey()), diff --git a/internal/reporter/reporter_test.go b/internal/reporter/reporter_test.go index 40e32edd..b187dc10 100644 --- a/internal/reporter/reporter_test.go +++ b/internal/reporter/reporter_test.go @@ -125,7 +125,7 @@ func TestGRPCReporter(t *testing.T) { require.Equal(t, TestServiceKey, r.serviceKey.Load()) require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval) - require.Equal(t, grpcGetSettingsIntervalDefault, r.getAndUpdateSettingsInterval) + require.Equal(t, grpcGetAndUpdateSettingsIntervalDefault, r.getAndUpdateSettingsInterval) require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval) time.Sleep(time.Second) @@ -533,7 +533,7 @@ func testProxy(t *testing.T, proxyUrl string) { require.Equal(t, TestServiceKey, r.serviceKey.Load()) require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval) - require.Equal(t, grpcGetSettingsIntervalDefault, r.getAndUpdateSettingsInterval) + require.Equal(t, grpcGetAndUpdateSettingsIntervalDefault, r.getAndUpdateSettingsInterval) require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval) time.Sleep(time.Second) From 6b47180cedac3f8f3e8cd18bba12f54e8dc7b941 Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 11:20:11 -0700 Subject: [PATCH 5/7] Fixes --- internal/reporter/reporter_grpc.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index 9207aecd..3f601585 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -864,16 +864,16 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) { if err == nil { r.updateSettings(remoteSettings) } else { - log.Error("Could not getAndUpdateSettings: %s", err) + log.Errorf("Could not getAndUpdateSettings: %s", err) } } // retrieves settings from collector and returns them func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { method := newGetSettingsMethod(r.serviceKey.Load()) - err := r.conn.InvokeRPC(r.done, method) - if err == nil { + var err error + if err := r.conn.InvokeRPC(r.done, method); err == nil { logger := log.Info if method.Resp.Warning != "" { logger = log.Warning From 95fc263b27b6ae133fc6e54551a4159bf9e941ee Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 11:33:07 -0700 Subject: [PATCH 6/7] More fix --- internal/reporter/reporter_grpc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index 3f601585..e4fe78c6 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -872,7 +872,6 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) { func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { method := newGetSettingsMethod(r.serviceKey.Load()) - var err error if err := r.conn.InvokeRPC(r.done, method); err == nil { logger := log.Info if method.Resp.Warning != "" { @@ -882,10 +881,11 @@ func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { return method.Resp, nil } else if errors.Is(err, errInvalidServiceKey) { r.ShutdownNow() + return nil, err } else { log.Infof("getSettings: %s", err) + return nil, err } - return nil, err } // updates the existing settings with the newly received From d9634c36958c3790c5a62b3bc971269b05bf7d4d Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Thu, 30 May 2024 11:44:47 -0700 Subject: [PATCH 7/7] Move getAndUpdateSettings r.shutdown --- internal/reporter/reporter_grpc.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/reporter/reporter_grpc.go b/internal/reporter/reporter_grpc.go index e4fe78c6..b0ced062 100644 --- a/internal/reporter/reporter_grpc.go +++ b/internal/reporter/reporter_grpc.go @@ -863,6 +863,8 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) { remoteSettings, err := r.getSettings() if err == nil { r.updateSettings(remoteSettings) + } else if errors.Is(err, errInvalidServiceKey) { + r.ShutdownNow() } else { log.Errorf("Could not getAndUpdateSettings: %s", err) } @@ -871,7 +873,6 @@ func (r *grpcReporter) getAndUpdateSettings(ready chan bool) { // retrieves settings from collector and returns them func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { method := newGetSettingsMethod(r.serviceKey.Load()) - if err := r.conn.InvokeRPC(r.done, method); err == nil { logger := log.Info if method.Resp.Warning != "" { @@ -879,9 +880,6 @@ func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) { } logger(method.CallSummary()) return method.Resp, nil - } else if errors.Is(err, errInvalidServiceKey) { - r.ShutdownNow() - return nil, err } else { log.Infof("getSettings: %s", err) return nil, err