Skip to content

Commit

Permalink
add livedebugging to otel connectors (#2598)
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum authored Feb 5, 2025
1 parent f34e849 commit 9324ba5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Main (unreleased)

- Add livedebugging support for `prometheus.remote_write` (@ravishankar15)

- Add livedebugging support for `otelcol.connector.*` components (@wildum)

- Bump snmp_exporter and embedded modules to 0.27.0. Add support for multi-module handling by comma separation and expose argument to increase SNMP polling concurrency for `prometheus.exporter.snmp`. (@v-zhuravlev)

- Add support for pushv1.PusherService Connect API in `pyroscope.receive_http`. (@simonswine)
Expand Down
1 change: 1 addition & 0 deletions docs/sources/troubleshoot/debug.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ Supported components:
* `loki.process`
* `loki.relabel`
* `loki.secretfilter`
* `otelcol.connector.*`
* `otelcol.processor.*`
* `otelcol.receiver.*`
* `prometheus.remote_write`
Expand Down
45 changes: 35 additions & 10 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util/zapadapter"
)

Expand Down Expand Up @@ -76,11 +78,17 @@ type Connector struct {

sched *scheduler.Scheduler
collector *lazycollector.Collector

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments
}

var (
_ component.Component = (*Connector)(nil)
_ component.HealthComponent = (*Connector)(nil)
_ component.LiveDebugging = (*Connector)(nil)
)

// New creates a new Alloy component which encapsulates an OpenTelemetry
Expand All @@ -90,6 +98,11 @@ var (
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Connector, error) {
debugDataPublisher, err := opts.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.NewPaused(ctx)
Expand All @@ -114,8 +127,10 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
return nil, err
Expand All @@ -133,12 +148,12 @@ func (p *Connector) Run(ctx context.Context) error {
// configuration for OpenTelemetry Collector connector configuration and manage
// the underlying OpenTelemetry Collector connector.
func (p *Connector) Update(args component.Arguments) error {
pargs := args.(Arguments)
p.args = args.(Arguments)

host := scheduler.NewHost(
p.opts.Logger,
scheduler.WithHostExtensions(pargs.Extensions()),
scheduler.WithHostExporters(pargs.Exporters()),
scheduler.WithHostExtensions(p.args.Extensions()),
scheduler.WithHostExporters(p.args.Exporters()),
)

reg := prometheus.NewRegistry()
Expand All @@ -149,7 +164,7 @@ func (p *Connector) Update(args component.Arguments) error {
return err
}

metricsLevel, err := pargs.DebugMetricsConfig().Level.Convert()
metricsLevel, err := p.args.DebugMetricsConfig().Level.Convert()
if err != nil {
return err
}
Expand All @@ -171,12 +186,14 @@ func (p *Connector) Update(args component.Arguments) error {
},
}

connectorConfig, err := pargs.Convert()
connectorConfig, err := p.args.Convert()
if err != nil {
return err
}

next := pargs.NextConsumers()
next := p.args.NextConsumers()

liveDebuggingActive := p.debugDataPublisher.IsActive(livedebugging.ComponentID(p.opts.ID))

// Create instances of the connector from our factory for each of our
// supported telemetry signals.
Expand All @@ -186,14 +203,18 @@ func (p *Connector) Update(args component.Arguments) error {
var metricsConnector otelconnector.Metrics
var logsConnector otelconnector.Logs

switch pargs.ConnectorType() {
switch p.args.ConnectorType() {
case ConnectorTracesToMetrics:
if len(next.Traces) > 0 || len(next.Logs) > 0 {
return errors.New("this connector can only output metrics")
}

if len(next.Metrics) > 0 {
nextMetrics := fanoutconsumer.Metrics(next.Metrics)
metrics := next.Metrics
if liveDebuggingActive {
metrics = append(metrics, p.liveDebuggingConsumer)
}
nextMetrics := fanoutconsumer.Metrics(metrics)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, nextMetrics)
if err != nil && !errors.Is(err, pipeline.ErrSignalNotSupported) {
return err
Expand All @@ -218,3 +239,7 @@ func (p *Connector) Update(args component.Arguments) error {
func (p *Connector) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}

func (p *Connector) LiveDebugging(_ int) {
p.Update(p.args)
}
40 changes: 35 additions & 5 deletions internal/component/otelcol/connector/spanlogs/spanlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/livedebuggingconsumer"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/syntax"
)

Expand Down Expand Up @@ -76,24 +78,42 @@ func (args *Arguments) SetToDefault() {
// Component is the otelcol.exporter.spanlogs component.
type Component struct {
consumer *consumer

opts component.Options

liveDebuggingConsumer *livedebuggingconsumer.Consumer
debugDataPublisher livedebugging.DebugDataPublisher

args Arguments
}

var _ component.Component = (*Component)(nil)
var (
_ component.Component = (*Component)(nil)
_ component.LiveDebugging = (*Component)(nil)
)

// New creates a new otelcol.exporter.spanlogs component.
func New(o component.Options, c Arguments) (*Component, error) {
if c.Output.Traces != nil || c.Output.Metrics != nil {
level.Warn(o.Logger).Log("msg", "non-log output detected; this component only works for log outputs and trace inputs")
}

debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
if err != nil {
return nil, err
}

nextLogs := fanoutconsumer.Logs(c.Output.Logs)
consumer, err := NewConsumer(c, nextLogs)
if err != nil {
return nil, fmt.Errorf("failed to create a traces consumer due to error: %w", err)
}

res := &Component{
consumer: consumer,
opts: o,
consumer: consumer,
liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), o.ID),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

if err := res.Update(c); err != nil {
Expand All @@ -120,14 +140,24 @@ func (c *Component) Run(ctx context.Context) error {

// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
cfg := newConfig.(Arguments)
c.args = newConfig.(Arguments)

nextLogs := fanoutconsumer.Logs(cfg.Output.Logs)
fanoutConsumer := c.args.Output.Logs

if c.debugDataPublisher.IsActive(livedebugging.ComponentID(c.opts.ID)) {
fanoutConsumer = append(fanoutConsumer, c.liveDebuggingConsumer)
}

err := c.consumer.UpdateOptions(cfg, nextLogs)
nextLogs := fanoutconsumer.Logs(fanoutConsumer)

err := c.consumer.UpdateOptions(c.args, nextLogs)
if err != nil {
return fmt.Errorf("failed to update traces consumer due to error: %w", err)
}

return nil
}

func (c *Component) LiveDebugging(_ int) {
c.Update(c.args)
}

0 comments on commit 9324ba5

Please sign in to comment.