Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add new rill time parser backend support #6326

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/MicahParks/keyfunc v1.9.0
github.com/NYTimes/gziphandler v1.1.1
github.com/XSAM/otelsql v0.27.0
github.com/alecthomas/participle/v2 v2.1.1
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/apache/arrow/go/v15 v15.0.2
github.com/aws/aws-sdk-go v1.49.0
Expand Down Expand Up @@ -174,7 +175,6 @@ require (
github.com/Microsoft/hcsshim v0.11.5 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/apache/arrow-go/v18 v18.0.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,12 @@ github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm
github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM=
github.com/alecthomas/assert/v2 v2.3.0 h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVdDZXL0=
github.com/alecthomas/assert/v2 v2.3.0/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8=
github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -1594,6 +1598,8 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/heetch/avro v0.4.4/go.mod h1:c0whqijPh/C+RwnXzAHFit01tdtf7gMeEHYSbICxJjU=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog=
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
12 changes: 7 additions & 5 deletions runtime/compilers/rillv1/parse_explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,13 @@ func (p *Parser) parseExplore(node *Node) error {
}
isNewFormat = rt.IsNewFormat
}
if ctr.Offset != "" && isNewFormat {
return fmt.Errorf("offset cannot be provided along with rill time range")
}
if err := validateISO8601(ctr.Offset, false, false); err != nil {
return fmt.Errorf("invalid comparison offset %q: %w", ctr.Offset, err)
if ctr.Offset != "" {
if isNewFormat {
return fmt.Errorf("offset cannot be provided along with rill time range")
}
if err := validateISO8601(ctr.Offset, false, false); err != nil {
return fmt.Errorf("invalid comparison offset %q: %w", ctr.Offset, err)
}
}
res.ComparisonTimeRanges = append(res.ComparisonTimeRanges, &runtimev1.ExploreComparisonTimeRange{
Offset: ctr.Offset,
Expand Down
13 changes: 7 additions & 6 deletions runtime/compilers/rillv1/parse_metrics_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,13 @@ func (p *Parser) parseMetricsView(node *Node) error {
isNewFormat = rt.IsNewFormat
}

if o.Offset != "" && isNewFormat {
return fmt.Errorf("offset cannot be provided along with rill time range")
}
err := validateISO8601(o.Offset, false, false)
if err != nil {
return fmt.Errorf("invalid offset in comparison_offsets: %w", err)
if o.Offset != "" {
if isNewFormat {
return fmt.Errorf("offset cannot be provided along with rill time range")
}
if err := validateISO8601(o.Offset, false, false); err != nil {
return fmt.Errorf("invalid offset in comparison_offsets: %w", err)
}
}
}
}
Expand Down
64 changes: 32 additions & 32 deletions runtime/metricsview/executor_rewrite_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,37 @@ import (
"github.com/rilldata/rill/runtime/pkg/timeutil"
)

func (e *Executor) GetMinTime(ctx context.Context, colName string) (time.Time, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can drop the Get, so just MinTime, similar to Watermark.
  2. All the executors exported functions should be in runtime/metricsview/executor.go file – this probably belongs next to the Watermark function.
  3. Why does this accept a column name? We currently only support one time dimension for metrics views.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update for 1 and 2. For 3, we are currently reusing the method for time_range_start and time_range_end which accepts a column name.

if colName == "" {
// we cannot get min time without a time dimension or a column name specified. return a 0 time
return time.Time{}, nil
}

dialect := e.olap.Dialect()
sql := fmt.Sprintf("SELECT MIN(%s) FROM %s", dialect.EscapeIdentifier(colName), dialect.EscapeTable(e.metricsView.Database, e.metricsView.DatabaseSchema, e.metricsView.Table))

res, err := e.olap.Execute(ctx, &drivers.Statement{
Query: sql,
Priority: e.priority,
ExecutionTimeout: defaultInteractiveTimeout,
})
if err != nil {
return time.Time{}, err
}
defer res.Close()

var t time.Time
if res.Next() {
if err := res.Scan(&t); err != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", err)
}
}
if res.Err() != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", res.Err())
}
return t, nil
}

// rewriteQueryTimeRanges rewrites the time ranges in the query to fixed start/end timestamps.
func (e *Executor) rewriteQueryTimeRanges(ctx context.Context, qry *Query, executionTime *time.Time) error {
tz := time.UTC
Expand Down Expand Up @@ -56,7 +87,7 @@ func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time
return err
}

minTime, err := e.getMinTime(ctx)
minTime, err := e.GetMinTime(ctx, e.metricsView.TimeDimension)
if err != nil {
return err
}
Expand All @@ -72,8 +103,6 @@ func (e *Executor) resolveTimeRange(ctx context.Context, tr *TimeRange, tz *time
return err
}

fmt.Println(tr.RillTime, tr.Start, tr.End)

// Clear all other fields than Start and End
tr.RillTime = ""
tr.IsoDuration = ""
Expand Down Expand Up @@ -210,32 +239,3 @@ func (e *Executor) loadWatermark(ctx context.Context, executionTime *time.Time)
e.watermark = t
return t, nil
}

func (e *Executor) getMinTime(ctx context.Context) (time.Time, error) {
if e.metricsView.TimeDimension == "" {
// we cannot get min time without a time dimension specified. return a 0 time
return time.Time{}, nil
}

dialect := e.olap.Dialect()
sql := fmt.Sprintf("SELECT %s FROM %s", fmt.Sprintf("MIN(%s)", dialect.EscapeIdentifier(e.metricsView.TimeDimension)), dialect.EscapeTable(e.metricsView.Database, e.metricsView.DatabaseSchema, e.metricsView.Table))

res, err := e.olap.Execute(ctx, &drivers.Statement{
Query: sql,
Priority: e.priority,
ExecutionTimeout: defaultInteractiveTimeout,
})
if err != nil {
return time.Time{}, err
}
defer res.Close()

var t time.Time
if res.Next() {
if err := res.Scan(&t); err != nil {
return time.Time{}, fmt.Errorf("failed to scan time anchor: %w", err)
}
}

return t, nil
}
9 changes: 9 additions & 0 deletions runtime/pkg/metricssql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ type query struct {
q *metricsview.Query

controller *runtime.Controller
claims *runtime.SecurityClaims
instanceID string
priority int

// fields available after parsing FROM clause
metricsViewSpec *runtimev1.MetricsViewSpec
security *runtime.ResolvedSecurity
dims map[string]any
measures map[string]any
}
Expand Down Expand Up @@ -82,6 +84,7 @@ func (c *Compiler) Rewrite(ctx context.Context, sql string) (*metricsview.Query,
q := &query{
q: &metricsview.Query{},
controller: c.controller,
claims: c.claims,
instanceID: c.instanceID,
priority: c.priority,
}
Expand Down Expand Up @@ -159,7 +162,13 @@ func (q *query) parseFrom(ctx context.Context, node *ast.TableRefsClause) error
if spec == nil {
return fmt.Errorf("metrics view %q is not valid: (status: %q, error: %q)", mv.Meta.GetName(), mv.Meta.ReconcileStatus, mv.Meta.ReconcileError)
}
security, err := q.controller.Runtime.ResolveSecurity(q.instanceID, q.claims, mv)
if err != nil {
// if left is not a table source, then it must be a join
AdityaHegde marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("metrics sql: failed to resolve security")
}
q.metricsViewSpec = spec
q.security = security
AdityaHegde marked this conversation as resolved.
Show resolved Hide resolved
q.measures = make(map[string]any, len(spec.Measures))
for _, measure := range spec.Measures {
q.measures[measure.Name] = nil
Expand Down
103 changes: 35 additions & 68 deletions runtime/pkg/metricssql/time_range_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package metricssqlparser
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -14,7 +13,7 @@ import (
)

func (q *query) parseTimeRangeStart(ctx context.Context, node *ast.FuncCallExpr) (*metricsview.Expression, error) {
rt, unit, colName, err := parseTimeRangeArgs(node.Args)
rt, colName, err := parseTimeRangeArgs(node.Args)
if err != nil {
return nil, err
}
Expand All @@ -29,17 +28,15 @@ func (q *query) parseTimeRangeStart(ctx context.Context, node *ast.FuncCallExpr)
return nil, err
}

for i := 1; i <= unit; i++ {
watermark, _, err = rt.Resolve(rilltime.ResolverContext{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
if err != nil {
return nil, err
}
watermark, _, err = rt.Resolve(rilltime.ResolverContext{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
if err != nil {
return nil, err
}

return &metricsview.Expression{
Expand All @@ -48,7 +45,7 @@ func (q *query) parseTimeRangeStart(ctx context.Context, node *ast.FuncCallExpr)
}

func (q *query) parseTimeRangeEnd(ctx context.Context, node *ast.FuncCallExpr) (*metricsview.Expression, error) {
rt, unit, colName, err := parseTimeRangeArgs(node.Args)
rt, colName, err := parseTimeRangeArgs(node.Args)
if err != nil {
return nil, err
}
Expand All @@ -63,17 +60,15 @@ func (q *query) parseTimeRangeEnd(ctx context.Context, node *ast.FuncCallExpr) (
return nil, err
}

for i := 1; i <= unit; i++ {
_, watermark, err = rt.Resolve(rilltime.ResolverContext{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
if err != nil {
return nil, err
}
_, watermark, err = rt.Resolve(rilltime.ResolverContext{
Now: time.Now(),
MinTime: minTime,
MaxTime: watermark,
FirstDay: int(q.metricsViewSpec.FirstDayOfWeek),
FirstMonth: int(q.metricsViewSpec.FirstMonthOfYear),
})
if err != nil {
return nil, err
}

return &metricsview.Expression{
Expand Down Expand Up @@ -116,6 +111,7 @@ func (q *query) getWatermark(ctx context.Context, colName string) (watermark tim
return watermark, nil
}

// getMinTime creates a executor and calls GetMinTime
func (q *query) getMinTime(ctx context.Context, colName string) (time.Time, error) {
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
if colName == "" {
colName = q.metricsViewSpec.TimeDimension
Expand All @@ -125,71 +121,42 @@ func (q *query) getMinTime(ctx context.Context, colName string) (time.Time, erro
return time.Time{}, nil
}

olap, release, err := q.controller.AcquireOLAP(ctx, q.metricsViewSpec.Connector)
if err != nil {
return time.Time{}, err
}
defer release()

sql := fmt.Sprintf("SELECT MIN(%s) FROM %s", olap.Dialect().EscapeIdentifier(colName), olap.Dialect().EscapeTable(q.metricsViewSpec.Database, q.metricsViewSpec.DatabaseSchema, q.metricsViewSpec.Table))
result, err := olap.Execute(ctx, &drivers.Statement{Query: sql, Priority: q.priority})
ex, err := metricsview.NewExecutor(ctx, q.controller.Runtime, q.instanceID, q.metricsViewSpec, false, q.security, q.priority)
AdityaHegde marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return time.Time{}, err
}
defer result.Close()

var t time.Time
for result.Next() {
if err := result.Scan(&t); err != nil {
return time.Time{}, fmt.Errorf("error scanning min time: %w", err)
}
}
return t, nil
return ex.GetMinTime(ctx, colName)
}

func parseTimeRangeArgs(args []ast.ExprNode) (*rilltime.RillTime, int, string, error) {
func parseTimeRangeArgs(args []ast.ExprNode) (*rilltime.RillTime, string, error) {
if len(args) == 0 {
return nil, 0, "", fmt.Errorf("metrics sql: mandatory arg duration missing for time_range_end() function")
return nil, "", fmt.Errorf("metrics sql: mandatory arg duration missing for time_range_end() function")
}
if len(args) > 3 {
return nil, 0, "", fmt.Errorf("metrics sql: time_range_end() function expects at most 3 arguments")
if len(args) > 2 {
return nil, "", fmt.Errorf("metrics sql: time_range_end() function expects at most 2 arguments")
}
// identify optional args
var (
col string
unit int
err error
col string
err error
)
// identify unit
if len(args) == 1 {
unit = 1
} else {
val, err := parseValueExpr(args[1])
if err != nil {
return nil, 0, "", err
}
unit, err = strconv.Atoi(val)
if err != nil {
return nil, 0, "", err
}
}

// identify column name
if len(args) == 3 {
col, err = parseColumnNameExpr(args[2])
if len(args) == 2 {
col, err = parseColumnNameExpr(args[1])
if err != nil {
return nil, 0, "", err
return nil, "", err
}
}

du, err := parseValueExpr(args[0])
if err != nil {
return nil, 0, "", err
return nil, "", err
}

rt, err := rilltime.Parse(strings.TrimSuffix(strings.TrimPrefix(du, "'"), "'"))
if err != nil {
return nil, 0, "", fmt.Errorf("metrics sql: invalid ISO8601 duration %s", du)
return nil, "", fmt.Errorf("metrics sql: invalid ISO8601 duration %s", du)
}
return rt, unit, col, nil
return rt, col, nil
}
Loading