Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTE-1506] Loadbalancer exporter: Add resource_keys routing for the traces #14208

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ const (
svcRouting
metricNameRouting
resourceRouting
resourceKeysRouting
)

// Config defines configuration for the exporter.
type Config struct {
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
RoutingKey string `mapstructure:"routing_key"`
ResourceKeys []string `mapstructure:"resource_keys"`
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
Expand Down
42 changes: 29 additions & 13 deletions exporter/loadbalancingexporter/trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ exporter.Traces = (*traceExporterImp)(nil)
type exporterTraces map[*wrappedExporter]ptrace.Traces

type traceExporterImp struct {
loadBalancer *loadBalancer
routingKey routingKey
loadBalancer *loadBalancer
routingKey routingKey
routingResourceKeys []string

stopped bool
shutdownWg sync.WaitGroup
Expand All @@ -38,8 +39,9 @@ type traceExporterImp struct {
func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) {
exporterFactory := otlpexporter.NewFactory()

eCfg := cfg.(*Config)
lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) {
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
oCfg := buildExporterConfig(eCfg, endpoint)
return exporterFactory.CreateTracesExporter(ctx, params, &oCfg)
})
if err != nil {
Expand All @@ -48,9 +50,13 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t

traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting}

switch cfg.(*Config).RoutingKey {
switch eCfg.RoutingKey {
case "service":
traceExporter.routingKey = svcRouting
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = []string{"service.name"}
case "resource":
traceExporter.routingKey = resourceKeysRouting
traceExporter.routingResourceKeys = eCfg.ResourceKeys
case "traceID", "":
default:
return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey)
Expand Down Expand Up @@ -85,7 +91,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
exporterSegregatedTraces := make(exporterTraces)
endpoints := make(map[*wrappedExporter]string)
for _, batch := range batches {
routingID, err := routingIdentifiersFromTraces(batch, e.routingKey)
routingID, err := e.routingIdentifiersFromTraces(batch)
if err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +138,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return errs
}

func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) {
func (e *traceExporterImp) routingIdentifiersFromTraces(td ptrace.Traces) (map[string]bool, error) {
foadnh marked this conversation as resolved.
Show resolved Hide resolved
ids := make(map[string]bool)
rs := td.ResourceSpans()
if rs.Len() == 0 {
Expand All @@ -149,15 +155,25 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]
return nil, errors.New("empty spans")
}

if key == svcRouting {
if e.routingKey == resourceKeysRouting {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@foadnh does this change mean that we no longer have routing by service? It looks like you've switched from service routing to resource key routing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, thanks!

var missingResourceKey bool
for i := 0; i < rs.Len(); i++ {
svc, ok := rs.At(i).Resource().Attributes().Get("service.name")
if !ok {
return nil, errors.New("unable to get service name")
var resourceKeyFound bool
rsi := rs.At(i)
for _, attrKey := range e.routingResourceKeys {
if v, ok := rsi.Resource().Attributes().Get(attrKey); ok {
ids[v.AsString()] = true
resourceKeyFound = true
break
}
}
if !resourceKeyFound {
missingResourceKey = true
}
ids[svc.Str()] = true
}
return ids, nil
if !missingResourceKey {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace missingResourceKey and resourceKeyFound this with len(ids) > 0 ? We should return ids map if it is not empty, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an assumption that there should be an identifier per resource in this function (Although in reality, we will have only a single resource per call - something Strange about the upstream implementation that should be addressed, since I want to merge it to upstream, I don't want to address this assumption!)0

return ids, nil
}
}
tid := spans.At(0).TraceID()
ids[string(tid[:])] = true
Expand Down
189 changes: 169 additions & 20 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,52 @@ import (

func TestNewTracesExporter(t *testing.T) {
for _, tt := range []struct {
desc string
config *Config
err error
desc string
config *Config
wantRoutingKey routingKey
wantRoutingResourceKeys []string
err error
}{
{
"simple",
simpleConfig(),
traceIDRouting,
nil,
nil,
},
{
"service",
serviceBasedRoutingConfig(),
resourceKeysRouting,
[]string{conventions.AttributeServiceName},
nil,
},
{
"resource_keys",
resourceKeysBasedRoutingConfig(),
resourceKeysRouting,
[]string{"resource.key_1", "resource.key_2"},
nil,
},
{
"empty",
&Config{},
0,
nil,
errNoResolver,
},
} {
t.Run(tt.desc, func(t *testing.T) {
// test
_, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)
te, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config)

// verify
require.Equal(t, tt.err, err)
if err != nil {
return
}
require.Equal(t, tt.wantRoutingKey, te.routingKey)
require.Equal(t, tt.wantRoutingResourceKeys, te.routingResourceKeys)
})
}
}
Expand Down Expand Up @@ -219,7 +244,8 @@ func TestConsumeTracesServiceBased(t *testing.T) {
p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, svcRouting)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"service.name"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
Expand All @@ -245,29 +271,113 @@ func TestConsumeTracesServiceBased(t *testing.T) {
assert.Nil(t, res)
}

func TestConsumeTracesResourceKeysBased(t *testing.T) {
componentFactory := func(_ context.Context, _ string) (component.Component, error) {
return newNopMockTracesExporter(), nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newTracesExporter(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig())
require.NotNil(t, p)
require.NoError(t, err)
assert.Equal(t, p.routingKey, resourceKeysRouting)
assert.Equal(t, p.routingResourceKeys, []string{"resource.key_1", "resource.key_2"})

// pre-load an exporter here, so that we don't use the actual OTLP exporter
lb.addMissingExporters(context.Background(), []string{"endpoint-1"})
lb.addMissingExporters(context.Background(), []string{"endpoint-2"})
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(_ context.Context) ([]string, error) {
return []string{"endpoint-1", "endpoint-2"}, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

// test
res := p.ConsumeTraces(context.Background(), simpleTracesWithResourceKeys())

// verify
assert.Nil(t, res)
}

func TestServiceBasedRoutingForSameTraceId(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
res map[string]bool
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"service.name"},
},
"same trace id and different services - service based routing",
twoServicesWithSameTraceID(),
svcRouting,
map[string]bool{"ad-service-1": true, "get-recommendations-7": true},
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"same trace id and different services - trace id routing",
twoServicesWithSameTraceID(),
traceIDRouting,
map[string]bool{string(b[:]): true},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
}
}

func TestResourceKeysBasedRoutingIdentifiers(t *testing.T) {
b := pcommon.TraceID([16]byte{1, 2, 3, 4})
for _, tt := range []struct {
te *traceExporterImp
desc string
batch ptrace.Traces
res map[string]bool
}{
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1", "resource.key_2"},
},
"two resource_keys values",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
"val-2": true,
},
},
{
&traceExporterImp{
routingKey: resourceKeysRouting,
routingResourceKeys: []string{"resource.key_1"},
},
"single resource_keys value with trace ID as default",
simpleTracesWithResourceKeys(),
map[string]bool{
"val-1": true,
string(b[:]): true,
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, nil)
assert.Equal(t, res, tt.res)
})
Expand Down Expand Up @@ -405,40 +515,46 @@ func TestBatchWithTwoTraces(t *testing.T) {

func TestNoTracesInBatch(t *testing.T) {
for _, tt := range []struct {
desc string
batch ptrace.Traces
routingKey routingKey
err error
te *traceExporterImp
desc string
batch ptrace.Traces
err error
}{
{
&traceExporterImp{
routingKey: svcRouting,
},
"no resource spans",
ptrace.NewTraces(),
traceIDRouting,
errors.New("empty resource spans"),
},
{
&traceExporterImp{
routingKey: traceIDRouting,
},
"no instrumentation library spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty()
return batch
}(),
traceIDRouting,
errors.New("empty scope spans"),
},
{
&traceExporterImp{
routingKey: svcRouting,
},
"no spans",
func() ptrace.Traces {
batch := ptrace.NewTraces()
batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty()
return batch
}(),
svcRouting,
errors.New("empty spans"),
},
} {
t.Run(tt.desc, func(t *testing.T) {
res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey)
res, err := tt.te.routingIdentifiersFromTraces(tt.batch)
assert.Equal(t, err, tt.err)
assert.Equal(t, res, map[string]bool(nil))
})
Expand Down Expand Up @@ -684,6 +800,29 @@ func simpleTraces() ptrace.Traces {
return traces
}

func simpleTracesWithResourceKeys() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)

rSpans := traces.ResourceSpans().AppendEmpty()
rAttrs := rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

rSpans = traces.ResourceSpans().AppendEmpty()
rAttrs = rSpans.Resource().Attributes()
rAttrs.PutStr("resource.key_1", "val-1")
rAttrs.PutStr("resource.key_2", "val-2")
rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4})

return traces
}

func simpleTracesWithServiceName() ptrace.Traces {
traces := ptrace.NewTraces()
traces.ResourceSpans().EnsureCapacity(1)
Expand Down Expand Up @@ -736,6 +875,16 @@ func serviceBasedRoutingConfig() *Config {
}
}

func resourceKeysBasedRoutingConfig() *Config {
return &Config{
Resolver: ResolverSettings{
Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}},
},
RoutingKey: "resource_keys",
ResourceKeys: []string{"resource.key_1", "resource.key_2"},
}
}

type mockTracesExporter struct {
component.Component
ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error
Expand Down
Loading