Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AdityaHegde committed Jan 15, 2025
1 parent 0cc9827 commit a6a60b0
Show file tree
Hide file tree
Showing 14 changed files with 847 additions and 1,235 deletions.
1,493 changes: 699 additions & 794 deletions proto/gen/rill/runtime/v1/queries.pb.go

Large diffs are not rendered by default.

137 changes: 0 additions & 137 deletions proto/gen/rill/runtime/v1/queries.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3093,18 +3093,6 @@ definitions:
type: string
type:
$ref: '#/definitions/runtimev1Type'
TimeRangeSummaryInterval:
type: object
properties:
months:
type: integer
format: int32
days:
type: integer
format: int32
micros:
type: string
format: int64
TopKEntry:
type: object
properties:
Expand Down Expand Up @@ -6432,9 +6420,6 @@ definitions:
type: string
format: date-time
title: Not optional, not null
interval:
$ref: '#/definitions/TimeRangeSummaryInterval'
title: Not optional, not null
v1TimeSeriesResponse:
type: object
properties:
Expand Down
7 changes: 0 additions & 7 deletions proto/rill/runtime/v1/queries.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1026,13 +1026,6 @@ message TimeRangeSummary {
google.protobuf.Timestamp max = 2;
// Not optional, not null
google.protobuf.Timestamp watermark = 4;
message Interval {
int32 months = 1;
int32 days = 2;
int64 micros = 3;
}
// Not optional, not null
Interval interval = 3;
}

message ColumnCardinalityRequest {
Expand Down
38 changes: 20 additions & 18 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type Executor struct {
instanceCfg drivers.InstanceConfig

min, max, watermark time.Time

Check failure on line 38 in runtime/metricsview/executor.go

View workflow job for this annotation

GitHub Actions / lint

field `max` is unused (unused)
timestamps TimestampsResult
}

type TimestampsResult struct {
Min time.Time
Max time.Time
Watermark time.Time
}

// NewExecutor creates a new Executor for the provided metrics view.
Expand Down Expand Up @@ -84,11 +91,11 @@ func (e *Executor) CacheKey(ctx context.Context) ([]byte, bool, error) {
return []byte(""), true, nil
}
// watermark is the default cache key for streaming metrics views
_, _, watermark, err := e.Timestamps(ctx, nil)
ts, err := e.Timestamps(ctx, nil)
if err != nil {
return nil, false, err
}
return []byte(watermark.Format(time.RFC3339)), true, nil
return []byte(ts.Watermark.Format(time.RFC3339)), true, nil
}

res, err := e.olap.Execute(ctx, &drivers.Statement{
Expand Down Expand Up @@ -128,38 +135,33 @@ func (e *Executor) ValidateQuery(qry *Query) error {
}

// Timestamps queries min, max and watermark for the metrics view
func (e *Executor) Timestamps(ctx context.Context, executionTime *time.Time) (time.Time, time.Time, time.Time, error) {
func (e *Executor) Timestamps(ctx context.Context, executionTime *time.Time) (TimestampsResult, error) {
if !e.min.IsZero() {
return e.min, e.max, e.watermark, nil
return e.timestamps, nil
}

var err error
switch e.olap.Dialect() {
case drivers.DialectDuckDB:
e.min, e.max, e.watermark, err = e.resolveDuckDB(ctx)
case drivers.DialectDuckDB, drivers.DialectClickHouse, drivers.DialectPinot:
e.timestamps, err = e.resolveDuckDBClickHouseAndPinot(ctx)
case drivers.DialectDruid:
e.min, e.max, e.watermark, err = e.resolveDruid(ctx)
case drivers.DialectClickHouse:
e.min, e.max, e.watermark, err = e.resolveClickHouseAndPinot(ctx)
case drivers.DialectPinot:
e.min, e.max, e.watermark, err = e.resolveClickHouseAndPinot(ctx)
e.timestamps, err = e.resolveDruid(ctx)
default:
return time.Time{}, time.Time{}, time.Time{}, fmt.Errorf("not available for dialect '%s'", e.olap.Dialect())
return TimestampsResult{}, fmt.Errorf("not available for dialect '%s'", e.olap.Dialect())
}
if err != nil {
return time.Time{}, time.Time{}, time.Time{}, err
return TimestampsResult{}, err
}

if executionTime != nil {
e.watermark = *executionTime
}
return e.min, e.max, e.watermark, nil
return e.timestamps, nil
}

// BindQuery allows to set min, max and watermark from a cache.
func (e *Executor) BindQuery(ctx context.Context, qry *Query, minTime, maxTime, watermark time.Time) error {
e.min = minTime
e.max = maxTime
e.watermark = watermark
func (e *Executor) BindQuery(ctx context.Context, qry *Query, timestamps TimestampsResult) error {
e.timestamps = timestamps
return e.rewriteQueryTimeRanges(ctx, qry, nil)
}

Expand Down
12 changes: 8 additions & 4 deletions runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (

// rewriteQueryTimeRanges rewrites the time ranges in the query to fixed start/end timestamps.
func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, executionTime *time.Time) error {
_, _, watermark, err := e.Timestamps(ctx, executionTime)
if e.metricsView.TimeDimension == "" {
return nil
}

ts, err := e.Timestamps(ctx, executionTime)
if err != nil {
return fmt.Errorf("failed to fetch time stamps: %w", err)
return fmt.Errorf("failed to fetch timestamps: %w", err)
}

tz := time.UTC
Expand All @@ -26,12 +30,12 @@ func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, execu
}
}

err = e.resolveTimeRange(qry.TimeRange, tz, watermark)
err = e.resolveTimeRange(qry.TimeRange, tz, ts.Watermark)
if err != nil {
return fmt.Errorf("failed to resolve time range: %w", err)
}

err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, watermark)
err = e.resolveTimeRange(qry.ComparisonTimeRange, tz, ts.Watermark)
if err != nil {
return fmt.Errorf("failed to resolve comparison time range: %w", err)
}
Expand Down
Loading

0 comments on commit a6a60b0

Please sign in to comment.