refactor: Add retention hours to discarded metrics #15875

Open · wants to merge 10 commits into base: main
37 changes: 37 additions & 0 deletions pkg/compactor/retention/expiration_test.go
@@ -123,6 +123,43 @@ func Test_expirationChecker_Expired(t *testing.T) {
}
}

func TestTenantsRetention_RetentionPeriodFor(t *testing.T) {
    sevenDays, err := model.ParseDuration("720h")
    require.NoError(t, err)
    oneDay, err := model.ParseDuration("24h")
    require.NoError(t, err)

    tr := NewTenantsRetention(fakeLimits{
        defaultLimit: retentionLimit{
            retentionPeriod: time.Duration(sevenDays),
            streamRetention: []validation.StreamRetention{
                {
                    Period: oneDay,
                    Matchers: []*labels.Matcher{
                        labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
                    },
                },
            },
        },
        perTenant: map[string]retentionLimit{
            "1": {
                retentionPeriod: time.Duration(sevenDays),
                streamRetention: []validation.StreamRetention{
                    {
                        Period: oneDay,
                        Matchers: []*labels.Matcher{
                            labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
                        },
                    },
                },
            },
        },
    })

    require.Equal(t, time.Duration(sevenDays), tr.RetentionPeriodFor("1", nil))
    require.Equal(t, time.Duration(oneDay), tr.RetentionPeriodFor("1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}))
}

func Test_expirationChecker_Expired_zeroValue(t *testing.T) {

// Default retention should be zero
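The distributor and ingester changes below pass these retention periods through util.RetentionHours to produce the hours label on the discarded-data metrics. As a rough sketch, such a conversion could look like the following; this is illustrative only, and the actual helper in pkg/util may behave differently (for example around a zero retention period):

    // Illustrative duration-to-hours label conversion (needs "strconv" and "time");
    // not necessarily how util.RetentionHours is implemented in Loki.
    func retentionHoursLabel(period time.Duration) string {
        return strconv.FormatInt(int64(period.Hours()), 10)
    }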
35 changes: 6 additions & 29 deletions pkg/distributor/distributor.go
@@ -453,7 +453,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]KeyedStream, 0, len(req.Streams))
validationMetrics := newValidationMetrics()

var validationErrors util.GroupedErrors

@@ -493,7 +492,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

tenantRetentionHours := d.validator.Limits.RetentionHours(tenantID, nil)
tenantRetentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, nil))
Contributor:

nit: instead of calling util.RetentionHours here, what about adding a tenantsRetention.RetentionHoursFor() string method that calls util.RetentionHours on the result of tenantsRetention.RetentionPeriodFor?

Contributor (Author):

done
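For illustration, a minimal sketch of the helper the reviewer is suggesting; the method name follows the comment above, and whether this matches what was ultimately merged is an assumption:

    // Sketch only: a convenience wrapper combining the existing
    // TenantsRetention.RetentionPeriodFor lookup with util.RetentionHours.
    func (tr *TenantsRetention) RetentionHoursFor(userID string, lbs labels.Labels) string {
        return util.RetentionHours(tr.RetentionPeriodFor(userID, lbs))
    }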

validationMetrics := newValidationMetrics(tenantRetentionHours)

func() {
sp := opentracing.SpanFromContext(ctx)
@@ -524,7 +524,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

retentionHours := d.validator.Limits.RetentionHours(tenantID, lbs)
retentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, lbs))
Contributor:

nit for a follow-up PR: since we were already resolving the retention for the streams before these changes, I think we could speed up pushes by caching this.

Contributor (Author):

I think optimizing that would do more harm than good (based on the benchmarks). Doing the new retention evaluation on pushes didn't make a measurable difference, but adding a cache likely would, either through the extra memory usage or through the extra complexity of caching more state.
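Purely to illustrate the trade-off under discussion, here is a rough sketch of the kind of per-push memoization the reviewer proposes; it is not part of this PR, and the type and field names are made up for the example:

    // Hypothetical per-push cache of retention-hours lookups, keyed by stream labels.
    // Not part of this PR; shown only to illustrate the proposed follow-up.
    type retentionHoursCache struct {
        tenantsRetention *retention.TenantsRetention
        byStream         map[string]string // labels.String() -> retention-hours label
    }

    func (c *retentionHoursCache) retentionHoursFor(tenantID string, lbs labels.Labels) string {
        key := lbs.String()
        if hours, ok := c.byStream[key]; ok {
            return hours
        }
        hours := util.RetentionHours(c.tenantsRetention.RetentionPeriodFor(tenantID, lbs))
        c.byStream[key] = hours
        return hours
    }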


if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
Expand Down Expand Up @@ -620,7 +620,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.BlockedIngestion, tenantRetentionHours)
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
@@ -635,7 +635,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationMetrics.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.RateLimited, tenantRetentionHours)
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.RateLimited)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationMetrics.lineCount, validationMetrics.lineSize)
d.writeFailuresManager.Log(tenantID, err)
@@ -773,7 +773,6 @@ func (d *Distributor) trackDiscardedData(
tenantID string,
validationMetrics validationMetrics,
reason string,
tenantRetentionHours string,
) {
for retentionHours, count := range validationMetrics.lineCountPerRetentionHours {
validation.DiscardedSamples.WithLabelValues(reason, tenantID, retentionHours).Add(float64(count))
@@ -782,7 +781,7 @@

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream, tenantRetentionHours)
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream, validationMetrics.tenantRetentionHours)
if err != nil {
continue
}
@@ -1264,25 +1263,3 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

type validationMetrics struct {
    lineSizePerRetentionHours  map[string]int
    lineCountPerRetentionHours map[string]int
    lineSize                   int
    lineCount                  int
}

func newValidationMetrics() validationMetrics {
    return validationMetrics{
        lineSizePerRetentionHours:  make(map[string]int),
        lineCountPerRetentionHours: make(map[string]int),
    }
}

func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string) {
    totalEntrySize := util.EntryTotalSize(&entry)
    v.lineSizePerRetentionHours[retentionHours] += totalEntrySize
    v.lineCountPerRetentionHours[retentionHours]++
    v.lineSize += totalEntrySize
    v.lineCount++
}
3 changes: 0 additions & 3 deletions pkg/distributor/limits.go
@@ -3,8 +3,6 @@ package distributor
import (
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/loghttp/push"
@@ -37,7 +35,6 @@ type Limits interface {
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
RetentionHours(userID string, labels labels.Labels) string

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
30 changes: 30 additions & 0 deletions pkg/distributor/validation_metrics.go
@@ -0,0 +1,30 @@
package distributor

import (
    "github.com/grafana/loki/v3/pkg/logproto"
    "github.com/grafana/loki/v3/pkg/util"
)

type validationMetrics struct {
    lineSizePerRetentionHours  map[string]int
    lineCountPerRetentionHours map[string]int
    lineSize                   int
    lineCount                  int
    tenantRetentionHours       string
}

func newValidationMetrics(tenantRetentionHours string) validationMetrics {
    return validationMetrics{
        lineSizePerRetentionHours:  make(map[string]int),
        lineCountPerRetentionHours: make(map[string]int),
        tenantRetentionHours:       tenantRetentionHours,
    }
}

func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string) {
    totalEntrySize := util.EntryTotalSize(&entry)
    v.lineSizePerRetentionHours[retentionHours] += totalEntrySize
    v.lineCountPerRetentionHours[retentionHours]++
    v.lineSize += totalEntrySize
    v.lineCount++
}
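To show how the distributor uses this type after the refactor (see the Push changes above), here is an illustrative snippet; the retention-hours strings are made-up example values:

    // Illustrative only: counters accumulate per retention bucket.
    vm := newValidationMetrics("720")                // tenant-level default retention, in hours
    vm.compute(logproto.Entry{Line: "msg A"}, "720") // stream using the tenant default
    vm.compute(logproto.Entry{Line: "msg B"}, "24")  // stream matching a stream-retention override

    // vm.lineCountPerRetentionHours is now {"720": 1, "24": 1}, and
    // vm.lineSizePerRetentionHours holds the corresponding byte totals;
    // trackDiscardedData iterates these maps to emit the discarded-data
    // metrics with a retention-hours label when a push is rejected.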
7 changes: 5 additions & 2 deletions pkg/ingester/checkpoint_test.go
@@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
@@ -460,9 +461,10 @@ func Test_SeriesIterator(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
tenantsRetention := retention.NewTenantsRetention(limits)

for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil, tenantsRetention)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@@ -508,9 +510,10 @@ func Benchmark_SeriesIterator(b *testing.B) {
require.NoError(b, err)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
tenantsRetention := retention.NewTenantsRetention(limits)

for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil, tenantsRetention)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
7 changes: 6 additions & 1 deletion pkg/ingester/ingester.go
@@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
@@ -265,6 +266,8 @@ type Ingester struct {

limiter *Limiter

tenantsRetention *retention.TenantsRetention

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
@@ -426,6 +429,8 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, streamCountLimiter, streamRateLimiter)

i.tenantsRetention = retention.NewTenantsRetention(i.limiter.limits)
i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)

return i, nil
@@ -1038,7 +1043,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker, i.tenantsRetention)
if err != nil {
return nil, err
}
12 changes: 9 additions & 3 deletions pkg/ingester/instance.go
@@ -25,6 +25,7 @@ import (

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/ingester/wal"
@@ -126,6 +127,8 @@ type instance struct {
schemaconfig *config.SchemaConfig

customStreamsTracker push.UsageTracker

tenantsRetention *retention.TenantsRetention
}

func newInstance(
@@ -143,6 +146,7 @@ func newInstance(
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
customStreamsTracker push.UsageTracker,
tenantsRetention *retention.TenantsRetention,
) (*instance, error) {
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
@@ -181,6 +185,8 @@
schemaconfig: &c,

customStreamsTracker: customStreamsTracker,

tenantsRetention: tenantsRetention,
}
i.mapper = NewFPMapper(i.getLabelsFromFingerprint)

@@ -290,12 +296,12 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}

retentionHours := util.RetentionHours(i.tenantsRetention.RetentionPeriodFor(i.instanceID, labels))

if record != nil {
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
}

retentionHours := i.limiter.limits.RetentionHours(i.instanceID, labels)

if err != nil {
return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
Contributor:

nit: This err handler looks a bit out of place. What about getting the retentionHours right after getting the labels?

    labels, err := syntax.ParseLabels(pushReqStream.Labels)
    if err != nil {
        ...
    }

    retentionHours := i.limiter.limits.RetentionHours(i.instanceID, labels)

    if record != nil {
        err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
        return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
    }

Contributor (Author):

good call, fixed it

Contributor:

Not quite my suggestion, but it's fine since it's a nit.

Contributor (Author):

are you sure? I'm doing it immediately after parsing the labels.

}
@@ -377,7 +383,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
}

retentionHours := i.limiter.limits.RetentionHours(i.instanceID, ls)
retentionHours := util.RetentionHours(i.tenantsRetention.RetentionPeriodFor(i.instanceID, ls))
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs, retentionHours)

i.onStreamCreated(s)