From fa99a25f6f0d0c22a99dff7d3e88306ed2afdfb8 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Nov 2024 14:57:30 -0500 Subject: [PATCH 01/12] update how we determine backfill completion to account for retried runs --- .../dagster/_core/execution/asset_backfill.py | 76 ++++++++++++++++--- .../execution_tests/test_asset_backfill.py | 6 +- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 7fe3026e41ae8..6f5abf66d4ee4 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -54,6 +54,7 @@ from dagster._core.storage.dagster_run import ( CANCELABLE_RUN_STATUSES, IN_PROGRESS_RUN_STATUSES, + NOT_FINISHED_STATUSES, DagsterRunStatus, RunsFilter, ) @@ -68,6 +69,9 @@ from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer +from python_modules.dagster.dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( + filter_runs_to_should_retry, +) if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -166,11 +170,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool): return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots) - def is_complete(self) -> bool: + def all_targeted_partitions_have_materialization_status(self) -> bool: """The asset backfill is complete when all runs to be requested have finished (success, failure, or cancellation). Since the AssetBackfillData object stores materialization states - per asset partition, the daemon continues to update the backfill data until all runs have - finished in order to display the final partition statuses in the UI. + per asset partition, we can use the materialization states and whether any runs for the backfill are + not finished to determine if the backfill is complete. We want the daemon to continue to update + the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ return ( ( @@ -927,6 +932,57 @@ def _check_validity_and_deserialize_asset_backfill_data( return asset_backfill_data +def backfill_is_complete( + backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance +): + """A backfill is complete when: + 1. all asset parttiions in the target subset have a materialization state (successful, failed, downstream of a failed partition). + 2. there are no in progress runs for the backfill. + 3. there are no failed runs that will result in an automatic retry. + + Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we + cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are + in progress. Condition 3 guards against a race condition where a failed run could be automatically retried + but it was not added into the queue in time to be caught by condition 2. + + Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the + daemon continues to update the backfill data until all runs have finished in order to display the + final partition statuses in the UI. + """ + # conditions are in order of least expensive to most expensive to check + if ( + # Condition 1 - all asset partitions in the target subset have a materialization state + backfill_data.all_targeted_partitions_have_materialization_status() + # Condtition 2 - there are no in progress runs for the backfill + and len( + instance.get_runs_ids( + filters=RunsFilter( + statuses=NOT_FINISHED_STATUSES, + tags={"tags": {BACKFILL_ID_TAG: backfill_id}}, + ), + limit=1, + ) + ) + == 0 + # Condition 3 - there are no failed runs that will be retried + and len( + filter_runs_to_should_retry( + instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], + ) + ), + instance, + instance.run_retries_max_retries, + ) + ) + == 0 + ): + return True + return False + + def execute_asset_backfill_iteration( backfill: "PartitionBackfill", logger: logging.Logger, @@ -1045,11 +1101,9 @@ def execute_asset_backfill_iteration( updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) - if updated_backfill_data.is_complete(): - # The asset backfill is complete when all runs to be requested have finished (success, - # failure, or cancellation). Since the AssetBackfillData object stores materialization states - # per asset partition, the daemon continues to update the backfill data until all runs have - # finished in order to display the final partition statuses in the UI. + if backfill_is_complete( + backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance + ): if ( updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets > 0 @@ -1216,7 +1270,7 @@ def get_canceling_asset_backfill_iteration_data( " AssetGraphSubset object" ) - failed_subset = AssetGraphSubset.from_asset_partition_set( + failed_subset, _ = AssetGraphSubset.from_asset_partition_set( set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)), asset_graph, ) @@ -1289,7 +1343,7 @@ def _get_failed_and_downstream_asset_partitions( asset_graph: RemoteWorkspaceAssetGraph, instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, -) -> AssetGraphSubset: +) -> Tuple[AssetGraphSubset, bool]: failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( asset_graph.bfs_filter_asset_partitions( instance_queryer, @@ -1714,7 +1768,7 @@ def _get_failed_asset_partitions( asset_graph: RemoteAssetGraph, ) -> Sequence[AssetKeyPartitionKey]: """Returns asset partitions that materializations were requested for as part of the backfill, but - will not be materialized. + were not successfully materialized. Includes canceled asset partitions. Implementation assumes that successful runs won't have any failed partitions. diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 7ad4fb7bf76d1..1fbc04c5a1d69 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -55,6 +55,7 @@ AssetBackfillData, AssetBackfillIterationResult, AssetBackfillStatus, + backfill_is_complete, execute_asset_backfill_iteration_inner, get_canceling_asset_backfill_iteration_data, ) @@ -618,7 +619,9 @@ def run_backfill_to_completion( evaluation_time=backfill_data.backfill_start_datetime, ) - while not backfill_data.is_complete(): + while not backfill_is_complete( + backfill_id=backfill_id, backfill_data=backfill_data, instance=instance + ): iteration_count += 1 result1 = execute_asset_backfill_iteration_consume_generator( @@ -628,7 +631,6 @@ def run_backfill_to_completion( instance=instance, ) - # iteration_count += 1 assert result1.backfill_data != backfill_data instance_queryer = _get_instance_queryer( From 382de7ec9243fcfa2192dfbf3507b255a5429b5a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 6 Nov 2024 15:00:18 -0500 Subject: [PATCH 02/12] formatting fix --- .../dagster/dagster/_core/execution/asset_backfill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 6f5abf66d4ee4..b1e480a6ef0ad 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -958,7 +958,7 @@ def backfill_is_complete( instance.get_runs_ids( filters=RunsFilter( statuses=NOT_FINISHED_STATUSES, - tags={"tags": {BACKFILL_ID_TAG: backfill_id}}, + tags={BACKFILL_ID_TAG: backfill_id}, ), limit=1, ) From d1f0cdb8bec8e72f8fe6bc8d4b6a698475bb2c2e Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Nov 2024 12:09:56 -0500 Subject: [PATCH 03/12] test --- .../dagster/_core/execution/asset_backfill.py | 37 +++++----- .../daemon_tests/test_backfill.py | 72 +++++++++++++++++++ 2 files changed, 92 insertions(+), 17 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index b1e480a6ef0ad..3974327094920 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -69,9 +69,6 @@ from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from python_modules.dagster.dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( - filter_runs_to_should_retry, -) if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -936,7 +933,7 @@ def backfill_is_complete( backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance ): """A backfill is complete when: - 1. all asset parttiions in the target subset have a materialization state (successful, failed, downstream of a failed partition). + 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). 2. there are no in progress runs for the backfill. 3. there are no failed runs that will result in an automatic retry. @@ -949,13 +946,17 @@ def backfill_is_complete( daemon continues to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ + from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( + filter_runs_to_should_retry, + ) + # conditions are in order of least expensive to most expensive to check if ( # Condition 1 - all asset partitions in the target subset have a materialization state backfill_data.all_targeted_partitions_have_materialization_status() # Condtition 2 - there are no in progress runs for the backfill and len( - instance.get_runs_ids( + instance.get_run_ids( filters=RunsFilter( statuses=NOT_FINISHED_STATUSES, tags={BACKFILL_ID_TAG: backfill_id}, @@ -966,15 +967,17 @@ def backfill_is_complete( == 0 # Condition 3 - there are no failed runs that will be retried and len( - filter_runs_to_should_retry( - instance.get_runs( - filters=RunsFilter( - tags={BACKFILL_ID_TAG: backfill_id}, - statuses=[DagsterRunStatus.FAILURE], - ) - ), - instance, - instance.run_retries_max_retries, + list( + filter_runs_to_should_retry( + instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], + ) + ), + instance, + instance.run_retries_max_retries, + ) ) ) == 0 @@ -1270,7 +1273,7 @@ def get_canceling_asset_backfill_iteration_data( " AssetGraphSubset object" ) - failed_subset, _ = AssetGraphSubset.from_asset_partition_set( + failed_subset = AssetGraphSubset.from_asset_partition_set( set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)), asset_graph, ) @@ -1343,7 +1346,7 @@ def _get_failed_and_downstream_asset_partitions( asset_graph: RemoteWorkspaceAssetGraph, instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, -) -> Tuple[AssetGraphSubset, bool]: +) -> AssetGraphSubset: failed_and_downstream_subset = AssetGraphSubset.from_asset_partition_set( asset_graph.bfs_filter_asset_partitions( instance_queryer, @@ -1768,7 +1771,7 @@ def _get_failed_asset_partitions( asset_graph: RemoteAssetGraph, ) -> Sequence[AssetKeyPartitionKey]: """Returns asset partitions that materializations were requested for as part of the backfill, but - were not successfully materialized. + will not be materialized. Includes canceled asset partitions. Implementation assumes that successful runs won't have any failed partitions. diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7610743f51cf2..dbbe1a16d8ae4 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -72,6 +72,7 @@ from dagster._core.test_utils import ( create_run_for_test, environ, + mark_run_successful, step_did_not_run, step_failed, step_succeeded, @@ -2789,3 +2790,74 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions( ) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_not_complete_until_retries_complete( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill("run_retries_backfill") + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_succeeded(instance, run, "reusable") + assert step_succeeded(instance, run, "bar") + + # simulate a retry of a run + run_to_retry = instance.get_runs()[0] + retried_run = create_run_for_test( + instance=instance, + job_name=run_to_retry.job_name, + tags=run_to_retry.tags, + root_run_id=run_to_retry.run_id, + parent_run_id=run_to_retry.run_id, + ) + + # since there is a run in progress, the backfill should not be marked as complete, even though + # all targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + mark_run_successful(instance, retried_run) + + retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] + assert retried_run.status == DagsterRunStatus.SUCCESS + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS From 2709f89d80155ab1452d4242e1592213e37a918a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Nov 2024 12:33:18 -0500 Subject: [PATCH 04/12] test for auto retries --- .../daemon_tests/test_backfill.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index dbbe1a16d8ae4..f29367a9c5283 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -67,7 +67,10 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + MAX_RETRIES_TAG, + PARENT_RUN_ID_TAG, PARTITION_NAME_TAG, + ROOT_RUN_ID_TAG, ) from dagster._core.test_utils import ( create_run_for_test, @@ -229,6 +232,11 @@ def bar(a1): return a1 +@asset(partitions_def=static_partitions) +def always_fails(): + raise Exception("I always fail") + + @asset( config_schema={"myparam": Field(str, description="YYYY-MM-DD")}, ) @@ -426,6 +434,7 @@ def the_repo(): ab1, ab2, define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions), + always_fails, # baz is a configurable asset which has no dependencies baz, asset_a, @@ -2861,3 +2870,70 @@ def test_asset_backfill_not_complete_until_retries_complete( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS + + +def test_asset_backfill_not_complete_if_automatic_retry_could_happen( + instance: DagsterInstance, + workspace_context: WorkspaceProcessContext, + remote_repo: RemoteRepository, +): + del remote_repo + backfill_id = "run_retries_backfill" + partition_keys = static_partitions.get_partition_keys() + asset_selection = [AssetKey("foo"), AssetKey("always_fails")] + instance.add_backfill( + PartitionBackfill.from_asset_partitions( + asset_graph=workspace_context.create_request_context().asset_graph, + backfill_id=backfill_id, + tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"}, + backfill_timestamp=get_current_timestamp(), + asset_selection=asset_selection, + partition_names=partition_keys, + dynamic_partitions_store=instance, + all_partitions=False, + title=None, + description=None, + ) + ) + assert instance.get_runs_count() == 0 + backfill = instance.get_backfill("run_retries_backfill") + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_start(instance, timeout=30) + assert instance.get_runs_count() == 3 + wait_for_all_runs_to_finish(instance, timeout=30) + + assert instance.get_runs_count() == 3 + runs = reversed(list(instance.get_runs())) + for run in runs: + assert run.tags[BACKFILL_ID_TAG] == backfill_id + assert run.tags["custom_tag_key"] == "custom_tag_value" + assert step_succeeded(instance, run, "foo") + assert step_failed(instance, run, "always_fails") + + # since the failed runs should have automatic retries launched for them, the backfill should not + # be considered complete, even though the targeted asset partitions have a completed state + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.REQUESTED + + # automatic retries wont get kicked off in test environment, so we manually create them and mark them completed + runs = instance.get_runs() + for run in runs: + retried_run = create_run_for_test( + instance=instance, + job_name=run.job_name, + tags={**run.tags, ROOT_RUN_ID_TAG: run.run_id, PARENT_RUN_ID_TAG: run.run_id}, + root_run_id=run.run_id, + parent_run_id=run.run_id, + ) + mark_run_successful(instance, retried_run) + + list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) + backfill = instance.get_backfill(backfill_id) + assert backfill + assert backfill.status == BulkActionStatus.COMPLETED_FAILED From 14d2db680bb5579eea726ca663c9d7f69ac9da98 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 11 Nov 2024 15:49:19 -0500 Subject: [PATCH 05/12] small fixes --- .../daemon_tests/test_backfill.py | 59 +++++++++++++------ 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index f29367a9c5283..c63d3b701c517 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -46,6 +46,8 @@ PartitionsSelector, ) from dagster._core.errors import DagsterUserCodeUnreachableError +from dagster._core.events import DagsterEvent, DagsterEventType +from dagster._core.events.log import EventLogEntry from dagster._core.execution.asset_backfill import ( AssetBackfillData, get_asset_backfill_run_chunk_size, @@ -68,14 +70,11 @@ ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, MAX_RETRIES_TAG, - PARENT_RUN_ID_TAG, PARTITION_NAME_TAG, - ROOT_RUN_ID_TAG, ) from dagster._core.test_utils import ( create_run_for_test, environ, - mark_run_successful, step_did_not_run, step_failed, step_succeeded, @@ -83,6 +82,9 @@ from dagster._core.types.loadable_target_origin import LoadableTargetOrigin from dagster._core.workspace.context import WorkspaceProcessContext from dagster._daemon import get_default_daemon_logger +from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( + consume_new_runs_for_automatic_reexecution, +) from dagster._daemon.backfill import execute_backfill_iteration from dagster._seven import IS_WINDOWS, get_system_temp_directory from dagster._time import get_current_timestamp @@ -232,6 +234,12 @@ def bar(a1): return a1 +@asset(partitions_def=static_partitions) +def pass_on_retry(context): + if context.run.parent_run_id is None: + raise Exception("I failed!") + + @asset(partitions_def=static_partitions) def always_fails(): raise Exception("I always fail") @@ -435,6 +443,7 @@ def the_repo(): ab2, define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions), always_fails, + pass_on_retry, # baz is a configurable asset which has no dependencies baz, asset_a, @@ -2825,7 +2834,7 @@ def test_asset_backfill_not_complete_until_retries_complete( ) ) assert instance.get_runs_count() == 0 - backfill = instance.get_backfill("run_retries_backfill") + backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.REQUESTED @@ -2859,9 +2868,24 @@ def test_asset_backfill_not_complete_until_retries_complete( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() assert backfill.status == BulkActionStatus.REQUESTED - mark_run_successful(instance, retried_run) + # manually mark the run as successful to show that the backfill will be marked as complete + # since there are no in prgress runs + instance.handle_new_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run.run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + event_type_value=DagsterEventType.RUN_SUCCESS.value, + job_name=run.job_name, + ), + ) + ) retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] assert retried_run.status == DagsterRunStatus.SUCCESS @@ -2880,7 +2904,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( del remote_repo backfill_id = "run_retries_backfill" partition_keys = static_partitions.get_partition_keys() - asset_selection = [AssetKey("foo"), AssetKey("always_fails")] + asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry")] instance.add_backfill( PartitionBackfill.from_asset_partitions( asset_graph=workspace_context.create_request_context().asset_graph, @@ -2896,7 +2920,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( ) ) assert instance.get_runs_count() == 0 - backfill = instance.get_backfill("run_retries_backfill") + backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.REQUESTED @@ -2919,19 +2943,18 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill + assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() assert backfill.status == BulkActionStatus.REQUESTED - # automatic retries wont get kicked off in test environment, so we manually create them and mark them completed - runs = instance.get_runs() - for run in runs: - retried_run = create_run_for_test( - instance=instance, - job_name=run.job_name, - tags={**run.tags, ROOT_RUN_ID_TAG: run.run_id, PARENT_RUN_ID_TAG: run.run_id}, - root_run_id=run.run_id, - parent_run_id=run.run_id, - ) - mark_run_successful(instance, retried_run) + # automatic retries wont get automatically run in test environment, so we run the function manually + runs = instance.get_run_records() + list( + consume_new_runs_for_automatic_reexecution( + workspace_process_context=workspace_context, run_records=runs + ) + ) + wait_for_all_runs_to_finish(instance, timeout=30) + assert instance.get_runs_count() == 6 list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) From 932a44734959208235f6ae62440c9cba8324a76b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 11 Nov 2024 16:24:00 -0500 Subject: [PATCH 06/12] fix --- .../dagster/dagster_tests/daemon_tests/test_backfill.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index c63d3b701c517..d44750ef843a3 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -2878,11 +2878,11 @@ def test_asset_backfill_not_complete_until_retries_complete( error_info=None, level="debug", user_message="", - run_id=run.run_id, + run_id=retried_run.run_id, timestamp=time.time(), dagster_event=DagsterEvent( event_type_value=DagsterEventType.RUN_SUCCESS.value, - job_name=run.job_name, + job_name=retried_run.job_name, ), ) ) From 6017c7f2ca1434f8613cb505a2007e3dce06364f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 11 Nov 2024 16:53:40 -0500 Subject: [PATCH 07/12] fix --- .../dagster/dagster_tests/daemon_tests/test_backfill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index d44750ef843a3..7002307fc4662 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -2936,7 +2936,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( assert run.tags[BACKFILL_ID_TAG] == backfill_id assert run.tags["custom_tag_key"] == "custom_tag_value" assert step_succeeded(instance, run, "foo") - assert step_failed(instance, run, "always_fails") + assert step_failed(instance, run, "pass_on_retry") # since the failed runs should have automatic retries launched for them, the backfill should not # be considered complete, even though the targeted asset partitions have a completed state From 247e4f481ed6b6067f14f48257e5d6f14acd992f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 12 Nov 2024 10:20:13 -0500 Subject: [PATCH 08/12] small --- .../dagster/dagster/_core/execution/asset_backfill.py | 2 +- .../dagster/dagster_tests/daemon_tests/test_backfill.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 3974327094920..ece46d54c98ba 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -954,7 +954,7 @@ def backfill_is_complete( if ( # Condition 1 - all asset partitions in the target subset have a materialization state backfill_data.all_targeted_partitions_have_materialization_status() - # Condtition 2 - there are no in progress runs for the backfill + # Condition 2 - there are no in progress runs for the backfill and len( instance.get_run_ids( filters=RunsFilter( diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index 7002307fc4662..7b659ef0c6442 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -2868,11 +2868,12 @@ def test_asset_backfill_not_complete_until_retries_complete( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill + assert backfill.asset_backfill_data assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() assert backfill.status == BulkActionStatus.REQUESTED # manually mark the run as successful to show that the backfill will be marked as complete - # since there are no in prgress runs + # since there are no in progress runs instance.handle_new_event( EventLogEntry( error_info=None, @@ -2943,6 +2944,7 @@ def test_asset_backfill_not_complete_if_automatic_retry_could_happen( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill(backfill_id) assert backfill + assert backfill.asset_backfill_data assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() assert backfill.status == BulkActionStatus.REQUESTED From ef8336f43f3dcd1133b56e9379396d64daf4aec1 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Nov 2024 13:52:05 -0500 Subject: [PATCH 09/12] update to use tags and add logging --- .../dagster/_core/execution/asset_backfill.py | 69 +++++++++++-------- .../execution_tests/test_asset_backfill.py | 5 +- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index ece46d54c98ba..4b927ea22a6c8 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -62,13 +62,16 @@ ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, BACKFILL_ID_TAG, + DID_RETRY_TAG, PARTITION_NAME_TAG, + WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id, toposort from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer +from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -930,12 +933,15 @@ def _check_validity_and_deserialize_asset_backfill_data( def backfill_is_complete( - backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance + backfill_id: str, + backfill_data: AssetBackfillData, + instance: DagsterInstance, + logger: logging.Logger, ): """A backfill is complete when: 1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). 2. there are no in progress runs for the backfill. - 3. there are no failed runs that will result in an automatic retry. + 3. there are no failed runs that will result in an automatic retry, but have not yet been retried. Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are @@ -946,16 +952,16 @@ def backfill_is_complete( daemon continues to update the backfill data until all runs have finished in order to display the final partition statuses in the UI. """ - from dagster._daemon.auto_run_reexecution.auto_run_reexecution import ( - filter_runs_to_should_retry, - ) - - # conditions are in order of least expensive to most expensive to check + # Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill + # is not complete + if not backfill_data.all_targeted_partitions_have_materialization_status(): + logger.info( + "Not all targeted asset partitions have a materialization status. Backfill is still in progress." + ) + return False + # Condition 2 - if there are in progress runs for the backfill, the backfill is not complete if ( - # Condition 1 - all asset partitions in the target subset have a materialization state - backfill_data.all_targeted_partitions_have_materialization_status() - # Condition 2 - there are no in progress runs for the backfill - and len( + len( instance.get_run_ids( filters=RunsFilter( statuses=NOT_FINISHED_STATUSES, @@ -964,26 +970,28 @@ def backfill_is_complete( limit=1, ) ) - == 0 - # Condition 3 - there are no failed runs that will be retried - and len( - list( - filter_runs_to_should_retry( - instance.get_runs( - filters=RunsFilter( - tags={BACKFILL_ID_TAG: backfill_id}, - statuses=[DagsterRunStatus.FAILURE], - ) - ), - instance, - instance.run_retries_max_retries, + > 0 + ): + logger.info("Backfill has in progress runs. Backfill is still in progress.") + return False + # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete + if any( + [ + get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) + and run.tags.get(DID_RETRY_TAG) is None + for run in instance.get_runs( + filters=RunsFilter( + tags={BACKFILL_ID_TAG: backfill_id}, + statuses=[DagsterRunStatus.FAILURE], ) ) - ) - == 0 + ] ): - return True - return False + logger.info( + "Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress." + ) + return False + return True def execute_asset_backfill_iteration( @@ -1105,7 +1113,10 @@ def execute_asset_backfill_iteration( updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) if backfill_is_complete( - backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance + backfill_id=backfill.backfill_id, + backfill_data=updated_backfill_data, + instance=instance, + logger=logger, ): if ( updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 1fbc04c5a1d69..e3ebe7a6ae057 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -620,7 +620,10 @@ def run_backfill_to_completion( ) while not backfill_is_complete( - backfill_id=backfill_id, backfill_data=backfill_data, instance=instance + backfill_id=backfill_id, + backfill_data=backfill_data, + instance=instance, + logger=logging.getLogger("fake_logger"), ): iteration_count += 1 From e9ec93113b50a36453a256040c4350d509b3ec0f Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Nov 2024 14:52:36 -0500 Subject: [PATCH 10/12] turn run retries on for backfill tests --- .../dagster/_core/execution/asset_backfill.py | 4 ++-- .../dagster_tests/daemon_tests/conftest.py | 24 ++++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 4b927ea22a6c8..0dba00d5635d5 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -61,8 +61,8 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, + AUTO_RETRY_RUN_ID, BACKFILL_ID_TAG, - DID_RETRY_TAG, PARTITION_NAME_TAG, WILL_RETRY_TAG, ) @@ -978,7 +978,7 @@ def backfill_is_complete( if any( [ get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) - and run.tags.get(DID_RETRY_TAG) is None + and run.tags.get(AUTO_RETRY_RUN_ID) is None for run in instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py index 9231dc882e573..edb4acfd36dbe 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py @@ -1,5 +1,6 @@ import os import sys +import tempfile from typing import Iterator, Optional, cast import pytest @@ -21,15 +22,22 @@ @pytest.fixture(name="instance_module_scoped", scope="module") def instance_module_scoped_fixture() -> Iterator[DagsterInstance]: - with instance_for_test( - overrides={ - "run_launcher": { - "module": "dagster._core.launcher.sync_in_memory_run_launcher", - "class": "SyncInMemoryRunLauncher", + with tempfile.TemporaryDirectory() as temp_dir: + with instance_for_test( + overrides={ + "run_launcher": { + "module": "dagster._core.launcher.sync_in_memory_run_launcher", + "class": "SyncInMemoryRunLauncher", + }, + "event_log_storage": { + "module": "dagster._core.storage.event_log", + "class": "ConsolidatedSqliteEventLogStorage", + "config": {"base_dir": temp_dir}, + }, + "run_retries": {"enabled": True}, } - } - ) as instance: - yield instance + ) as instance: + yield instance @pytest.fixture(name="instance", scope="function") From edfa1a38912f233cc7305b7434a05d36eeaf43c0 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Nov 2024 17:18:06 -0500 Subject: [PATCH 11/12] tag update --- .../dagster/dagster/_core/execution/asset_backfill.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 0dba00d5635d5..846d9dcc752aa 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -61,7 +61,7 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, - AUTO_RETRY_RUN_ID, + AUTO_RETRY_RUN_ID_TAG, BACKFILL_ID_TAG, PARTITION_NAME_TAG, WILL_RETRY_TAG, @@ -978,7 +978,7 @@ def backfill_is_complete( if any( [ get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) - and run.tags.get(AUTO_RETRY_RUN_ID) is None + and run.tags.get(AUTO_RETRY_RUN_ID_TAG) is None for run in instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, From 701562db5b21256ef6a409ce160ca9c7a1444cd6 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 4 Dec 2024 12:10:13 -0500 Subject: [PATCH 12/12] util fn --- .../dagster/_core/execution/asset_backfill.py | 6 +----- .../dagster/_core/storage/dagster_run.py | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 846d9dcc752aa..c8aa4b22336aa 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -61,17 +61,14 @@ from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, - AUTO_RETRY_RUN_ID_TAG, BACKFILL_ID_TAG, PARTITION_NAME_TAG, - WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id, toposort from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer -from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.execution.backfill import PartitionBackfill @@ -977,8 +974,7 @@ def backfill_is_complete( # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete if any( [ - get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False) - and run.tags.get(AUTO_RETRY_RUN_ID_TAG) is None + run.is_complete_and_waiting_to_retry for run in instance.get_runs( filters=RunsFilter( tags={BACKFILL_ID_TAG: backfill_id}, diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index 46f38e31a7d8a..5c948829638eb 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -24,6 +24,7 @@ from dagster._core.origin import JobPythonOrigin from dagster._core.storage.tags import ( ASSET_EVALUATION_ID_TAG, + AUTO_RETRY_RUN_ID_TAG, AUTOMATION_CONDITION_TAG, BACKFILL_ID_TAG, PARENT_RUN_ID_TAG, @@ -33,10 +34,12 @@ SCHEDULE_NAME_TAG, SENSOR_NAME_TAG, TICK_ID_TAG, + WILL_RETRY_TAG, ) from dagster._core.utils import make_new_run_id from dagster._record import IHaveNew, record_custom from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes +from dagster._utils.tags import get_boolean_tag_value if TYPE_CHECKING: from dagster._core.definitions.schedule_definition import ScheduleDefinition @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool: """bool: If this run was created from retrying another run from the point of failure.""" return self.tags.get(RESUME_RETRY_TAG) == "true" + @property + def is_complete_and_waiting_to_retry(self): + """Indicates if a run is waiting to be retried by the auto-reexecution system. + Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry), + 3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet. + Otherwise returns False. + """ + if self.status in NOT_FINISHED_STATUSES: + return False + if self.status != DagsterRunStatus.FAILURE: + return False + will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False) + retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None + if will_retry: + return retry_not_launched + + return False + @property def previous_run_id(self) -> Optional[str]: # Compat