From c8018908ffc13c650fb1d0cac1d6427139f5d90c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 7 Nov 2024 12:10:40 -0500 Subject: [PATCH] add test utils to mark a run successful or failed --- .../dagster/dagster/_core/test_utils.py | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/test_utils.py b/python_modules/dagster/dagster/_core/test_utils.py index 87a02c9100b84..519ca365d6eb2 100644 --- a/python_modules/dagster/dagster/_core/test_utils.py +++ b/python_modules/dagster/dagster/_core/test_utils.py @@ -53,7 +53,8 @@ from dagster._core.definitions.source_asset import SourceAsset from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job from dagster._core.errors import DagsterUserCodeUnreachableError -from dagster._core.events import DagsterEvent +from dagster._core.events import DagsterEvent, DagsterEventType +from dagster._core.events.log import EventLogEntry from dagster._core.instance import DagsterInstance # test utils from separate light weight file since are exported top level @@ -216,6 +217,38 @@ def register_managed_run_for_test( ) +def mark_run_successful(instance: DagsterInstance, run: DagsterRun) -> None: + instance.handle_new_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run.run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + event_type_value=DagsterEventType.RUN_SUCCESS.value, + job_name=run.job_name, + ), + ) + ) + + +def mark_run_failed(instance: DagsterInstance, run: DagsterRun) -> None: + instance.handle_new_event( + EventLogEntry( + error_info=None, + level="debug", + user_message="", + run_id=run.run_id, + timestamp=time.time(), + dagster_event=DagsterEvent( + event_type_value=DagsterEventType.RUN_FAILURE.value, + job_name=run.job_name, + ), + ) + ) + + def wait_for_runs_to_finish( instance: DagsterInstance, timeout: float = 20, run_tags: Optional[Mapping[str, str]] = None ) -> None: