Skip to content

Commit

Permalink
Merge pull request #97 from solarwinds/NH-78295-grpcreporter-separate…
Browse files Browse the repository at this point in the history
…-get-update-setting

NH-78295 Split grpcReporter `getSetting`
  • Loading branch information
tammy-baylis-swi authored May 30, 2024
2 parents 9759b88 + d9634c3 commit 2a7ba10
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 38 deletions.
70 changes: 41 additions & 29 deletions internal/reporter/reporter_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/solarwinds/apm-go/internal/config"
"github.com/solarwinds/apm-go/internal/constants"
Expand All @@ -34,17 +46,6 @@ import (
"github.com/solarwinds/apm-go/internal/oboe"
"github.com/solarwinds/apm-go/internal/uams"
"github.com/solarwinds/apm-go/internal/utils"
"io"
"math"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
Expand Down Expand Up @@ -89,7 +90,7 @@ ftgwcxyEq5SkiR+6BCwdzAMqADV37TzXDHLjwSrMIrgLV5xZM20Kk6chxI5QAr/f
// These are hard-coded parameters for the gRPC reporter. Any of them that become
// configurable in future versions will be moved to package config.
// TODO: use time.Time
grpcGetSettingsIntervalDefault = 30 // default settings retrieval interval in seconds
grpcGetAndUpdateSettingsIntervalDefault = 30 // default settings retrieval interval in seconds
grpcSettingsTimeoutCheckIntervalDefault = 10 // default check interval for timed out settings in seconds
grpcPingIntervalDefault = 20 // default interval for keep alive pings in seconds
grpcRetryDelayInitial = 500 // initial connection/send retry delay in milliseconds
Expand Down Expand Up @@ -218,7 +219,7 @@ func (c *grpcConnection) Close() {
type grpcReporter struct {
conn *grpcConnection // used for all RPC calls
collectMetricInterval int32 // metrics flush interval in seconds
getSettingsInterval int // settings retrieval interval in seconds
getAndUpdateSettingsInterval int // settings retrieval interval in seconds
settingsTimeoutCheckInterval int // check interval for timed out settings in seconds

serviceKey *uatomic.String // service key
Expand Down Expand Up @@ -302,7 +303,7 @@ func newGRPCReporter(otelServiceName string, registry metrics.LegacyRegistry, o
conn: grpcConn,

collectMetricInterval: metrics.ReportingIntervalDefault,
getSettingsInterval: grpcGetSettingsIntervalDefault,
getAndUpdateSettingsInterval: grpcGetAndUpdateSettingsIntervalDefault,
settingsTimeoutCheckInterval: grpcSettingsTimeoutCheckIntervalDefault,

serviceKey: uatomic.NewString(config.GetServiceKey()),
Expand Down Expand Up @@ -370,7 +371,7 @@ func (r *grpcReporter) start() {
go r.statusSender()

// start up long-running goroutine periodicTasks() which kicks off periodic tasks like
// collectMetrics() and getSettings()
// collectMetrics() and getAndUpdateSettings()
if !periodicTasksDisabled {
go r.periodicTasks()
}
Expand Down Expand Up @@ -557,28 +558,28 @@ func (c *grpcConnection) reconnect() error {
return c.connect()
}

// long-running goroutine that kicks off periodic tasks like collectMetrics() and getSettings()
// long-running goroutine that kicks off periodic tasks like collectMetrics() and getAndUpdateSettings()
func (r *grpcReporter) periodicTasks() {
defer log.Info("periodicTasks goroutine exiting.")

// set up tickers
collectMetricsTicker := time.NewTimer(r.collectMetricsNextInterval())
getSettingsTicker := time.NewTimer(0)
getAndUpdateSettingsTicker := time.NewTimer(0)
settingsTimeoutCheckTicker := time.NewTimer(time.Duration(r.settingsTimeoutCheckInterval) * time.Second)

defer func() {
collectMetricsTicker.Stop()
getSettingsTicker.Stop()
getAndUpdateSettingsTicker.Stop()
settingsTimeoutCheckTicker.Stop()
r.conn.pingTicker.Stop()
}()

// set up 'ready' channels to indicate if a goroutine has terminated
collectMetricsReady := make(chan bool, 1)
getSettingsReady := make(chan bool, 1)
getAndUpdateSettingsReady := make(chan bool, 1)
settingsTimeoutCheckReady := make(chan bool, 1)
collectMetricsReady <- true
getSettingsReady <- true
getAndUpdateSettingsReady <- true
settingsTimeoutCheckReady <- true

for {
Expand Down Expand Up @@ -613,13 +614,13 @@ func (r *grpcReporter) periodicTasks() {
go r.collectMetrics(collectMetricsReady)
default:
}
case <-getSettingsTicker.C: // get settings from collector
case <-getAndUpdateSettingsTicker.C: // get settings from collector
// set up ticker for next round
getSettingsTicker.Reset(time.Duration(r.getSettingsInterval) * time.Second)
getAndUpdateSettingsTicker.Reset(time.Duration(r.getAndUpdateSettingsInterval) * time.Second)
select {
case <-getSettingsReady:
case <-getAndUpdateSettingsReady:
// only kick off a new goroutine if the previous one has terminated
go r.getSettings(getSettingsReady)
go r.getAndUpdateSettings(getAndUpdateSettingsReady)
default:
}
case <-settingsTimeoutCheckTicker.C: // check for timed out settings
Expand Down Expand Up @@ -853,24 +854,35 @@ func (r *grpcReporter) sendMetrics(msgs [][]byte) {

// ================================ Settings Handling ====================================

// retrieves the settings from the collector
// retrieves the settings from the collector and updates APM with them
// ready: a 'ready' channel used to signal that this routine has terminated
func (r *grpcReporter) getSettings(ready chan bool) {
func (r *grpcReporter) getAndUpdateSettings(ready chan bool) {
	// Signal on the 'ready' channel when this routine returns, whichever
	// branch below is taken (deferred to the end of the routine).
	defer func() { ready <- true }()

	settings, err := r.getSettings()
	switch {
	case err == nil:
		// Retrieval succeeded: hand the result to updateSettings.
		r.updateSettings(settings)
	case errors.Is(err, errInvalidServiceKey):
		// The error matches errInvalidServiceKey: shut the reporter down.
		r.ShutdownNow()
	default:
		// Any other error is only logged.
		log.Errorf("Could not getAndUpdateSettings: %s", err)
	}
}

// retrieves settings from collector and returns them
func (r *grpcReporter) getSettings() (*collector.SettingsResult, error) {
method := newGetSettingsMethod(r.serviceKey.Load())
if err := r.conn.InvokeRPC(r.done, method); err == nil {
logger := log.Info
if method.Resp.Warning != "" {
logger = log.Warning
}
logger(method.CallSummary())
r.updateSettings(method.Resp)
} else if errors.Is(err, errInvalidServiceKey) {
r.ShutdownNow()
return method.Resp, nil
} else {
log.Infof("getSettings: %s", err)
return nil, err
}
}

Expand Down
19 changes: 10 additions & 9 deletions internal/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ package reporter

import (
"context"
"io"
stdlog "log"
"os"
"testing"
"time"

"github.com/solarwinds/apm-go/internal/config"
"github.com/solarwinds/apm-go/internal/constants"
"github.com/solarwinds/apm-go/internal/host"
Expand All @@ -29,11 +35,6 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"io"
stdlog "log"
"os"
"testing"
"time"

"strings"

Expand Down Expand Up @@ -100,7 +101,7 @@ func TestGRPCReporter(t *testing.T) {

// The reporter becomes ready after it has got the default setting.
ready := make(chan bool, 1)
r.getSettings(ready)
r.getAndUpdateSettings(ready)
ctxTm2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel2()
require.True(t, r.WaitForReady(ctxTm2))
Expand All @@ -124,7 +125,7 @@ func TestGRPCReporter(t *testing.T) {
require.Equal(t, TestServiceKey, r.serviceKey.Load())

require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval)
require.Equal(t, grpcGetSettingsIntervalDefault, r.getSettingsInterval)
require.Equal(t, grpcGetAndUpdateSettingsIntervalDefault, r.getAndUpdateSettingsInterval)
require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval)

time.Sleep(time.Second)
Expand Down Expand Up @@ -505,7 +506,7 @@ func testProxy(t *testing.T, proxyUrl string) {

// The reporter becomes ready after it has got the default setting.
ready := make(chan bool, 1)
r.getSettings(ready)
r.getAndUpdateSettings(ready)
ctxTm2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel2()
require.True(t, r.WaitForReady(ctxTm2))
Expand All @@ -532,7 +533,7 @@ func testProxy(t *testing.T, proxyUrl string) {
require.Equal(t, TestServiceKey, r.serviceKey.Load())

require.Equal(t, int32(metrics.ReportingIntervalDefault), r.collectMetricInterval)
require.Equal(t, grpcGetSettingsIntervalDefault, r.getSettingsInterval)
require.Equal(t, grpcGetAndUpdateSettingsIntervalDefault, r.getAndUpdateSettingsInterval)
require.Equal(t, grpcSettingsTimeoutCheckIntervalDefault, r.settingsTimeoutCheckInterval)

time.Sleep(time.Second)
Expand Down

0 comments on commit 2a7ba10

Please sign in to comment.