Skip to content

Commit

Permalink
Live debugging service with prometheus.relabel (#797)
Browse files Browse the repository at this point in the history
* create xray service to manage the debug streams

* add xray support to the prometheus.relabel component

* add xray endpoint to the api

* rename handler to manager

* defer cleanup func

* rework to add multi-streams support

* more tests

* additional multi delete test

* add todo comment for buffer size

* add error checks

* improve naming and readability, split interfaces and add a check to avoid expensive string computation

* move register and isregistered to a new debugRegistry interface
  • Loading branch information
wildum authored Jun 3, 2024
1 parent f0283e0 commit 1523af0
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 28 deletions.
3 changes: 3 additions & 0 deletions internal/alloy/componenttest/componenttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

Expand Down Expand Up @@ -163,6 +164,8 @@ func (c *Controller) buildComponent(dataPath string, args component.Arguments) (
switch name {
case labelstore.ServiceName:
return labelstore.New(nil, prometheus.DefaultRegisterer), nil
case livedebugging.ServiceName:
return livedebugging.NewLiveDebugging(), nil
default:
return nil, fmt.Errorf("no service named %s defined", name)
}
Expand Down
13 changes: 9 additions & 4 deletions internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/alloy/internal/service"
httpservice "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
otel_service "github.com/grafana/alloy/internal/service/otel"
remotecfgservice "github.com/grafana/alloy/internal/service/remotecfg"
uiservice "github.com/grafana/alloy/internal/service/ui"
Expand Down Expand Up @@ -272,8 +273,11 @@ func (fr *alloyRun) Run(configPath string) error {
return fmt.Errorf("failed to create the remotecfg service: %w", err)
}

liveDebuggingService := livedebugging.New()

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
UIPrefix: fr.uiPrefix,
DebugCallbackManager: liveDebuggingService.Data().(livedebugging.DebugCallbackManager),
})

otelService := otel_service.New(l)
Expand All @@ -291,12 +295,13 @@ func (fr *alloyRun) Run(configPath string) error {
Reg: reg,
MinStability: fr.minStability,
Services: []service.Service{
httpService,
uiService,
clusterService,
otelService,
httpService,
labelService,
liveDebuggingService,
otelService,
remoteCfgService,
uiService,
},
})

Expand Down
27 changes: 23 additions & 4 deletions internal/component/prometheus/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
lru "github.com/hashicorp/golang-lru/v2"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
Expand All @@ -22,9 +23,11 @@ import (
"go.uber.org/atomic"
)

const name = "prometheus.relabel"

func init() {
component.Register(component.Registration{
Name: "prometheus.relabel",
Name: name,
Stability: featuregate.StabilityGenerallyAvailable,
Args: Arguments{},
Exports: Exports{},
Expand Down Expand Up @@ -85,6 +88,8 @@ type Component struct {
exited atomic.Bool
ls labelstore.LabelStore

debugDataPublisher livedebugging.DebugDataPublisher

cacheMut sync.RWMutex
cache *lru.Cache[uint64, *labelAndID]
}
Expand All @@ -99,14 +104,22 @@ func New(o component.Options, args Arguments) (*Component, error) {
if err != nil {
return nil, err
}

debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}
debugDataPublisher.(livedebugging.DebugDataPublisher).Register(name)

data, err := o.GetServiceData(labelstore.ServiceName)
if err != nil {
return nil, err
}
c := &Component{
opts: o,
cache: cache,
ls: data.(labelstore.LabelStore),
opts: o,
cache: cache,
ls: data.(labelstore.LabelStore),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "alloy_prometheus_relabel_metrics_processed",
Expand Down Expand Up @@ -259,6 +272,12 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
// Set the cache size to the cache.len
// TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance.
c.cacheSize.Set(float64(c.cache.Len()))

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), relabelled.String()))
}

return relabelled
}

Expand Down
37 changes: 23 additions & 14 deletions internal/component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package relabel

import (
"fmt"
"math"
"strconv"
"testing"
Expand All @@ -13,6 +14,7 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
prom "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -67,13 +69,11 @@ func TestNil(t *testing.T) {
return ref, nil
}))
relabeller, err := New(component.Options{
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: getServiceData,
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
MetricRelabelConfigs: []*alloy_relabel.Config{
Expand Down Expand Up @@ -154,13 +154,11 @@ func generateRelabel(t *testing.T) *Component {
return ref, nil
}))
relabeller, err := New(component.Options{
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: func(name string) (interface{}, error) {
return labelstore.New(nil, prom.DefaultRegisterer), nil
},
ID: "1",
Logger: util.TestAlloyLogger(t),
OnStateChange: func(e component.Exports) {},
Registerer: prom.NewRegistry(),
GetServiceData: getServiceData,
}, Arguments{
ForwardTo: []storage.Appendable{fanout},
MetricRelabelConfigs: []*alloy_relabel.Config{
Expand Down Expand Up @@ -225,3 +223,14 @@ func TestRuleGetter(t *testing.T) {
require.Equal(t, gotUpdated[0].SourceLabels, gotOriginal[0].SourceLabels)
require.Equal(t, gotUpdated[0].Regex, gotOriginal[0].Regex)
}

func getServiceData(name string) (interface{}, error) {
switch name {
case labelstore.ServiceName:
return labelstore.New(nil, prom.DefaultRegisterer), nil
case livedebugging.ServiceName:
return livedebugging.NewLiveDebugging(), nil
default:
return nil, fmt.Errorf("service not found %s", name)
}
}
88 changes: 88 additions & 0 deletions internal/service/livedebugging/livedebugging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package livedebugging

import "sync"

type ComponentName string
type ComponentID string
type CallbackID string

// DebugCallbackManager is used to manage live debugging callbacks.
type DebugCallbackManager interface {
DebugRegistry
// AddCallback sets a callback for a given componentID.
// The callback is used to send debugging data to live debugging consumers.
AddCallback(callbackID CallbackID, componentID ComponentID, callback func(string))
// DeleteCallback deletes a callback for a given componentID.
DeleteCallback(callbackID CallbackID, componentID ComponentID)
}

// DebugDataPublisher is used by components to push information to live debugging consumers.
type DebugDataPublisher interface {
DebugRegistry
// Publish sends debugging data for a given componentID.
Publish(componentID ComponentID, data string)
// IsActive returns true when at least one consumer is listening for debugging data for the given componentID.
IsActive(componentID ComponentID) bool
}

// DebugRegistry is used to keep track of the components that supports the live debugging functionality.
type DebugRegistry interface {
// Register a component by name.
Register(componentName ComponentName)
// IsRegistered returns true if a component has live debugging support.
IsRegistered(componentName ComponentName) bool
}

type liveDebugging struct {
loadMut sync.RWMutex
callbacks map[ComponentID]map[CallbackID]func(string)
registeredComponents map[ComponentName]struct{}
}

var _ DebugCallbackManager = &liveDebugging{}
var _ DebugDataPublisher = &liveDebugging{}

// NewLiveDebugging creates a new instance of liveDebugging.
func NewLiveDebugging() *liveDebugging {
return &liveDebugging{
callbacks: make(map[ComponentID]map[CallbackID]func(string)),
registeredComponents: make(map[ComponentName]struct{}),
}
}

func (s *liveDebugging) Publish(componentID ComponentID, data string) {
s.loadMut.RLock()
defer s.loadMut.RUnlock()
for _, callback := range s.callbacks[componentID] {
callback(data)
}
}

func (s *liveDebugging) IsActive(componentID ComponentID) bool {
_, exist := s.callbacks[componentID]
return exist
}

func (s *liveDebugging) AddCallback(callbackID CallbackID, componentID ComponentID, callback func(string)) {
s.loadMut.Lock()
defer s.loadMut.Unlock()
if _, ok := s.callbacks[componentID]; !ok {
s.callbacks[componentID] = make(map[CallbackID]func(string))
}
s.callbacks[componentID][callbackID] = callback
}

func (s *liveDebugging) DeleteCallback(callbackID CallbackID, componentID ComponentID) {
s.loadMut.Lock()
defer s.loadMut.Unlock()
delete(s.callbacks[componentID], callbackID)
}

func (s *liveDebugging) Register(componentName ComponentName) {
s.registeredComponents[componentName] = struct{}{}
}

func (s *liveDebugging) IsRegistered(componentName ComponentName) bool {
_, exist := s.registeredComponents[componentName]
return exist
}
92 changes: 92 additions & 0 deletions internal/service/livedebugging/livedebugging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package livedebugging

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestRegister(t *testing.T) {
livedebugging := NewLiveDebugging()
require.False(t, livedebugging.IsRegistered("type1"))
livedebugging.Register("type1")
require.True(t, livedebugging.IsRegistered("type1"))
// registering a component name that has already been registered does not do anything
require.NotPanics(t, func() { livedebugging.Register("type1") })
}

func TestStream(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
CallbackID := CallbackID("callback1")

var receivedData string
callback := func(data string) {
receivedData = data
}
require.False(t, livedebugging.IsActive(componentID))
livedebugging.AddCallback(CallbackID, componentID, callback)
require.True(t, livedebugging.IsActive(componentID))
require.Len(t, livedebugging.callbacks[componentID], 1)

livedebugging.Publish(componentID, "test data")
require.Equal(t, "test data", receivedData)
}

func TestStreamEmpty(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
require.NotPanics(t, func() { livedebugging.Publish(componentID, "test data") })
}

func TestMultipleStreams(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
callbackID1 := CallbackID("callback1")
callbackID2 := CallbackID("callback2")

var receivedData1 string
callback1 := func(data string) {
receivedData1 = data
}

var receivedData2 string
callback2 := func(data string) {
receivedData2 = data
}

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)
require.Len(t, livedebugging.callbacks[componentID], 2)

livedebugging.Publish(componentID, "test data")
require.Equal(t, "test data", receivedData1)
require.Equal(t, "test data", receivedData2)
}

func TestDeleteCallback(t *testing.T) {
livedebugging := NewLiveDebugging()
componentID := ComponentID("component1")
callbackID1 := CallbackID("callback1")
callbackID2 := CallbackID("callback2")

callback1 := func(data string) {}
callback2 := func(data string) {}

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)
require.Len(t, livedebugging.callbacks[componentID], 2)

// Deleting callbacks that don't exist should not panic
require.NotPanics(t, func() { livedebugging.DeleteCallback(callbackID1, "fakeComponentID") })
require.NotPanics(t, func() { livedebugging.DeleteCallback("fakeCallbackID", componentID) })

livedebugging.AddCallback(callbackID1, componentID, callback1)
livedebugging.AddCallback(callbackID2, componentID, callback2)

livedebugging.DeleteCallback(callbackID1, componentID)
require.Len(t, livedebugging.callbacks[componentID], 1)

livedebugging.DeleteCallback(callbackID2, componentID)
require.Empty(t, livedebugging.callbacks[componentID])
}
Loading

0 comments on commit 1523af0

Please sign in to comment.