Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backfill daemon run retries 1/n] update how we determine backfill completion to account for retried runs #25771

Merged
merged 12 commits into from
Dec 5, 2024
Prev Previous commit
Next Next commit
update to use tags and add logging
  • Loading branch information
jamiedemaria committed Dec 5, 2024
commit ef8336f43f3dcd1133b56e9379396d64daf4aec1
69 changes: 40 additions & 29 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
@@ -62,13 +62,16 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
DID_RETRY_TAG,
PARTITION_NAME_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id, toposort
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
@@ -930,12 +933,15 @@ def _check_validity_and_deserialize_asset_backfill_data(


def backfill_is_complete(
backfill_id: str, backfill_data: AssetBackfillData, instance: DagsterInstance
backfill_id: str,
backfill_data: AssetBackfillData,
instance: DagsterInstance,
logger: logging.Logger,
):
"""A backfill is complete when:
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry.
3. there are no failed runs that will result in an automatic retry, but have not yet been retried.

Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
@@ -946,16 +952,16 @@ def backfill_is_complete(
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
from dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
filter_runs_to_should_retry,
)

# conditions are in order of least expensive to most expensive to check
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill
# is not complete
if not backfill_data.all_targeted_partitions_have_materialization_status():
logger.info(
"Not all targeted asset partitions have a materialization status. Backfill is still in progress."
)
return False
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete
if (
# Condition 1 - all asset partitions in the target subset have a materialization state
backfill_data.all_targeted_partitions_have_materialization_status()
# Condition 2 - there are no in progress runs for the backfill
and len(
len(
instance.get_run_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
@@ -964,26 +970,28 @@ def backfill_is_complete(
limit=1,
)
)
== 0
# Condition 3 - there are no failed runs that will be retried
and len(
list(
filter_runs_to_should_retry(
instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
),
instance,
instance.run_retries_max_retries,
> 0
):
logger.info("Backfill has in progress runs. Backfill is still in progress.")
return False
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this still behave reasonably on old versions of user code that do not necessarily set WILL_RETRY_TAG? I think in that case we would just ignore this condition, right? (And potentially finish the backfill 'early', as we were doing before.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I made a bad assumption, but I figured that the version of the backfill daemon would be the same as the version of the auto-retry daemon. The auto-retry daemon will add the will_retry tag if it wasn't added when the run failure event was handled, which made me think we could rely on this being set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yes, in the case that the will_retry tag isn't getting added to runs, the runs will have is_complete_and_waiting_to_retry as False, so that would result in the backfill being considered complete.

if any(
[
get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False)
and run.tags.get(DID_RETRY_TAG) is None
for run in instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
)
)
== 0
]
):
return True
return False
logger.info(
"Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress."
)
return False
return True


def execute_asset_backfill_iteration(
@@ -1105,7 +1113,10 @@ def execute_asset_backfill_iteration(
updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if backfill_is_complete(
backfill_id=backfill.backfill_id, backfill_data=updated_backfill_data, instance=instance
backfill_id=backfill.backfill_id,
backfill_data=updated_backfill_data,
instance=instance,
logger=logger,
):
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
Original file line number Diff line number Diff line change
@@ -620,7 +620,10 @@ def run_backfill_to_completion(
)

while not backfill_is_complete(
backfill_id=backfill_id, backfill_data=backfill_data, instance=instance
backfill_id=backfill_id,
backfill_data=backfill_data,
instance=instance,
logger=logging.getLogger("fake_logger"),
):
iteration_count += 1