Skip to content

Commit

Permalink
fix: locking in metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel committed Feb 3, 2025
1 parent 1cc373f commit 1dd5274
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 89 deletions.
3 changes: 1 addition & 2 deletions test/e2e/testdata/cronworkflow-deprecated-schedule.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ kind: CronWorkflow
metadata:
name: test-cron-deprecated-schedule
spec:
schedules:
- "* * * * *"
schedule: "* * * * *"
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
workflowMetadata:
Expand Down
8 changes: 3 additions & 5 deletions util/telemetry/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Instrument struct {
}

func (m *Metrics) preCreateCheck(name string) error {
if _, exists := m.AllInstruments[name]; exists {
if inst := m.GetInstrument(name); inst != nil {
return fmt.Errorf("Instrument called %s already exists", name)
}
return nil
Expand Down Expand Up @@ -69,8 +69,6 @@ func collectOptions(options ...instrumentOption) instrumentOptions {

func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit string, options ...instrumentOption) error {
opts := collectOptions(options...)
m.Mutex.Lock()
defer m.Mutex.Unlock()
err := m.preCreateCheck(name)
if err != nil {
return err
Expand Down Expand Up @@ -137,11 +135,11 @@ func (m *Metrics) CreateInstrument(instType instrumentType, name, desc, unit str
if err != nil {
return err
}
m.AllInstruments[name] = &Instrument{
m.AddInstrument(name, &Instrument{
name: name,
description: desc,
otel: instPtr,
}
})
return nil
}

Expand Down
40 changes: 32 additions & 8 deletions util/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,39 @@ type Config struct {
}

type Metrics struct {
// Ensures mutual exclusion in workflows map
Mutex sync.RWMutex

// Evil context for compatibility with legacy context free interfaces
Ctx context.Context
otelMeter *metric.Meter
config *Config

AllInstruments map[string]*Instrument
// Ensures mutual exclusion in instruments
mutex sync.RWMutex
instruments map[string]*Instrument
}

func (m *Metrics) AddInstrument(name string, inst *Instrument) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.instruments[name] = inst
}

func (m *Metrics) GetInstrument(name string) *Instrument {
m.mutex.RLock()
defer m.mutex.RUnlock()
inst, ok := m.instruments[name]
if !ok {
return nil
}
return inst
}

// IterateROInstruments iterates over every instrument for Read-Only purposes
func (m *Metrics) IterateROInstruments(fn func(i *Instrument)) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, i := range m.instruments {
fn(i)
}
}

func NewMetrics(ctx context.Context, serviceName, prometheusName string, config *Config, extraOpts ...metricsdk.Option) (*Metrics, error) {
Expand Down Expand Up @@ -81,10 +105,10 @@ func NewMetrics(ctx context.Context, serviceName, prometheusName string, config

meter := provider.Meter(serviceName)
metrics := &Metrics{
Ctx: ctx,
otelMeter: &meter,
config: config,
AllInstruments: make(map[string]*Instrument),
Ctx: ctx,
otelMeter: &meter,
config: config,
instruments: make(map[string]*Instrument),
}

return metrics, nil
Expand Down
4 changes: 2 additions & 2 deletions util/telemetry/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func (m *Metrics) AddInt(ctx context.Context, name string, val int64, attribs InstAttribs) {
if instrument, ok := m.AllInstruments[name]; ok {
if instrument := m.GetInstrument(name); instrument != nil {
instrument.AddInt(ctx, val, attribs)
} else {
log.Errorf("Metrics addInt() to non-existent metric %s", name)
Expand All @@ -29,7 +29,7 @@ func (i *Instrument) AddInt(ctx context.Context, val int64, attribs InstAttribs)
}

func (m *Metrics) Record(ctx context.Context, name string, val float64, attribs InstAttribs) {
if instrument, ok := m.AllInstruments[name]; ok {
if instrument := m.GetInstrument(name); instrument != nil {
instrument.Record(ctx, val, attribs)
} else {
log.Errorf("Metrics record() to non-existent metric %s", name)
Expand Down
2 changes: 1 addition & 1 deletion workflow/metrics/counter_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func addLogCounter(ctx context.Context, m *Metrics) error {
err := m.CreateBuiltinInstrument(telemetry.InstrumentLogMessages)
name := telemetry.InstrumentLogMessages.Name()
lm := logMetric{
counter: m.AllInstruments[name],
counter: m.GetInstrument(name),
}
log.AddHook(lm)
for _, level := range lm.Levels() {
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_pod_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addPodPhaseGauge(ctx context.Context, m *Metrics) error {
if m.callbacks.PodPhase != nil {
ppGauge := podPhaseGauge{
callback: m.callbacks.PodPhase,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, ppGauge.update)
return ppGauge.gauge.RegisterCallback(m.Metrics, ppGauge.update)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_workflow_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addWorkflowConditionGauge(_ context.Context, m *Metrics) error {
if m.callbacks.WorkflowCondition != nil {
wfcGauge := workflowConditionGauge{
callback: m.callbacks.WorkflowCondition,
gauge: m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()],
gauge: m.GetInstrument(telemetry.InstrumentWorkflowCondition.Name()),
}
return m.AllInstruments[telemetry.InstrumentWorkflowCondition.Name()].RegisterCallback(m.Metrics, wfcGauge.update)
return wfcGauge.gauge.RegisterCallback(m.Metrics, wfcGauge.update)
}
return nil
// TODO init all phases?
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/gauge_workflow_phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addWorkflowPhaseGauge(_ context.Context, m *Metrics) error {
if m.callbacks.WorkflowPhase != nil {
wfpGauge := workflowPhaseGauge{
callback: m.callbacks.WorkflowPhase,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, wfpGauge.update)
return wfpGauge.gauge.RegisterCallback(m.Metrics, wfpGauge.update)
}
return nil
// TODO init all phases?
Expand Down
4 changes: 2 additions & 2 deletions workflow/metrics/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func addIsLeader(ctx context.Context, m *Metrics) error {
name := telemetry.InstrumentIsLeader.Name()
lGauge := leaderGauge{
callback: m.callbacks.IsLeader,
gauge: m.AllInstruments[name],
gauge: m.GetInstrument(name),
}
return m.AllInstruments[name].RegisterCallback(m.Metrics, lGauge.update)
return lGauge.gauge.RegisterCallback(m.Metrics, lGauge.update)
}

func (l *leaderGauge) update(_ context.Context, o metric.Observer) error {
Expand Down
2 changes: 2 additions & 0 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"context"
"sync"

"github.com/argoproj/argo-workflows/v3/util/telemetry"

Expand All @@ -12,6 +13,7 @@ type Metrics struct {
*telemetry.Metrics

callbacks Callbacks
realtimeMutex sync.Mutex
realtimeWorkflows map[string][]realtimeTracker
}

Expand Down
Loading

0 comments on commit 1dd5274

Please sign in to comment.