From 62b8cfc26ba547e018368513b4188fd67d6b150a Mon Sep 17 00:00:00 2001 From: DylanGuedes Date: Tue, 21 Jan 2025 20:31:02 -0300 Subject: [PATCH] Refactor `discarded` metrics to report retention_hours --- .../deletion/tenant_request_handler_test.go | 6 ++ pkg/distributor/distributor.go | 68 +++++++++++++------ pkg/distributor/distributor_test.go | 7 +- pkg/distributor/limits.go | 3 + pkg/distributor/validator.go | 44 ++++++------ pkg/distributor/validator_test.go | 7 +- pkg/ingester/instance.go | 15 ++-- pkg/ingester/instance_test.go | 6 +- pkg/ingester/limiter.go | 4 ++ pkg/ingester/stream.go | 14 ++-- pkg/ingester/stream_test.go | 29 +++++--- pkg/ingester/streams_map_test.go | 3 + pkg/util/entry_size.go | 7 ++ pkg/validation/limits.go | 32 +++++++++ pkg/validation/validate.go | 4 +- 15 files changed, 178 insertions(+), 71 deletions(-) diff --git a/pkg/compactor/deletion/tenant_request_handler_test.go b/pkg/compactor/deletion/tenant_request_handler_test.go index cca06f4c18cfe..7178a31bcc0f0 100644 --- a/pkg/compactor/deletion/tenant_request_handler_test.go +++ b/pkg/compactor/deletion/tenant_request_handler_test.go @@ -7,8 +7,10 @@ import ( "time" "github.com/grafana/dskit/user" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/validation" ) @@ -81,3 +83,7 @@ func (f *fakeLimits) RetentionPeriod(userID string) time.Duration { func (f *fakeLimits) StreamRetention(userID string) []validation.StreamRetention { return f.getLimitForUser(userID).streamRetention } + +func (f *fakeLimits) RetentionHours(userID string, _ labels.Labels) string { + return util.RetentionHours(f.getLimitForUser(userID).retentionPeriod) +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6ede42aab1c20..8cdd5754a2d77 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -453,8 +453,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // We use the heuristic of 1 sample per TS to size the array. // We also work out the hash value at the same time. 
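+	// validationMetrics (below) tracks validated line counts and sizes both in
+	// aggregate and per retention period, so that discarded data can be
+	// reported with a retention_hours label.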
streams := make([]KeyedStream, 0, len(req.Streams)) - validatedLineSize := 0 - validatedLineCount := 0 + validationMetrics := newValidationMetrics() var validationErrors util.GroupedErrors @@ -494,6 +493,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } } + tenantRetentionHours := d.validator.Limits.RetentionHours(tenantID, nil) + func() { sp := opentracing.SpanFromContext(ctx) if sp != nil { @@ -513,23 +514,25 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log d.truncateLines(validationContext, &stream) var lbs labels.Labels - lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream) + lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream, tenantRetentionHours) if err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) - validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries))) + validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(len(stream.Entries))) discardedBytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes)) + validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(discardedBytes)) continue } + retentionHours := d.validator.Limits.RetentionHours(tenantID, lbs) + if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing { err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID) d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) - validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries))) + validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(len(stream.Entries))) discardedBytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes)) + validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(discardedBytes)) continue } @@ -538,7 +541,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log prevTs := stream.Entries[0].Timestamp for _, entry := range stream.Entries { - if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil { + if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours); err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) continue @@ -593,8 +596,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } n++ - validatedLineSize += util.EntryTotalSize(&entry) - validatedLineCount++ + validationMetrics.compute(entry, retentionHours) pushSize += len(entry.Line) } stream.Entries = stream.Entries[:n] @@ -618,7 +620,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block { - d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion) + d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, 
validation.BlockedIngestion, tenantRetentionHours) err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode) d.writeFailuresManager.Log(tenantID, err) @@ -632,10 +634,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error()) } - if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) { - d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited) + if !d.ingestionRateLimiter.AllowN(now, tenantID, validationMetrics.lineSize) { + d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.RateLimited, tenantRetentionHours) - err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize) + err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationMetrics.lineCount, validationMetrics.lineSize) d.writeFailuresManager.Log(tenantID, err) // Return a 429 to indicate to the client they are being rate limited return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error()) @@ -769,16 +771,18 @@ func (d *Distributor) trackDiscardedData( req *logproto.PushRequest, validationContext validationContext, tenantID string, - validatedLineCount int, - validatedLineSize int, + validationMetrics validationMetrics, reason string, + tenantRetentionHours string, ) { - validation.DiscardedSamples.WithLabelValues(reason, tenantID).Add(float64(validatedLineCount)) - validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(validatedLineSize)) + for retentionHours, count := range validationMetrics.lineCountPerRetentionHours { + validation.DiscardedSamples.WithLabelValues(reason, tenantID, retentionHours).Add(float64(count)) + validation.DiscardedBytes.WithLabelValues(reason, tenantID, retentionHours).Add(float64(validationMetrics.lineSizePerRetentionHours[retentionHours])) + } if d.usageTracker != nil { for _, stream := range req.Streams { - lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream) + lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream, tenantRetentionHours) if err != nil { continue } @@ -1157,7 +1161,7 @@ type labelData struct { hash uint64 } -func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, error) { +func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream, tenantRetentionHours string) (labels.Labels, string, uint64, error) { if val, ok := d.labelCache.Get(key); ok { return val.ls, val.ls.String(), val.hash, nil } @@ -1167,7 +1171,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err) } - if err := d.validator.ValidateLabels(vContext, ls, stream); err != nil { + if err := d.validator.ValidateLabels(vContext, ls, stream, tenantRetentionHours); err != nil { return nil, "", 0, err } @@ -1260,3 +1264,25 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l func (d *Distributor) HealthyInstancesCount() int { return int(d.healthyInstancesCount.Load()) } + +type validationMetrics struct { + lineSizePerRetentionHours map[string]int + lineCountPerRetentionHours map[string]int + lineSize int 
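+	// lineCount is the total number of validated entries across all retention periods.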
+ lineCount int +} + +func newValidationMetrics() validationMetrics { + return validationMetrics{ + lineSizePerRetentionHours: make(map[string]int), + lineCountPerRetentionHours: make(map[string]int), + } +} + +func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string) { + totalEntrySize := util.EntryTotalSize(&entry) + v.lineSizePerRetentionHours[retentionHours] += totalEntrySize + v.lineCountPerRetentionHours[retentionHours]++ + v.lineSize += totalEntrySize + v.lineCount++ +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 91d3fcdf1367b..05db849da116b 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -44,6 +44,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" + "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" fe "github.com/grafana/loki/v3/pkg/util/flagext" loki_flagext "github.com/grafana/loki/v3/pkg/util/flagext" @@ -1226,6 +1227,7 @@ func BenchmarkShardStream(b *testing.B) { func Benchmark_SortLabelsOnPush(b *testing.B) { limits := &validation.Limits{} flagext.DefaultValues(limits) + retentionHours := util.RetentionHours(time.Duration(limits.RetentionPeriod)) distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) d := distributors[0] request := makeWriteRequest(10, 10) @@ -1233,7 +1235,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) { for n := 0; n < b.N; n++ { stream := request.Streams[0] stream.Labels = `{buzz="f", a="b"}` - _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream) + _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream, retentionHours) if err != nil { panic("parseStreamLabels fail,err:" + err.Error()) } @@ -1273,6 +1275,7 @@ func TestParseStreamLabels(t *testing.T) { }, } { limits := tc.generateLimits() + retentionHours := util.RetentionHours(time.Duration(limits.RetentionPeriod)) distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) d := distributors[0] @@ -1281,7 +1284,7 @@ func TestParseStreamLabels(t *testing.T) { t.Run(tc.name, func(t *testing.T) { lbs, lbsString, hash, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{ Labels: tc.origLabels, - }) + }, retentionHours) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) return diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 62098dac6d96f..4c20a9ca84292 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -3,6 +3,8 @@ package distributor import ( "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/compactor/retention" "github.com/grafana/loki/v3/pkg/distributor/shardstreams" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -35,6 +37,7 @@ type Limits interface { MaxStructuredMetadataSize(userID string) int MaxStructuredMetadataCount(userID string) int OTLPConfig(userID string) push.OTLPConfig + RetentionHours(userID string, labels labels.Labels) string BlockIngestionUntil(userID string) time.Time BlockIngestionStatusCode(userID string) int diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 5aea652225a56..effc785e53bd1 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -86,7 +86,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val } // ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly. 
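+// The retentionHours argument is used only as the retention_hours label value
+// on the discarded-samples and discarded-bytes metrics; it does not affect
+// whether an entry is accepted.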
-func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error { +func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry, retentionHours string) error { ts := entry.Timestamp.UnixNano() validation.LineLengthHist.Observe(float64(len(entry.Line))) structuredMetadataCount := len(entry.StructuredMetadata) @@ -97,8 +97,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // Makes time string on the error message formatted consistently. formatedEntryTime := entry.Timestamp.Format(timeFormat) formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize) } @@ -107,8 +107,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la if ts > vCtx.creationGracePeriod { formatedEntryTime := entry.Timestamp.Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize) } @@ -120,8 +120,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // an orthogonal concept (we need not use ValidateLabels in this context) // but the upstream cortex_validation pkg uses it, so we keep this // for parity. 
- validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize) } @@ -130,8 +130,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la if structuredMetadataCount > 0 { if !vCtx.allowStructuredMetadata { - validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize) } @@ -139,8 +139,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize) } @@ -148,8 +148,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize) } @@ -161,9 +161,9 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } // Validate labels returns an error if the labels are invalid -func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error { +func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours string) error { if len(ls) == 0 { - validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc() + validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, retentionHours).Inc() return 
fmt.Errorf(validation.MissingLabelsErrorMsg) } @@ -180,20 +180,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea } if numLabelNames > ctx.maxLabelNamesPerSeries { - updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream) + updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream, retentionHours) return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) } lastLabelName := "" for _, l := range ls { if len(l.Name) > ctx.maxLabelNameLength { - updateMetrics(validation.LabelNameTooLong, ctx.userID, stream) + updateMetrics(validation.LabelNameTooLong, ctx.userID, stream, retentionHours) return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name) } else if len(l.Value) > ctx.maxLabelValueLength { - updateMetrics(validation.LabelValueTooLong, ctx.userID, stream) + updateMetrics(validation.LabelValueTooLong, ctx.userID, stream, retentionHours) return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value) } else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 { - updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream) + updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream, retentionHours) return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name) } lastLabelName = l.Name @@ -210,8 +210,8 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (b return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode } -func updateMetrics(reason, userID string, stream logproto.Stream) { - validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries))) +func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours string) { + validation.DiscardedSamples.WithLabelValues(reason, userID, retentionHours).Add(float64(len(stream.Entries))) bytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes)) + validation.DiscardedBytes.WithLabelValues(reason, userID, retentionHours).Add(float64(bytes)) } diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 9e51099dfad38..0881bd1a06214 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/validation" ) @@ -130,8 +131,9 @@ func TestValidator_ValidateEntry(t *testing.T) { assert.NoError(t, err) v, err := NewValidator(o, nil) assert.NoError(t, err) + retentionHours := util.RetentionHours(v.RetentionPeriod(tt.userID)) - err = v.ValidateEntry(ctx, v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry) + err = v.ValidateEntry(ctx, v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry, retentionHours) assert.Equal(t, tt.expected, err) }) } @@ -224,12 +226,13 @@ func TestValidator_ValidateLabels(t *testing.T) { t.Run(tt.name, func(t *testing.T) { l := &validation.Limits{} flagext.DefaultValues(l) + retentionHours := util.RetentionHours(time.Duration(l.RetentionPeriod)) o, err := validation.NewOverrides(*l, tt.overrides) assert.NoError(t, err) v, err := NewValidator(o, nil) assert.NoError(t, err) - err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), 
logproto.Stream{Labels: tt.labels}) + err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels}, retentionHours) assert.Equal(t, tt.expected, err) }) } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index c6afcacfbdfde..b969a8e9b6d4e 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -294,8 +294,10 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID) } + retentionHours := i.limiter.limits.RetentionHours(i.instanceID, labels) + if err != nil { - return i.onStreamCreationError(ctx, pushReqStream, err, labels) + return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours) } fp := i.getHashForLabels(labels) @@ -307,7 +309,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre return nil, fmt.Errorf("failed to create stream: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs, retentionHours) // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). if record != nil { @@ -325,7 +327,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre return s, nil } -func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) { +func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels, retentionHours string) (*stream, error) { if i.configs.LogStreamCreation(i.instanceID) || i.cfg.KafkaIngestion.Enabled { l := level.Debug(util_log.Logger) @@ -341,9 +343,9 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp ) } - validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID, retentionHours).Add(float64(len(pushReqStream.Entries))) bytes := util.EntriesTotalSize(pushReqStream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) + validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID, retentionHours).Add(float64(bytes)) if i.customStreamsTracker != nil { i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) } @@ -375,7 +377,8 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) + retentionHours := i.limiter.limits.RetentionHours(i.instanceID, ls) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, 
i.configs, retentionHours) i.onStreamCreated(s) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 369d0ab2d7469..84a13e9fc898c 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -310,12 +310,13 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { {Labels: "{app=\"test\",job=\"varlogs2\"}", Entries: entries(5, currentTime.Add(12*time.Nanosecond))}, } + retentionHours := limiter.limits.RetentionHours("test", nil) for _, testStream := range testStreams { stream, err := instance.getOrCreateStream(context.Background(), testStream, recordPool.GetRecord()) require.NoError(t, err) chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream)) require.NoError(t, err) - chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk() + chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retentionHours).NewChunk() for _, entry := range testStream.Entries { dup, err := chunk.Append(&entry) require.False(t, dup) @@ -550,6 +551,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { limits, err := validation.NewOverrides(l, nil) require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retentionHours := limiter.limits.RetentionHours("test", nil) ctx := context.Background() @@ -575,7 +577,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil)) + inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retentionHours)) } }) } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index c4e64149f1658..8fccc74422e25 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/dskit/ring" + "github.com/prometheus/prometheus/model/labels" "golang.org/x/time/rate" "github.com/grafana/loki/v3/pkg/distributor/shardstreams" @@ -31,6 +32,9 @@ type Limits interface { MaxLocalStreamsPerUser(userID string) int MaxGlobalStreamsPerUser(userID string) int PerStreamRateLimit(userID string) validation.RateLimit + StreamRetention(userID string) []validation.StreamRetention + RetentionHours(userID string, labels labels.Labels) string + RetentionPeriod(userID string) time.Duration ShardStreams(userID string) shardstreams.Config IngestionPartitionsTenantShardSize(userID string) int } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index b36cbb290db7e..3d2a7bf1d0319 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -82,6 +82,8 @@ type stream struct { chunkHeadBlockFormat chunkenc.HeadBlockFmt configs *runtime.TenantConfigs + + retentionHours string } type chunkDesc struct { @@ -112,6 +114,7 @@ func newStream( metrics *ingesterMetrics, writeFailures *writefailures.Manager, configs *runtime.TenantConfigs, + retentionHours string, ) *stream { hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) return &stream{ @@ -132,7 +135,8 @@ func newStream( chunkFormat: chunkFormat, chunkHeadBlockFormat: headBlockFmt, 
- configs: configs, + configs: configs, + retentionHours: retentionHours, } } @@ -477,15 +481,15 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde if s.unorderedWrites { name = validation.TooFarBehind } - validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples)) - validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes)) + validation.DiscardedSamples.WithLabelValues(name, s.tenant, s.retentionHours).Add(float64(outOfOrderSamples)) + validation.DiscardedBytes.WithLabelValues(name, s.tenant, s.retentionHours).Add(float64(outOfOrderBytes)) if usageTracker != nil { usageTracker.DiscardedBytesAdd(ctx, s.tenant, name, s.labels, float64(outOfOrderBytes)) } } if rateLimitedSamples > 0 { - validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples)) - validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes)) + validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant, s.retentionHours).Add(float64(rateLimitedSamples)) + validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant, s.retentionHours).Add(float64(rateLimitedBytes)) if usageTracker != nil { usageTracker.DiscardedBytesAdd(ctx, s.tenant, validation.StreamRateLimit, s.labels, float64(rateLimitedBytes)) } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 8bf7bfaf4ce98..02d19522c8da0 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -57,6 +57,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retentionHours := limiter.limits.RetentionHours("fake", nil) for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { @@ -79,6 +80,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) _, err := s.Push(context.Background(), []logproto.Entry{ @@ -115,7 +117,7 @@ func TestPushDeduplication(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) s := newStream( @@ -133,6 +135,7 @@ func TestPushDeduplication(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) written, err := s.Push(context.Background(), []logproto.Entry{ @@ -151,7 +154,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) buf := bytes.NewBuffer(nil) @@ -193,6 +196,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { metrics, manager, runtimeCfg, + retentionHours, ) _, err = s.Push(context.Background(), []logproto.Entry{ @@ -221,7 +225,7 @@ func TestPushRejectOldCounter(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) 
require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) s := newStream( @@ -239,6 +243,7 @@ func TestPushRejectOldCounter(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) // counter should be 2 now since the first line will be deduped @@ -329,7 +334,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) s := newStream( @@ -347,6 +352,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) s.highestTs = time.Now() @@ -368,7 +374,7 @@ func TestUnorderedPush(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) s := newStream( @@ -386,6 +392,7 @@ func TestUnorderedPush(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) for _, x := range []struct { @@ -471,7 +478,7 @@ func TestPushRateLimit(t *testing.T) { limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) s := newStream( @@ -489,6 +496,7 @@ func TestPushRateLimit(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) entries := []logproto.Entry{ @@ -511,7 +519,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) cfg := defaultConfig() chunkfmt, headfmt := defaultChunkFormat(t) @@ -530,6 +538,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) entries := []logproto.Entry{ @@ -550,7 +559,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retentionHours := limiter.limits.RetentionHours("fake", nil) cfg := defaultConfig() cfg.MaxChunkAge = time.Minute chunkfmt, headfmt := defaultChunkFormat(t) @@ -570,6 +579,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ) base := time.Now() @@ -618,9 +628,10 @@ func Benchmark_PushStream(b *testing.B) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) 
+ retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(b) - s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter.rateLimitStrategy, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil) + s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter.rateLimitStrategy, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retentionHours) expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true) require.NoError(b, err) t, err := newTailer("foo", expr, &fakeTailServer{}, 10) diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go index 273c489d34d4a..91da719a17b14 100644 --- a/pkg/ingester/streams_map_test.go +++ b/pkg/ingester/streams_map_test.go @@ -14,6 +14,7 @@ func TestStreamsMap(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retentionHours := limiter.limits.RetentionHours("fake", nil) chunkfmt, headfmt := defaultChunkFormat(t) ss := []*stream{ @@ -32,6 +33,7 @@ func TestStreamsMap(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ), newStream( chunkfmt, @@ -48,6 +50,7 @@ func TestStreamsMap(t *testing.T) { NilMetrics, nil, nil, + retentionHours, ), } var s *stream diff --git a/pkg/util/entry_size.go b/pkg/util/entry_size.go index 4f2c8f0bf82dc..925c0b256cbc6 100644 --- a/pkg/util/entry_size.go +++ b/pkg/util/entry_size.go @@ -1,7 +1,10 @@ package util import ( + "fmt" + "math" "slices" + "time" "github.com/grafana/loki/pkg/push" @@ -32,3 +35,7 @@ func StructuredMetadataSize(metas push.LabelsAdapter) int { } return size } + +func RetentionHours(retention time.Duration) string { + return fmt.Sprintf("%d", int64(math.Floor(retention.Hours()))) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index f271715652e51..bd4c22f5729fd 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -28,6 +28,7 @@ import ( ruler_config "github.com/grafana/loki/v3/pkg/ruler/config" "github.com/grafana/loki/v3/pkg/ruler/util" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" + retentionUtil "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/flagext" util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/validation" @@ -257,6 +258,16 @@ type StreamRetention struct { Matchers []*labels.Matcher `yaml:"-" json:"-"` // populated during validation. } +func (r *StreamRetention) Matches(lbs labels.Labels) bool { + for _, matcher := range r.Matchers { + if !matcher.Matches(lbs.Get(matcher.Name)) { + return false + } + } + + return true +} + // LimitError are errors that do not comply with the limits specified. type LimitError string @@ -959,6 +970,27 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention { return o.getOverridesForUser(userID).StreamRetention } +// RetentionHours returns the retention period for a given user. 
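+// When ls is non-empty, the matching stream retention override with the
+// highest priority takes precedence over the tenant-wide retention period.
+// The result is the retention expressed in whole hours, formatted as a string
+// for use as the retention_hours metric label.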
+func (o *Overrides) RetentionHours(userID string, ls labels.Labels) string { + streamRetentions := o.getOverridesForUser(userID).StreamRetention + + selectedRetention := o.getOverridesForUser(userID).RetentionPeriod + highestPriority := -1 + + if len(ls) > 0 { + for _, retention := range streamRetentions { + if retention.Matches(ls) { + if retention.Priority > highestPriority { + highestPriority = retention.Priority + selectedRetention = retention.Period + } + } + } + } + + return retentionUtil.RetentionHours(time.Duration(selectedRetention)) +} + func (o *Overrides) UnorderedWrites(userID string) bool { return o.getOverridesForUser(userID).UnorderedWrites } diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index ff681ac8d0936..31e1729e264cc 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -115,7 +115,7 @@ var DiscardedBytes = promauto.NewCounterVec( Name: "discarded_bytes_total", Help: "The total number of bytes that were discarded.", }, - []string{ReasonLabel, "tenant"}, + []string{ReasonLabel, "tenant", "retention_hours"}, ) // DiscardedSamples is a metric of the number of discarded samples, by reason. @@ -125,7 +125,7 @@ var DiscardedSamples = promauto.NewCounterVec( Name: "discarded_samples_total", Help: "The total number of samples that were discarded.", }, - []string{ReasonLabel, "tenant"}, + []string{ReasonLabel, "tenant", "retention_hours"}, ) var LineLengthHist = promauto.NewHistogram(prometheus.HistogramOpts{