Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: locking in metrics #14144

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions test/e2e/testdata/cronworkflow-deprecated-schedule.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ kind: CronWorkflow
metadata:
name: test-cron-deprecated-schedule
spec:
schedules:
- "* * * * *"
schedule: "* * * * *"
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
workflowMetadata:
Expand Down
8 changes: 3 additions & 5 deletions util/telemetry/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Instrument struct {
}

func (m *Metrics) preCreateCheck(name string) error {
if _, exists := m.AllInstruments[name]; exists {
if inst := m.GetInstrument(name); inst != nil {
return fmt.Errorf("Instrument called %s already exists", name)
}
return nil
Expand Down Expand Up @@ -69,8 +69,6 @@ func collectOptions(options ...instrumentOption) instrumentOptions {

func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error {
opts := collectOptions(options...)
m.Mutex.Lock()
defer m.Mutex.Unlock()
err := m.preCreateCheck(name)
if err != nil {
return err
Expand Down Expand Up @@ -137,11 +135,11 @@ func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit str
if err != nil {
return err
}
m.AllInstruments[name] = &Instrument{
m.AddInstrument(name, &Instrument{
name: name,
description: desc,
otel: instPtr,
}
})
return nil
}

Expand Down
40 changes: 32 additions & 8 deletions util/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,39 @@ type Config struct {
}

type Metrics struct {
// Ensures mutual exclusion in workflows map
Mutex sync.RWMutex

// Evil context for compatibility with legacy context free interfaces
Ctx context.Context
otelMeter *metric.Meter
config *Config

AllInstruments map[string]*Instrument
// Ensures mutual exclusion in instruments
mutex sync.RWMutex
instruments map[string]*Instrument
}

func (m *Metrics) AddInstrument(name string, inst *Instrument) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.instruments[name] = inst
}

func (m *Metrics) GetInstrument(name string) *Instrument {
m.mutex.RLock()
defer m.mutex.RUnlock()
inst, ok := m.instruments[name]
if !ok {
return nil
}
return inst
}

// IterateROInstruments iterates over every instrument for Read-Only purposes
func (m *Metrics) IterateROInstruments(fn func(i *Instrument)) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, i := range m.instruments {
fn(i)
}
}

func NewMetrics(ctx context.Context, serviceName, prometheusName string, config *Config, extraOpts ...metricsdk.Option) (*Metrics, error) {
Expand Down Expand Up @@ -81,10 +105,10 @@ func NewMetrics(ctx context.Context, serviceName, prometheusName string, config

meter := provider.Meter(serviceName)
metrics := &Metrics{
Ctx: ctx,
otelMeter: &meter,
config: config,
AllInstruments: make(map[string]*Instrument),
Ctx: ctx,
otelMeter: &meter,
config: config,
instruments: make(map[string]*Instrument),
}

return metrics, nil
Expand Down
4 changes: 2 additions & 2 deletions util/telemetry/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (m *Metrics) AddInt(ctx context.Context, name string, val int64, attribs InstAttribs) {
if instrument, ok := m.AllInstruments[name]; ok {
if instrument := m.GetInstrument(name); instrument != nil {
instrument.AddInt(ctx, val, attribs)
} else {
log.Errorf("Metrics addInt() to non-existent metric %s", name)
Expand All @@ -29,7 +29,7 @@ func (i *Instrument) AddInt(ctx context.Context, val int64, attribs InstAttribs)
}

func (m *Metrics) Record(ctx context.Context, name string, val float64, attribs InstAttribs) {
if instrument, ok := m.AllInstruments[name]; ok {
if instrument := m.GetInstrument(name); instrument != nil {
instrument.Record(ctx, val, attribs)
} else {
log.Errorf("Metrics record() to non-existent metric %s", name)
Expand Down
2 changes: 1 addition & 1 deletion workflow/metrics/counter_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func addLogCounter(ctx context.Context, m *Metrics) error {
err := m.CreateBuiltinInstrument(telemetry.InstrumentLogMessages)
name := telemetry.InstrumentLogMessages.Name()
lm := logMetric{
counter: m.AllInstruments[name],
counter: m.GetInstrument(name),
}
log.AddHook(lm)
for _, level := range lm.Levels() {
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_pod_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addPodPhaseGauge(ctx context.Context, m *Metrics) error {
if m.callbacks.PodPhase != nil {
ppGauge := podPhaseGauge{
callback: m.callbacks.PodPhase,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, ppGauge.update)
return ppGauge.gauge.RegisterCallback(m.Metrics, ppGauge.update)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_workflow_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addWorkflowConditionGauge(_ context.Context, m *Metrics) error {
if m.callbacks.WorkflowCondition != nil {
wfcGauge := workflowConditionGauge{
callback: m.callbacks.WorkflowCondition,
gauge: m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()],
gauge: m.GetInstrument(telemetry.InstrumentWorkflowCondition.Name()),
}
return m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()].RegisterCallback(m.Metrics, wfcGauge.update)
return wfcGauge.gauge.RegisterCallback(m.Metrics, wfcGauge.update)
}
return nil
// TODO init all phases?
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_workflow_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error {
if m.callbacks.WorkflowPhase != nil {
wfpGauge := workflowPhaseGauge{
callback: m.callbacks.WorkflowPhase,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, wfpGauge.update)
return wfpGauge.gauge.RegisterCallback(m.Metrics, wfpGauge.update)
}
return nil
// TODO init all phases?
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addIsLeader(ctx context.Context, m *Metrics) error {
name := telemetry.InstrumentIsLeader.Name()
lGauge := leaderGauge{
callback: m.callbacks.IsLeader,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, lGauge.update)
return lGauge.gauge.RegisterCallback(m.Metrics, lGauge.update)
}

func (l *leaderGauge) update(_ context.Context, o metric.Observer) error {
Expand Down
2 changes: 2 additions & 0 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"context"
"sync"

"github.com/argoproj/argo-workflows/v3/util/telemetry"

Expand All @@ -12,6 +13,7 @@ type Metrics struct {
*telemetry.Metrics

callbacks Callbacks
realtimeMutex sync.Mutex
realtimeWorkflows map[string][]realtimeTracker
}

Expand Down
Loading