Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy evaluation of timestamps for metrics queries #6526

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type TimestampsResult struct {
Min time.Time
Max time.Time
Watermark time.Time
Now time.Time
}

// NewExecutor creates a new Executor for the provided metrics view.
Expand Down Expand Up @@ -152,6 +153,7 @@ func (e *Executor) Timestamps(ctx context.Context) (TimestampsResult, error) {
return TimestampsResult{}, err
}

e.timestamps.Now = time.Now()
return e.timestamps, nil
}

Expand Down
55 changes: 30 additions & 25 deletions runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
return nil
}

ts, err := e.Timestamps(ctx)
if err != nil {
return fmt.Errorf("failed to fetch timestamps: %w", err)
}

tz := time.UTC
if qry.TimeZone != "" {
var err error
Expand All @@ -31,22 +26,12 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}
}

maxTime := ts.Max
watermark := ts.Watermark
now := time.Now()
if executionTime != nil {
// all end anchors should use execution time when provided
maxTime = *executionTime
watermark = *executionTime
now = *executionTime
}

err = e.resolveTimeRange(qry.TimeRange, tz, ts.Min, maxTime, watermark, now)
err := e.resolveTimeRange(ctx, qry.TimeRange, tz, executionTime)
if err != nil {
return fmt.Errorf("failed to resolve time range: %w", err)
}

err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, ts.Min, maxTime, watermark, now)
err = e.resolveTimeRange(ctx, qry.ComparisonTimeRange, tz, executionTime)
if err != nil {
return fmt.Errorf("failed to resolve comparison time range: %w", err)
}
Expand All @@ -55,28 +40,40 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}

// resolveTimeRange resolves the given time range, ensuring only its Start and End properties are populated.
func (e *Executor) resolveTimeRange(tr *TimeRange, tz *time.Location, minTime, maxTime, watermark, now time.Time) error {
func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time.Location, executionTime *time.Time) error {
if tr == nil || tr.IsZero() {
return nil
}

if tr.Expression == "" {
return e.resolveISOTimeRange(tr, tz, watermark)
return e.resolveISOTimeRange(ctx, tr, tz, executionTime)
}
if !tr.Start.IsZero() || !tr.End.IsZero() || tr.IsoDuration != "" || tr.IsoOffset != "" || tr.RoundToGrain != TimeGrainUnspecified {
return errors.New("other fields are not supported when expression is provided")
}

// TODO: Implement lazy evaluation where we only evaluate timestamps if required for the time expression.
ts, err := e.Timestamps(ctx)
if err != nil {
return fmt.Errorf("failed to fetch timestamps: %w", err)
}
if executionTime != nil {
// If provided, all the end anchors should use the execution time.
ts.Watermark = *executionTime
ts.Max = *executionTime
ts.Now = *executionTime
}

rillTime, err := rilltime.Parse(tr.Expression, rilltime.ParseOptions{})
if err != nil {
return err
}

tr.Start, tr.End, err = rillTime.Eval(rilltime.EvalOptions{
Now: now,
MinTime: minTime,
MaxTime: maxTime,
Watermark: watermark,
Now: ts.Now,
MinTime: ts.Min,
MaxTime: ts.Max,
Watermark: ts.Watermark,
FirstDay: int(e.metricsView.FirstDayOfWeek),
FirstMonth: int(e.metricsView.FirstMonthOfYear),
})
Expand All @@ -94,9 +91,17 @@ func (e *Executor) resolveTimeRange(tr *TimeRange, tz *time.Location, minTime, m
}

// resolveISOTimeRange resolves the given time range where either only start/end is specified along with ISO duration/offset, ensuring only its Start and End properties are populated.
func (e *Executor) resolveISOTimeRange(tr *TimeRange, tz *time.Location, watermark time.Time) error {
func (e *Executor) resolveISOTimeRange(ctx context.Context, tr *TimeRange, tz *time.Location, executionTime *time.Time) error {
if tr.Start.IsZero() && tr.End.IsZero() {
tr.End = watermark
if executionTime == nil {
ts, err := e.Timestamps(ctx)
if err != nil {
return fmt.Errorf("failed to fetch timestamps: %w", err)
}
executionTime = &ts.Watermark
}

tr.End = *executionTime
}

var isISO bool
Expand Down
72 changes: 36 additions & 36 deletions runtime/metricsview/executor_timestamps.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (e *Executor) resolveDuckDBClickHouseAndPinot(ctx context.Context) (Timesta
if e.metricsView.WatermarkExpression != "" {
watermarkExpr = e.metricsView.WatermarkExpression
} else {
watermarkExpr = fmt.Sprintf("MAX(%s)", timeDim)
watermarkExpr = fmt.Sprintf("max(%s)", timeDim)
}

rangeSQL := fmt.Sprintf(
Expand Down Expand Up @@ -146,51 +146,51 @@ func (e *Executor) resolveDruid(ctx context.Context) (TimestampsResult, error) {
return nil
})

group.Go(func() error {
var watermarkExpr string
if e.metricsView.WatermarkExpression != "" {
watermarkExpr = e.metricsView.WatermarkExpression
} else {
watermarkExpr = fmt.Sprintf("MAX(%s)", timeDim)
}

maxSQL := fmt.Sprintf(
"SELECT %[1]s as \"watermark\" FROM %[2]s %[3]s",
watermarkExpr,
escapedTableName,
filter,
)

rows, err := e.olap.Execute(ctx, &drivers.Statement{
Query: maxSQL,
Priority: e.priority,
ExecutionTimeout: defaultExecutionTimeout,
})
if err != nil {
return err
}
defer rows.Close()

if rows.Next() {
err = rows.Scan(&ts.Watermark)
if e.metricsView.WatermarkExpression != "" {
group.Go(func() error {
maxSQL := fmt.Sprintf(
"SELECT %[1]s as \"watermark\" FROM %[2]s %[3]s",
e.metricsView.WatermarkExpression,
escapedTableName,
filter,
)

rows, err := e.olap.Execute(ctx, &drivers.Statement{
Query: maxSQL,
Priority: e.priority,
ExecutionTimeout: defaultExecutionTimeout,
})
if err != nil {
return err
}
} else {
err = rows.Err()
if err != nil {
return err
defer rows.Close()

if rows.Next() {
err = rows.Scan(&ts.Watermark)
if err != nil {
return err
}
} else {
err = rows.Err()
if err != nil {
return err
}
return errors.New("no rows returned for max time")
}
return errors.New("no rows returned for max time")
}
return nil
})
return nil
})
}

err := group.Wait()
if err != nil {
return TimestampsResult{}, err
}

// If there's no custom watermark expression, the watermark defaults to the max time.
if e.metricsView.WatermarkExpression == "" {
ts.Watermark = ts.Max
}

return ts, nil
}

Expand Down
Loading