Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve watermark calculation by adding the option to cache it #6413

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1,267 changes: 640 additions & 627 deletions proto/gen/rill/runtime/v1/queries.pb.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions proto/gen/rill/runtime/v1/queries.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6428,6 +6428,10 @@ definitions:
type: string
format: date-time
title: Not optional, not null
watermark:
type: string
format: date-time
title: Not optional, not null
interval:
$ref: '#/definitions/TimeRangeSummaryInterval'
title: Not optional, not null
Expand Down
2 changes: 2 additions & 0 deletions proto/rill/runtime/v1/queries.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,8 @@ message TimeRangeSummary {
google.protobuf.Timestamp min = 1;
// Not optional, not null
google.protobuf.Timestamp max = 2;
// Not optional, not null
google.protobuf.Timestamp watermark = 4;
message Interval {
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
int32 months = 1;
int32 days = 2;
Expand Down
42 changes: 36 additions & 6 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Executor struct {
olapRelease func()
instanceCfg drivers.InstanceConfig

watermark time.Time
min, max, watermark time.Time
}

// NewExecutor creates a new Executor for the provided metrics view.
Expand Down Expand Up @@ -84,7 +84,7 @@ func (e *Executor) CacheKey(ctx context.Context) ([]byte, bool, error) {
return []byte(""), true, nil
}
// watermark is the default cache key for streaming metrics views
watermark, err := e.loadWatermark(ctx, nil)
_, _, watermark, err := e.Timestamps(ctx, nil)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -127,10 +127,40 @@ func (e *Executor) ValidateQuery(qry *Query) error {
panic("not implemented")
}

// Watermark returns the current watermark of the metrics view.
// If the watermark resolves to null, it defaults to the current time.
func (e *Executor) Watermark(ctx context.Context) (time.Time, error) {
return e.loadWatermark(ctx, nil)
// Timestamps queries min, max and watermark for the metrics view
func (e *Executor) Timestamps(ctx context.Context, executionTime *time.Time) (time.Time, time.Time, time.Time, error) {
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
if !e.min.IsZero() {
return e.min, e.max, e.watermark, nil
}

var err error
switch e.olap.Dialect() {
case drivers.DialectDuckDB:
e.min, e.max, e.watermark, err = e.resolveDuckDB(ctx)
case drivers.DialectDruid:
e.min, e.max, e.watermark, err = e.resolveDruid(ctx)
case drivers.DialectClickHouse:
e.min, e.max, e.watermark, err = e.resolveClickHouseAndPinot(ctx)
case drivers.DialectPinot:
e.min, e.max, e.watermark, err = e.resolveClickHouseAndPinot(ctx)
default:
return time.Time{}, time.Time{}, time.Time{}, fmt.Errorf("not available for dialect '%s'", e.olap.Dialect())
}
if err != nil {
return time.Time{}, time.Time{}, time.Time{}, err
}
if executionTime != nil {
e.watermark = *executionTime
}
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
return e.min, e.max, e.watermark, nil
}

// BindQuery allows to set min, max and watermark from a cache.
func (e *Executor) BindQuery(ctx context.Context, qry *Query, minTime, maxTime, watermark time.Time) error {
e.min = minTime
e.max = maxTime
e.watermark = watermark
return e.rewriteQueryTimeRanges(ctx, qry, nil)
}

// Schema returns a schema for the metrics view's dimensions and measures.
Expand Down
67 changes: 9 additions & 58 deletions runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
"fmt"
"time"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/duration"
"github.com/rilldata/rill/runtime/pkg/timeutil"
)

// rewriteQueryTimeRanges rewrites the time ranges in the query to fixed start/end timestamps.
func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, executionTime *time.Time) error {
_, _, watermark, err := e.Timestamps(ctx, executionTime)
if err != nil {
return fmt.Errorf("failed to fetch time stamps: %w", err)
}

tz := time.UTC
if qry.TimeZone != "" {
var err error
Expand All @@ -22,12 +26,12 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}
}

err := e.resolveTimeRange(ctx, qry.TimeRange, tz, executionTime)
err = e.resolveTimeRange(qry.TimeRange, tz, watermark)
if err != nil {
return fmt.Errorf("failed to resolve time range: %w", err)
}

err = e.resolveTimeRange(ctx, qry.ComparisonTimeRange, tz, executionTime)
err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, watermark)
if err != nil {
return fmt.Errorf("failed to resolve comparison time range: %w", err)
}
Expand All @@ -36,17 +40,13 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}

// resolveTimeRange resolves the given time range, ensuring only its Start and End properties are populated.
func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time.Location, executionTime *time.Time) error {
func (e *Executor) resolveTimeRange(tr *TimeRange, tz *time.Location, watermark time.Time) error {
if tr == nil || tr.IsZero() {
return nil
}

if tr.Start.IsZero() && tr.End.IsZero() {
t, err := e.loadWatermark(ctx, executionTime)
if err != nil {
return err
}
tr.End = t
tr.End = watermark
}

var isISO bool
Expand Down Expand Up @@ -117,52 +117,3 @@ func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time

return nil
}

// resolveWatermark resolves the metric view's watermark expression.
// If the resolved time is zero, it defaults to the current time.
func (e *Executor) loadWatermark(ctx context.Context, executionTime *time.Time) (time.Time, error) {
if executionTime != nil {
return *executionTime, nil
}

if !e.watermark.IsZero() {
return e.watermark, nil
}

dialect := e.olap.Dialect()

var expr string
if e.metricsView.WatermarkExpression != "" {
expr = e.metricsView.WatermarkExpression
} else if e.metricsView.TimeDimension != "" {
expr = fmt.Sprintf("MAX(%s)", dialect.EscapeIdentifier(e.metricsView.TimeDimension))
} else {
return time.Time{}, errors.New("cannot determine time anchor for relative time range")
}

sql := fmt.Sprintf("SELECT %s FROM %s", expr, dialect.EscapeTable(e.metricsView.Database, e.metricsView.DatabaseSchema, e.metricsView.Table))

res, err := e.olap.Execute(ctx, &drivers.Statement{
Query: sql,
Priority: e.priority,
ExecutionTimeout: defaultInteractiveTimeout,
})
if err != nil {
return time.Time{}, err
}
defer res.Close()

var t time.Time
if res.Next() {
if err := res.Scan(&t); err != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", err)
}
}

if t.IsZero() {
t = time.Now()
}

e.watermark = t
return t, nil
}
Loading
Loading