Skip to content

Commit

Permalink
fix: do not alias relation before join (#1693)
Browse files Browse the repository at this point in the history
* fix: do not alias relation before join

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/promql/src/error.rs

Co-authored-by: dennis zhuang <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Lei, HUANG <[email protected]>
Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
3 people authored Jun 1, 2023
1 parent ce44060 commit 878c6bf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
54 changes: 34 additions & 20 deletions src/promql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,60 @@ use snafu::Location;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
#[snafu(display("Unsupported expr type: {}, location: {}", name, location))]
UnsupportedExpr { name: String, location: Location },

#[snafu(display("Unexpected token: {:?}", token))]
#[snafu(display("Unexpected token: {:?}, location: {}", token, location))]
UnexpectedToken {
token: TokenType,
location: Location,
},

#[snafu(display("Internal error during build DataFusion plan, error: {}", source))]
#[snafu(display(
"Internal error during building DataFusion plan, error: {}, location: {}",
source,
location
))]
DataFusionPlanning {
source: datafusion::error::DataFusionError,
location: Location,
},

#[snafu(display("Unexpected plan or expression: {}", desc))]
#[snafu(display("Unexpected plan or expression: {}, location: {}", desc, location))]
UnexpectedPlanExpr { desc: String, location: Location },

#[snafu(display("Unknown table type, downcast failed"))]
#[snafu(display("Unknown table type, downcast failed, location: {}", location))]
UnknownTable { location: Location },

#[snafu(display("Cannot find time index column in table {}", table))]
#[snafu(display(
"Cannot find time index column in table {}, location: {}",
table,
location
))]
TimeIndexNotFound { table: String, location: Location },

#[snafu(display("Cannot find value columns in table {}", table))]
#[snafu(display("Cannot find value columns in table {}, location: {}", table, location))]
ValueNotFound { table: String, location: Location },

#[snafu(display(
"Cannot accept multiple vector as function input, PromQL expr: {:?}",
expr
"Cannot accept multiple vector as function input, PromQL expr: {:?}, location: {}",
expr,
location
))]
MultipleVector { expr: PromExpr, location: Location },

#[snafu(display("Expect a PromQL expr but not found, input expr: {:?}", expr))]
#[snafu(display(
"Expect a PromQL expr but not found, input expr: {:?}, location: {}",
expr,
location
))]
ExpectExpr { expr: PromExpr, location: Location },
#[snafu(display(
"Illegal range: offset {}, length {}, array len {}",
"Illegal range: offset {}, length {}, array len {}, location: {}",
offset,
length,
len
len,
location
))]
IllegalRange {
offset: u32,
Expand All @@ -70,27 +84,27 @@ pub enum Error {
location: Location,
},

#[snafu(display("Empty range is not expected"))]
#[snafu(display("Empty range is not expected, location: {}", location))]
EmptyRange { location: Location },

#[snafu(display(
"Table (metric) name not found, this indicates a procedure error in PromQL planner"
"Table (metric) name not found, this indicates a procedure error in PromQL planner, location: {}", location
))]
TableNameNotFound { location: Location },

#[snafu(display("General catalog error: {source}"))]
#[snafu(display("General catalog error: {source}, location: {}", location))]
Catalog {
#[snafu(backtrace)]
location: Location,
source: catalog::error::Error,
},

#[snafu(display("Expect a range selector, but not found"))]
#[snafu(display("Expect a range selector, but not found, location: {}", location))]
ExpectRangeSelector { location: Location },

#[snafu(display("Zero range in range selector"))]
#[snafu(display("Zero range in range selector, location: {}", location))]
ZeroRangeSelector { location: Location },

#[snafu(display("Cannot find column {col}"))]
#[snafu(display("Cannot find column {col}, location: {}", location))]
ColumnNotFound { col: String, location: Location },
}

Expand All @@ -116,7 +130,7 @@ impl ErrorExt for Error {

TableNameNotFound { .. } => StatusCode::TableNotFound,

Catalog { source } => source.status_code(),
Catalog { source, .. } => source.status_code(),
}
}

Expand Down
21 changes: 7 additions & 14 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ use crate::functions::{
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
};

const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";

/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";

Expand Down Expand Up @@ -1151,7 +1149,6 @@ impl PromPlanner {
}

/// Build a inner join on time index column and tag columns to concat two logical plans.
/// The left plan will be alised as [`LEFT_PLAN_JOIN_ALIAS`].
fn join_on_non_field_columns(
&self,
left: LogicalPlan,
Expand All @@ -1171,12 +1168,9 @@ impl PromPlanner {

// Inner Join on time index column to concat two operator
LogicalPlanBuilder::from(left)
.alias(LEFT_PLAN_JOIN_ALIAS)
.context(DataFusionPlanningSnafu)?
.join(
right,
JoinType::Inner,
// (vec![time_index_column.clone()], vec![time_index_column]),
(tag_columns.clone(), tag_columns),
None,
)
Expand Down Expand Up @@ -1742,14 +1736,13 @@ mod test {

let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
\n Inner Join: lhs.tag_0 = some_metric.tag_0, lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Inner Join: some_metric.tag_0 = some_metric.tag_0, some_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
Expand Down

0 comments on commit 878c6bf

Please sign in to comment.