diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index 7fe3026e41ae8..c8aa4b22336aa 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -54,6 +54,7 @@
 from dagster._core.storage.dagster_run import (
     CANCELABLE_RUN_STATUSES,
     IN_PROGRESS_RUN_STATUSES,
+    NOT_FINISHED_STATUSES,
     DagsterRunStatus,
     RunsFilter,
 )
@@ -166,11 +167,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
     def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
         return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)
 
-    def is_complete(self) -> bool:
+    def all_targeted_partitions_have_materialization_status(self) -> bool:
         """The asset backfill is complete when all runs to be requested have finished (success,
         failure, or cancellation). Since the AssetBackfillData object stores materialization states
-        per asset partition, the daemon continues to update the backfill data until all runs have
-        finished in order to display the final partition statuses in the UI.
+        per asset partition, we can use the materialization states and whether any runs for the backfill are
+        not finished to determine if the backfill is complete. We want the daemon to continue to update
+        the backfill data until all runs have finished in order to display the final partition statuses in the UI.
         """
         return (
             (
@@ -927,6 +929,67 @@ def _check_validity_and_deserialize_asset_backfill_data(
     return asset_backfill_data
 
 
+def backfill_is_complete(
+    backfill_id: str,
+    backfill_data: AssetBackfillData,
+    instance: DagsterInstance,
+    logger: logging.Logger,
+):
+    """A backfill is complete when:
+    1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
+    2. there are no in progress runs for the backfill.
+    3. there are no failed runs that will result in an automatic retry, but have not yet been retried.
+
+    Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
+    cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
+    in progress. Condition 3 guards against a race condition where a failed run could be automatically retried
+    but it was not added into the queue in time to be caught by condition 2.
+
+    Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the
+    daemon continues to update the backfill data until all runs have finished in order to display the
+    final partition statuses in the UI.
+    """
+    # Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill
+    # is not complete
+    if not backfill_data.all_targeted_partitions_have_materialization_status():
+        logger.info(
+            "Not all targeted asset partitions have a materialization status. Backfill is still in progress."
+        )
+        return False
+    # Condition 2 - if there are in progress runs for the backfill, the backfill is not complete
+    if (
+        len(
+            instance.get_run_ids(
+                filters=RunsFilter(
+                    statuses=NOT_FINISHED_STATUSES,
+                    tags={BACKFILL_ID_TAG: backfill_id},
+                ),
+                limit=1,
+            )
+        )
+        > 0
+    ):
+        logger.info("Backfill has in progress runs. Backfill is still in progress.")
+        return False
+    # Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
+    if any(
+        [
+            run.is_complete_and_waiting_to_retry
+            for run in instance.get_runs(
+                filters=RunsFilter(
+                    tags={BACKFILL_ID_TAG: backfill_id},
+                    statuses=[DagsterRunStatus.FAILURE],
+                )
+            )
+        ]
+    ):
+        logger.info(
+            "Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress."
+        )
+        return False
+    return True
+
+
 def execute_asset_backfill_iteration(
     backfill: "PartitionBackfill",
     logger: logging.Logger,
@@ -1045,11 +1108,12 @@ def execute_asset_backfill_iteration(
 
         updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)
 
-        if updated_backfill_data.is_complete():
-            # The asset backfill is complete when all runs to be requested have finished (success,
-            # failure, or cancellation). Since the AssetBackfillData object stores materialization states
-            # per asset partition, the daemon continues to update the backfill data until all runs have
-            # finished in order to display the final partition statuses in the UI.
+        if backfill_is_complete(
+            backfill_id=backfill.backfill_id,
+            backfill_data=updated_backfill_data,
+            instance=instance,
+            logger=logger,
+        ):
             if (
                 updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
                 > 0
diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py
index 46f38e31a7d8a..5c948829638eb 100644
--- a/python_modules/dagster/dagster/_core/storage/dagster_run.py
+++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py
@@ -24,6 +24,7 @@
 from dagster._core.origin import JobPythonOrigin
 from dagster._core.storage.tags import (
     ASSET_EVALUATION_ID_TAG,
+    AUTO_RETRY_RUN_ID_TAG,
     AUTOMATION_CONDITION_TAG,
     BACKFILL_ID_TAG,
     PARENT_RUN_ID_TAG,
@@ -33,10 +34,12 @@
     SCHEDULE_NAME_TAG,
     SENSOR_NAME_TAG,
     TICK_ID_TAG,
+    WILL_RETRY_TAG,
 )
 from dagster._core.utils import make_new_run_id
 from dagster._record import IHaveNew, record_custom
 from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
+from dagster._utils.tags import get_boolean_tag_value
 
 if TYPE_CHECKING:
     from dagster._core.definitions.schedule_definition import ScheduleDefinition
@@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool:
         """bool: If this run was created from retrying another run from the point of failure."""
         return self.tags.get(RESUME_RETRY_TAG) == "true"
 
+    @property
+    def is_complete_and_waiting_to_retry(self):
+        """Indicates if a run is waiting to be retried by the auto-reexecution system.
+        Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry),
+        3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet.
+        Otherwise returns False.
+        """
+        if self.status in NOT_FINISHED_STATUSES:
+            return False
+        if self.status != DagsterRunStatus.FAILURE:
+            return False
+        will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False)
+        retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
+        if will_retry:
+            return retry_not_launched
+
+        return False
+
     @property
     def previous_run_id(self) -> Optional[str]:
         # Compat
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
index 7ad4fb7bf76d1..e3ebe7a6ae057 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py
@@ -55,6 +55,7 @@
     AssetBackfillData,
     AssetBackfillIterationResult,
     AssetBackfillStatus,
+    backfill_is_complete,
     execute_asset_backfill_iteration_inner,
     get_canceling_asset_backfill_iteration_data,
 )
@@ -618,7 +619,12 @@ def run_backfill_to_completion(
         evaluation_time=backfill_data.backfill_start_datetime,
     )
 
-    while not backfill_data.is_complete():
+    while not backfill_is_complete(
+        backfill_id=backfill_id,
+        backfill_data=backfill_data,
+        instance=instance,
+        logger=logging.getLogger("fake_logger"),
+    ):
         iteration_count += 1
 
         result1 = execute_asset_backfill_iteration_consume_generator(
@@ -628,7 +634,6 @@ def run_backfill_to_completion(
             instance=instance,
         )
 
-        # iteration_count += 1
         assert result1.backfill_data != backfill_data
 
         instance_queryer = _get_instance_queryer(
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
index 9231dc882e573..edb4acfd36dbe 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
@@ -1,5 +1,6 @@
 import os
 import sys
+import tempfile
 from typing import Iterator, Optional, cast
 
 import pytest
@@ -21,15 +22,22 @@
 
 @pytest.fixture(name="instance_module_scoped", scope="module")
 def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
-    with instance_for_test(
-        overrides={
-            "run_launcher": {
-                "module": "dagster._core.launcher.sync_in_memory_run_launcher",
-                "class": "SyncInMemoryRunLauncher",
+    with tempfile.TemporaryDirectory() as temp_dir:
+        with instance_for_test(
+            overrides={
+                "run_launcher": {
+                    "module": "dagster._core.launcher.sync_in_memory_run_launcher",
+                    "class": "SyncInMemoryRunLauncher",
+                },
+                "event_log_storage": {
+                    "module": "dagster._core.storage.event_log",
+                    "class": "ConsolidatedSqliteEventLogStorage",
+                    "config": {"base_dir": temp_dir},
+                },
+                "run_retries": {"enabled": True},
             }
-        }
-    ) as instance:
-        yield instance
+        ) as instance:
+            yield instance
 
 
 @pytest.fixture(name="instance", scope="function")
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index 7610743f51cf2..7b659ef0c6442 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -46,6 +46,8 @@
     PartitionsSelector,
 )
 from dagster._core.errors import DagsterUserCodeUnreachableError
+from dagster._core.events import DagsterEvent, DagsterEventType
+from dagster._core.events.log import EventLogEntry
 from dagster._core.execution.asset_backfill import (
     AssetBackfillData,
     get_asset_backfill_run_chunk_size,
@@ -67,6 +69,7 @@
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
     BACKFILL_ID_TAG,
+    MAX_RETRIES_TAG,
     PARTITION_NAME_TAG,
 )
 from dagster._core.test_utils import (
@@ -79,6 +82,9 @@
 from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster._core.workspace.context import WorkspaceProcessContext
 from dagster._daemon import get_default_daemon_logger
+from dagster._daemon.auto_run_reexecution.auto_run_reexecution import (
+    consume_new_runs_for_automatic_reexecution,
+)
 from dagster._daemon.backfill import execute_backfill_iteration
 from dagster._seven import IS_WINDOWS, get_system_temp_directory
 from dagster._time import get_current_timestamp
@@ -228,6 +234,17 @@ def bar(a1):
     return a1
 
 
+@asset(partitions_def=static_partitions)
+def pass_on_retry(context):
+    if context.run.parent_run_id is None:
+        raise Exception("I failed!")
+
+
+@asset(partitions_def=static_partitions)
+def always_fails():
+    raise Exception("I always fail")
+
+
 @asset(
     config_schema={"myparam": Field(str, description="YYYY-MM-DD")},
 )
@@ -425,6 +442,8 @@ def the_repo():
         ab1,
         ab2,
         define_asset_job("twisted_asset_mess", selection="*b2", partitions_def=static_partitions),
+        always_fails,
+        pass_on_retry,
         # baz is a configurable asset which has no dependencies
         baz,
         asset_a,
@@ -2789,3 +2808,157 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
     )
     assert backfill
     assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+
+def test_asset_backfill_not_complete_until_retries_complete(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    remote_repo: RemoteRepository,
+):
+    del remote_repo
+    backfill_id = "run_retries_backfill"
+    partition_keys = static_partitions.get_partition_keys()
+    asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
+    instance.add_backfill(
+        PartitionBackfill.from_asset_partitions(
+            asset_graph=workspace_context.create_request_context().asset_graph,
+            backfill_id=backfill_id,
+            tags={"custom_tag_key": "custom_tag_value"},
+            backfill_timestamp=get_current_timestamp(),
+            asset_selection=asset_selection,
+            partition_names=partition_keys,
+            dynamic_partitions_store=instance,
+            all_partitions=False,
+            title=None,
+            description=None,
+        )
+    )
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_start(instance, timeout=30)
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_finish(instance, timeout=30)
+
+    assert instance.get_runs_count() == 3
+    runs = reversed(list(instance.get_runs()))
+    for run in runs:
+        assert run.tags[BACKFILL_ID_TAG] == backfill_id
+        assert run.tags["custom_tag_key"] == "custom_tag_value"
+        assert step_succeeded(instance, run, "foo")
+        assert step_succeeded(instance, run, "reusable")
+        assert step_succeeded(instance, run, "bar")
+
+    # simulate a retry of a run
+    run_to_retry = instance.get_runs()[0]
+    retried_run = create_run_for_test(
+        instance=instance,
+        job_name=run_to_retry.job_name,
+        tags=run_to_retry.tags,
+        root_run_id=run_to_retry.run_id,
+        parent_run_id=run_to_retry.run_id,
+    )
+
+    # since there is a run in progress, the backfill should not be marked as complete, even though
+    # all targeted asset partitions have a completed state
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.asset_backfill_data
+    assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    # manually mark the run as successful to show that the backfill will be marked as complete
+    # since there are no in progress runs
+    instance.handle_new_event(
+        EventLogEntry(
+            error_info=None,
+            level="debug",
+            user_message="",
+            run_id=retried_run.run_id,
+            timestamp=time.time(),
+            dagster_event=DagsterEvent(
+                event_type_value=DagsterEventType.RUN_SUCCESS.value,
+                job_name=retried_run.job_name,
+            ),
+        )
+    )
+
+    retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
+    assert retried_run.status == DagsterRunStatus.SUCCESS
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+
+def test_asset_backfill_not_complete_if_automatic_retry_could_happen(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    remote_repo: RemoteRepository,
+):
+    del remote_repo
+    backfill_id = "run_retries_backfill"
+    partition_keys = static_partitions.get_partition_keys()
+    asset_selection = [AssetKey("foo"), AssetKey("pass_on_retry")]
+    instance.add_backfill(
+        PartitionBackfill.from_asset_partitions(
+            asset_graph=workspace_context.create_request_context().asset_graph,
+            backfill_id=backfill_id,
+            tags={"custom_tag_key": "custom_tag_value", MAX_RETRIES_TAG: "2"},
+            backfill_timestamp=get_current_timestamp(),
+            asset_selection=asset_selection,
+            partition_names=partition_keys,
+            dynamic_partitions_store=instance,
+            all_partitions=False,
+            title=None,
+            description=None,
+        )
+    )
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_start(instance, timeout=30)
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_finish(instance, timeout=30)
+
+    assert instance.get_runs_count() == 3
+    runs = reversed(list(instance.get_runs()))
+    for run in runs:
+        assert run.tags[BACKFILL_ID_TAG] == backfill_id
+        assert run.tags["custom_tag_key"] == "custom_tag_value"
+        assert step_succeeded(instance, run, "foo")
+        assert step_failed(instance, run, "pass_on_retry")
+
+    # since the failed runs should have automatic retries launched for them, the backfill should not
+    # be considered complete, even though the targeted asset partitions have a completed state
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.asset_backfill_data
+    assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    # automatic retries wont get automatically run in test environment, so we run the function manually
+    runs = instance.get_run_records()
+    list(
+        consume_new_runs_for_automatic_reexecution(
+            workspace_process_context=workspace_context, run_records=runs
+        )
+    )
+    wait_for_all_runs_to_finish(instance, timeout=30)
+    assert instance.get_runs_count() == 6
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_FAILED