Skip to content

Commit

Permalink
rework live debugging for otel components
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Feb 14, 2025
1 parent ec53660 commit c2c9bf4
Show file tree
Hide file tree
Showing 31 changed files with 392 additions and 487 deletions.
5 changes: 2 additions & 3 deletions internal/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ type DebugComponent interface {
DebugInfo() interface{}
}

// LiveDebugging is an interface used by the components that support the live debugging feature.
// LiveDebugging is a marker interface to check if a component supports live debugging.
type LiveDebugging interface {
// LiveDebugging is invoked when the number of consumers changes.
LiveDebugging(consumers int)
LiveDebugging() // This function is never called.
}
2 changes: 1 addition & 1 deletion internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,4 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {
return allTargets
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
16 changes: 7 additions & 9 deletions internal/component/discovery/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,12 @@ func (c *Component) Run(ctx context.Context) error {
c.changed()

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.Target,
uint64(len(c.processes)),
func() string { return fmt.Sprintf("%s", c.processes) },
))
}
c.debugDataPublisher.PublishIfActive(livedebugging.NewData(
componentID,
livedebugging.Target,
uint64(len(c.processes)),
func() string { return fmt.Sprintf("%s", c.processes) },
))

return nil
}
Expand Down Expand Up @@ -114,4 +112,4 @@ func (c *Component) changed() {
})
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
2 changes: 1 addition & 1 deletion internal/component/discovery/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}

func componentMapToPromLabels(ls discovery.Target) labels.Labels {
res := make([]labels.Label, 0, len(ls))
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,4 @@ func stagesChanged(prev, next []stages.StageConfig) bool {
return false
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
2 changes: 1 addition & 1 deletion internal/component/loki/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,4 @@ func (c *Component) process(e loki.Entry) model.LabelSet {
return relabeled
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
2 changes: 1 addition & 1 deletion internal/component/loki/secretfilter/secretfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,4 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) LiveDebugging(_ int) {}
func (c *Component) LiveDebugging() {}
38 changes: 16 additions & 22 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
otelcomponent "go.opentelemetry.io/collector/component"
otelconnector "go.opentelemetry.io/collector/connector"
otelextension "go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
Expand All @@ -21,9 +22,10 @@ import (
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/interceptorconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingpublisher"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
Expand Down Expand Up @@ -80,8 +82,7 @@ type Connector struct {
sched *scheduler.Scheduler
collector *lazycollector.Collector

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments

Expand Down Expand Up @@ -130,10 +131,9 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
return nil, err
Expand All @@ -151,8 +151,6 @@ func (p *Connector) Run(ctx context.Context) error {
// configuration for OpenTelemetry Collector connector configuration and manage
// the underlying OpenTelemetry Collector connector.
func (p *Connector) Update(args component.Arguments) error {
p.updateMut.Lock()
defer p.updateMut.Unlock()
p.args = args.(Arguments)

host := scheduler.NewHost(
Expand Down Expand Up @@ -198,8 +196,6 @@ func (p *Connector) Update(args component.Arguments) error {

next := p.args.NextConsumers()

liveDebuggingActive := p.debugDataPublisher.IsActive(livedebugging.ComponentID(p.opts.ID))

// Create instances of the connector from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component
Expand All @@ -215,12 +211,14 @@ func (p *Connector) Update(args component.Arguments) error {
}

if len(next.Metrics) > 0 {
metrics := next.Metrics
if liveDebuggingActive {
metrics = append(metrics, p.liveDebuggingConsumer)
}
nextMetrics := fanoutconsumer.Metrics(metrics)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, nextMetrics)
fanout := fanoutconsumer.Metrics(next.Metrics)
metricsInterceptor := interceptorconsumer.Metrics(fanout, false,
func(ctx context.Context, md pmetric.Metrics) error {
livedebuggingpublisher.PublishMetricsIfActive(p.debugDataPublisher, p.opts.ID, md, next.Metrics)
return fanout.ConsumeMetrics(ctx, md)
},
)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, metricsInterceptor)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
} else if tracesConnector != nil {
Expand All @@ -231,8 +229,6 @@ func (p *Connector) Update(args component.Arguments) error {
return errors.New("unsupported connector type")
}

p.liveDebuggingConsumer.SetTargetConsumers(next.Metrics, next.Logs, next.Traces)

updateConsumersFunc := func() {
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
}
Expand All @@ -247,6 +243,4 @@ func (p *Connector) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}

func (p *Connector) LiveDebugging(_ int) {
p.Update(p.args)
}
func (p *Connector) LiveDebugging() {}
34 changes: 17 additions & 17 deletions internal/component/otelcol/connector/spanlogs/spanlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/interceptorconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingpublisher"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/syntax"
"go.opentelemetry.io/collector/pdata/plog"
)

func init() {
Expand Down Expand Up @@ -82,8 +84,7 @@ type Component struct {

opts component.Options

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments

Expand Down Expand Up @@ -113,10 +114,9 @@ func New(o component.Options, c Arguments) (*Component, error) {
}

res := &Component{
opts: o,
consumer: consumer,
liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
opts: o,
consumer: consumer,
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

if err := res.Update(c); err != nil {
Expand Down Expand Up @@ -147,22 +147,22 @@ func (c *Component) Update(newConfig component.Arguments) error {
defer c.updateMut.Unlock()
c.args = newConfig.(Arguments)

fanoutConsumer := c.args.Output.Logs
nextLogs := c.args.Output.Logs

if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) {
fanoutConsumer = append(fanoutConsumer, c.liveDebuggingConsumer)
}

nextLogs := fanoutconsumer.Logs(fanoutConsumer)
fanout := fanoutconsumer.Logs(nextLogs)
logsInterceptor := interceptorconsumer.Logs(fanout, false,
func(ctx context.Context, ld plog.Logs) error {
livedebuggingpublisher.PublishLogsIfActive(c.debugDataPublisher, c.opts.ID, ld, nextLogs)
return fanout.ConsumeLogs(ctx, ld)
},
)

err := c.consumer.UpdateOptions(c.args, nextLogs)
err := c.consumer.UpdateOptions(c.args, logsInterceptor)
if err != nil {
return fmt.Errorf("failed to update traces consumer due to error: %w", err)
}

return nil
}

func (c *Component) LiveDebugging(_ int) {
c.Update(c.args)
}
func (c *Component) LiveDebugging() {}
5 changes: 5 additions & 0 deletions internal/component/otelcol/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type Consumer interface {
otelconsumer.Logs
}

type ConsumerWithComponentID interface {
Consumer
ComponentID() string
}

// ConsumerArguments is a common Arguments type for Alloy components which can
// send data to otelcol consumers.
//
Expand Down
37 changes: 37 additions & 0 deletions internal/component/otelcol/internal/interceptorconsumer/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package interceptorconsumer

import (
"context"

otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)

type LogsInterceptorFunc func(context.Context, plog.Logs) error

type LogsInterceptor struct {
onConsumeLogs LogsInterceptorFunc
nextLogs otelconsumer.Logs
mutatesData bool // must be set to true if the provided opts modifies the data
}

func Logs(nextLogs otelconsumer.Logs, mutatesData bool, f LogsInterceptorFunc) otelconsumer.Logs {
return &LogsInterceptor{
nextLogs: nextLogs,
mutatesData: mutatesData,
onConsumeLogs: f,
}
}

func (i *LogsInterceptor) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: i.mutatesData}
}

func (i *LogsInterceptor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {

if i.onConsumeLogs != nil {
return i.onConsumeLogs(ctx, ld)
}

return i.nextLogs.ConsumeLogs(ctx, ld)
}
37 changes: 37 additions & 0 deletions internal/component/otelcol/internal/interceptorconsumer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package interceptorconsumer

import (
"context"

otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type MetricsInterceptorFunc func(context.Context, pmetric.Metrics) error

type MetricsInterceptor struct {
onConsumeMetrics MetricsInterceptorFunc
nextMetrics otelconsumer.Metrics
mutatesData bool // must be set to true if the provided opts modifies the data
}

func Metrics(nextMetrics otelconsumer.Metrics, mutatesData bool, f MetricsInterceptorFunc) otelconsumer.Metrics {
return &MetricsInterceptor{
nextMetrics: nextMetrics,
mutatesData: mutatesData,
onConsumeMetrics: f,
}
}

func (i *MetricsInterceptor) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: i.mutatesData}
}

func (i *MetricsInterceptor) ConsumeMetrics(ctx context.Context, ld pmetric.Metrics) error {

if i.onConsumeMetrics != nil {
return i.onConsumeMetrics(ctx, ld)
}

return i.nextMetrics.ConsumeMetrics(ctx, ld)
}
37 changes: 37 additions & 0 deletions internal/component/otelcol/internal/interceptorconsumer/traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package interceptorconsumer

import (
"context"

otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type TracesInterceptorFunc func(context.Context, ptrace.Traces) error

type TracesInterceptor struct {
onConsumeTraces TracesInterceptorFunc
nextTraces otelconsumer.Traces
mutatesData bool // must be set to true if the provided opts modifies the data
}

func Traces(nextTraces otelconsumer.Traces, mutatesData bool, f TracesInterceptorFunc) otelconsumer.Traces {
return &TracesInterceptor{
nextTraces: nextTraces,
mutatesData: mutatesData,
onConsumeTraces: f,
}
}

func (i *TracesInterceptor) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: i.mutatesData}
}

func (i *TracesInterceptor) ConsumeTraces(ctx context.Context, ld ptrace.Traces) error {

if i.onConsumeTraces != nil {
return i.onConsumeTraces(ctx, ld)
}

return i.nextTraces.ConsumeTraces(ctx, ld)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"sync"

"github.com/grafana/alloy/internal/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -30,9 +31,10 @@ type Consumer struct {
}

var (
_ otelconsumer.Traces = (*Consumer)(nil)
_ otelconsumer.Metrics = (*Consumer)(nil)
_ otelconsumer.Logs = (*Consumer)(nil)
_ otelconsumer.Traces = (*Consumer)(nil)
_ otelconsumer.Metrics = (*Consumer)(nil)
_ otelconsumer.Logs = (*Consumer)(nil)
_ otelcol.ConsumerWithComponentID = (*Consumer)(nil)
)

// New creates a new Consumer. The provided ctx is used to determine when the
Expand Down
Loading

0 comments on commit c2c9bf4

Please sign in to comment.