Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Jan 27, 2025
1 parent dbeded9 commit 9375eab
Show file tree
Hide file tree
Showing 19 changed files with 4,232 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -881,3 +881,13 @@ metric:
offset_window: 1 martian_day
alias: bookings_offset
- name: bookings
---
metric:
name: bookings_offset_one_martian_day_then_2_martian_days
description: tests a metric with nested custom offset windows
type: derived
type_params:
expr: bookings_offset_one_martian_day
metrics:
- name: bookings_offset_one_martian_day
offset_window: 2 martian_day
29 changes: 22 additions & 7 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,10 @@ def _build_time_spine_join_node_for_nested_offset(
time_range_constraint: Optional[TimeRangeConstraint],
metric_source_node: DataflowPlanNode,
) -> DataflowPlanNode:
# TODO: nested custom offset window plans
# use_offset_custom_granularity_node = self._should_use_offset_custom_granularity_node(
# join_description=before_aggregation_time_spine_join_description,
# queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
# )
join_spec = self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0]
time_spine_node = self._build_time_spine_node(
queried_time_spine_specs=queried_agg_time_dimension_specs,
Expand All @@ -1586,6 +1589,7 @@ def _build_time_spine_join_node_for_nested_offset(
)

# TODO: fix bug here where filter specs are being included in when aggregating.
# After that, can this code be combined with basic offset code?
if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or time_range_constraint:
# FilterElementsNode will only be needed if there are where filter specs that were selected in the group by.
specs_in_filters = set(
Expand Down Expand Up @@ -1646,6 +1650,18 @@ def _build_time_spine_join_node_for_offset(
join_type=join_description.join_type,
)

def _should_use_offset_custom_granularity_node(
self,
join_description: Optional[JoinToTimeSpineDescription],
queried_agg_time_dimension_specs: Sequence[TimeDimensionSpec],
) -> bool:
return bool(
join_description
and join_description.custom_offset_window
and {spec.time_granularity_name for spec in queried_agg_time_dimension_specs}
== {join_description.custom_offset_window.granularity}
)

def _build_aggregated_measure_from_measure_source_node(
self,
metric_input_measure_spec: MetricInputMeasureSpec,
Expand Down Expand Up @@ -1770,11 +1786,9 @@ def _build_aggregated_measure_from_measure_source_node(
)

# If querying an offset metric, join to time spine before aggregation.
use_offset_custom_granularity_node = bool(
before_aggregation_time_spine_join_description
and before_aggregation_time_spine_join_description.custom_offset_window
and {spec.time_granularity_name for spec in queried_agg_time_dimension_specs}
== {before_aggregation_time_spine_join_description.custom_offset_window.granularity}
use_offset_custom_granularity_node = self._should_use_offset_custom_granularity_node(
join_description=before_aggregation_time_spine_join_description,
queried_agg_time_dimension_specs=queried_agg_time_dimension_specs,
)
if before_aggregation_time_spine_join_description and queried_agg_time_dimension_specs:
unaggregated_measure_node = self._build_time_spine_join_node_for_offset(
Expand Down Expand Up @@ -2013,7 +2027,7 @@ def build_custom_offset_time_spine_node(
required_time_spine_specs: Tuple[TimeDimensionSpec, ...],
use_offset_custom_granularity_node: bool,
) -> DataflowPlanNode:
"""Builds an OffsetByCustomGranularityNode used for custom offset windows."""
"""Builds a time spine node used for custom offset windows."""
time_spine_read_node = self._get_time_spine_read_node_for_custom_grain(offset_window.granularity)
if use_offset_custom_granularity_node:
return OffsetCustomGranularityNode.create(
Expand Down Expand Up @@ -2042,6 +2056,7 @@ def _sort_by_base_granularity(self, time_dimension_specs: Sequence[TimeDimension
),
)

# TODO: fix this TODO in other PR
# TODO: label this as more specific; not sure yet what it's specific to.
def _determine_time_spine_join_spec(
self, measure_properties: MeasureSpecProperties, required_time_spine_specs: Tuple[TimeDimensionSpec, ...]
Expand Down
9 changes: 6 additions & 3 deletions metricflow/dataflow/nodes/join_to_time_spine.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def create( # noqa: D102
offset_to_grain: Optional[TimeGranularity] = None,
) -> JoinToTimeSpineNode:
return JoinToTimeSpineNode(
# Note: parent nodes must be in the order of metric_source_node, time_spine_node
parent_nodes=(metric_source_node, time_spine_node),
metric_source_node=metric_source_node,
time_spine_node=time_spine_node,
Expand Down Expand Up @@ -107,10 +108,12 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
)

def with_new_parents(self, new_parent_nodes: Sequence[DataflowPlanNode]) -> JoinToTimeSpineNode: # noqa: D102
assert len(new_parent_nodes) == 1
assert len(new_parent_nodes) == 2, "JoinToTimeSpineNode must have exactly 2 parent nodes."
# Note: parent nodes remain in the order of metric_source_node, time_spine_node

return JoinToTimeSpineNode.create(
metric_source_node=self.metric_source_node,
time_spine_node=self.time_spine_node,
metric_source_node=new_parent_nodes[0],
time_spine_node=new_parent_nodes[1],
requested_agg_time_dimension_specs=self.requested_agg_time_dimension_specs,
standard_offset_window=self.standard_offset_window,
offset_to_grain=self.offset_to_grain,
Expand Down
18 changes: 14 additions & 4 deletions tests_metricflow/integration/query_output/test_offset_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@ def test_custom_offset_window_with_base_grain(
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Gives a side by side comparison of bookings and bookings_offset_one_martian_day."""
"""Gives a side by side comparison of the normal bookings metric with related custom offset metrics, using base grain."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=["bookings", "bookings_offset_one_martian_day"],
metric_names=[
"bookings",
"bookings_offset_one_martian_day",
"bookings_martian_day_over_martian_day",
"bookings_offset_one_martian_day_then_2_martian_days",
],
group_by_names=["metric_time__day", "metric_time__martian_day"],
order_by_names=["metric_time__day", "metric_time__martian_day"],
)
Expand Down Expand Up @@ -129,10 +134,15 @@ def test_custom_offset_window_with_matching_custom_grain(
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Gives a side by side comparison of bookings and bookings_offset_one_martian_day."""
"""Gives a side by side comparison of the normal bookings metric with related custom offset metrics, using matching grain."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=["bookings", "bookings_offset_one_martian_day"],
metric_names=[
"bookings",
"bookings_offset_one_martian_day",
"bookings_martian_day_over_martian_day",
# "bookings_offset_one_martian_day_then_2_martian_days",
],
group_by_names=["booking__ds__martian_day", "metric_time__martian_day"],
order_by_names=["booking__ds__martian_day", "metric_time__martian_day"],
)
Expand Down
151 changes: 148 additions & 3 deletions tests_metricflow/query_rendering/test_custom_granularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,33 @@ def test_custom_offset_window_with_granularity_and_date_part( # noqa: D103
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_where_filter_not_in_group_by( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by_names=("metric_time__day",),
where_constraints=[
PydanticWhereFilter(where_sql_template=("{{ TimeDimension('metric_time', 'martian_day') }} = '2020-01-01'"))
],
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_only_window_grain( # noqa: D103
request: FixtureRequest,
Expand All @@ -690,6 +717,124 @@ def test_custom_offset_window_with_only_window_grain( # noqa: D103
)


# TODO: add more tests
# - with where filter not included in group by
# - nested custom offset
@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_with_matching_grain_where_filter_not_in_group_by( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day",),
group_by_names=("booking__ds__martian_day",),
where_constraints=[
PydanticWhereFilter(where_sql_template=("{{ TimeDimension('metric_time', 'martian_day') }} = '2020-01-01'"))
],
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_time_over_time( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_martian_day_over_martian_day",),
group_by_names=("metric_time__week",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_custom_offset_window_time_over_time_with_matching_grain( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_martian_day_over_martian_day",),
group_by_names=("metric_time__martian_day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_nested_custom_offset_window( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day_then_2_martian_days",),
group_by_names=("metric_time__day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_nested_custom_offset_window_matching_grain( # noqa: D103
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
) -> None:
query_spec = query_parser.parse_and_validate_query(
metric_names=("bookings_offset_one_martian_day_then_2_martian_days",),
group_by_names=("metric_time__martian_day",),
).query_spec

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=query_spec,
)
Loading

0 comments on commit 9375eab

Please sign in to comment.