From d307d1d35066ea06fa826680d96e491c7f40fa09 Mon Sep 17 00:00:00 2001
From: prha <1040172+prha@users.noreply.github.com>
Date: Thu, 22 Aug 2024 09:24:33 -0700
Subject: [PATCH] merge captured log manager / compute log manager (#23531)

## Summary & Motivation

Previously we asserted that the `compute_log_manager` conformed to the
`CapturedLogManager` interface. This PR explicitly moves the
`CapturedLogManager` interface into the `ComputeLogManager` abstract class.

This is purely a simplification, both cognitively and for typing: call sites
can access `instance.compute_log_manager` everywhere without first narrowing
its type. There is an open question of whether this should be treated as a
breaking change (for `1.9`).

## How I Tested These Changes

BK

---
 .../sphinx/sections/api/apidocs/internals.rst |   4 -
 .../implementation/execution/__init__.py      |   8 +-
 .../implementation/fetch_logs.py              |   5 -
 .../dagster_graphql/schema/backfill.py        |   5 +-
 .../schema/logs/compute_logs.py               |   2 +-
 .../dagster_graphql/schema/util.py            |   5 +-
 .../dagster_webserver/webserver.py            |   2 +-
 .../_core/definitions/instigation_logger.py   |   9 +-
 .../dagster/dagster/_core/events/__init__.py  |   2 +-
 .../_core/execution/plan/execute_plan.py      |   2 -
 .../dagster/_core/instance/__init__.py        |   4 +-
 .../_core/storage/captured_log_manager.py     | 358 -----------------
 .../cloud_storage_compute_log_manager.py      |   7 +-
 .../_core/storage/compute_log_manager.py      | 359 +++++++++++++++++-
 .../storage/local_compute_log_manager.py      |   7 +-
 .../_core/storage/noop_compute_log_manager.py |   7 +-
 .../test_run_status_sensors.py                |   2 -
 .../daemon_sensor_tests/test_sensor_run.py    |   2 -
 .../daemon_tests/test_backfill.py             |   4 +-
 .../engine_tests/test_multiprocessing.py      |   2 -
 .../logging_tests/test_stdout.py              |   6 +-
 .../test_captured_log_manager.py              |  14 +-
 .../storage_tests/test_compute_log_manager.py | 225 ++++++++++-
 ..._log_manager.py => compute_log_manager.py} |  80 ++--
 .../test_tags.py                              |   2 +-
 .../dagster_aws/s3/compute_log_manager.py     |   3 +-
 .../s3_tests/test_compute_log_manager.py      |  10 +-
 .../blob_tests/test_compute_log_manager.py    |  14 +-
 .../dagster_gcp/gcs/compute_log_manager.py    |   3 +-
 .../gcs_tests/test_compute_log_manager.py     |  10 +-
 30 files changed, 659 insertions(+), 504 deletions(-)
 delete mode 100644 python_modules/dagster/dagster/_core/storage/captured_log_manager.py
 rename python_modules/dagster/dagster_tests/storage_tests/utils/{captured_log_manager.py => compute_log_manager.py} (70%)

diff --git a/docs/sphinx/sections/api/apidocs/internals.rst b/docs/sphinx/sections/api/apidocs/internals.rst
index 545b2504ac582..8e2ed341a734c 100644
--- a/docs/sphinx/sections/api/apidocs/internals.rst
+++ b/docs/sphinx/sections/api/apidocs/internals.rst
@@ -148,10 +148,6 @@ See also: :py:class:`dagster_postgres.PostgresEventLogStorage` and :py:class:`da
 Compute log manager
 -------------------
 
-.. currentmodule:: dagster._core.storage.captured_log_manager
-
-.. autoclass:: CapturedLogManager
-
 .. currentmodule:: dagster._core.storage.compute_log_manager
 
 .. 
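For illustration, a minimal sketch of the call-site simplification this change enables; the helper `read_captured_logs` and its log key are hypothetical, not code from this diff:

```python
from typing import Sequence

from dagster import DagsterInstance


def read_captured_logs(instance: DagsterInstance, log_key: Sequence[str]) -> bytes:
    # Before this patch, call sites had to narrow the type at runtime:
    #     assert isinstance(instance.compute_log_manager, CapturedLogManager)
    # Now ComputeLogManager itself declares the full captured-log interface,
    # so the manager can be used directly.
    log_data = instance.compute_log_manager.get_log_data(log_key)
    return log_data.stdout or b""
```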
autoclass:: ComputeLogManager diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 2a2758612796c..dd8ced30960a3 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -24,7 +24,6 @@ EngineEventData, ) from dagster._core.instance import DagsterInstance -from dagster._core.storage.captured_log_manager import CapturedLogData, CapturedLogManager from dagster._core.storage.dagster_run import CANCELABLE_RUN_STATUSES from dagster._core.workspace.permissions import Permissions from dagster._utils.error import serializable_error_info_from_exc_info @@ -46,6 +45,8 @@ ) if TYPE_CHECKING: + from dagster._core.storage.compute_log_manager import CapturedLogData + from dagster_graphql.schema.logs.compute_logs import GrapheneCapturedLogs from dagster_graphql.schema.pipelines.subscription import ( GraphenePipelineRunLogsSubscriptionFailure, @@ -315,13 +316,10 @@ async def gen_captured_log_data( instance = graphene_info.context.instance compute_log_manager = instance.compute_log_manager - if not isinstance(compute_log_manager, CapturedLogManager): - return - subscription = compute_log_manager.subscribe(log_key, cursor) loop = asyncio.get_event_loop() - queue: asyncio.Queue[CapturedLogData] = asyncio.Queue() + queue: asyncio.Queue["CapturedLogData"] = asyncio.Queue() def _enqueue(new_event): loop.call_soon_threadsafe(queue.put_nowait, new_event) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_logs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_logs.py index 104874b70e931..4c388db3b623b 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_logs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_logs.py @@ -1,7 +1,5 @@ from typing import TYPE_CHECKING, Sequence -from dagster._core.storage.captured_log_manager import CapturedLogManager - from dagster_graphql.schema.util import ResolveInfo if TYPE_CHECKING: @@ -13,9 +11,6 @@ def get_captured_log_metadata( ) -> "GrapheneCapturedLogsMetadata": from ..schema.logs.compute_logs import GrapheneCapturedLogsMetadata - if not isinstance(graphene_info.context.instance.compute_log_manager, CapturedLogManager): - return GrapheneCapturedLogsMetadata() - metadata = graphene_info.context.instance.compute_log_manager.get_log_metadata(log_key) return GrapheneCapturedLogsMetadata( stdoutDownloadUrl=metadata.stdout_download_url, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 18a6b13245a49..e03727e9f13aa 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -18,7 +18,7 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.external import ExternalPartitionSet -from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, @@ 
-651,9 +651,6 @@ def resolve_logEvents(self, graphene_info: ResolveInfo, cursor: Optional[str] = instance = graphene_info.context.instance - if not isinstance(instance.compute_log_manager, CapturedLogManager): - return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False) - if not instance.backfill_log_storage_enabled(): return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py index d5761862aaaee..f1655545295e0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py @@ -1,5 +1,5 @@ import graphene -from dagster._core.storage.captured_log_manager import CapturedLogData +from dagster._core.storage.compute_log_manager import CapturedLogData from dagster_graphql.schema.util import non_null_list diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/util.py b/python_modules/dagster-graphql/dagster_graphql/schema/util.py index 1b2f335d82788..b1c6f8eb1dc22 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/util.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/util.py @@ -1,7 +1,7 @@ from typing import cast import graphene -from dagster._core.storage.captured_log_manager import CapturedLogManager +from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.workspace.context import WorkspaceRequestContext @@ -15,6 +15,5 @@ def non_null_list(of_type): return graphene.NonNull(graphene.List(graphene.NonNull(of_type))) -def get_compute_log_manager(graphene_info: ResolveInfo) -> CapturedLogManager: - assert isinstance(graphene_info.context.instance.compute_log_manager, CapturedLogManager) +def get_compute_log_manager(graphene_info: ResolveInfo) -> ComputeLogManager: return graphene_info.context.instance.compute_log_manager diff --git a/python_modules/dagster-webserver/dagster_webserver/webserver.py b/python_modules/dagster-webserver/dagster_webserver/webserver.py index 81522bfb6114d..eef61c6a209d2 100644 --- a/python_modules/dagster-webserver/dagster_webserver/webserver.py +++ b/python_modules/dagster-webserver/dagster_webserver/webserver.py @@ -9,8 +9,8 @@ from dagster import __version__ as dagster_version from dagster._annotations import deprecated from dagster._core.debug import DebugRunPayload -from dagster._core.storage.captured_log_manager import ComputeIOType from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager from dagster._core.storage.runs.sql_run_storage import SqlRunStorage from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext diff --git a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py index 6b4996aeb7c21..e54cfe59db44e 100644 --- a/python_modules/dagster/dagster/_core/definitions/instigation_logger.py +++ b/python_modules/dagster/dagster/_core/definitions/instigation_logger.py @@ -8,7 +8,7 @@ from dagster import _seven from dagster._core.instance import DagsterInstance from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR -from 
dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogManager from dagster._core.utils import coerce_valid_log_level from dagster._utils.log import create_console_logger @@ -47,7 +47,7 @@ def emit(self, record: logging.LogRecord): class CapturedLogHandler(logging.Handler): - """Persist logging records to an IO stream controlled by the CapturedLogManager.""" + """Persist logging records to an IO stream controlled by the ComputeLogManager.""" def __init__(self, write_stream: IO): self._write_stream = write_stream @@ -105,7 +105,7 @@ def __enter__(self): if ( self._log_key and self._instance - and isinstance(self._instance.compute_log_manager, CapturedLogManager) + and isinstance(self._instance.compute_log_manager, ComputeLogManager) ): write_stream = self._exit_stack.enter_context( self._instance.compute_log_manager.open_log_stream( @@ -147,9 +147,6 @@ def has_captured_logs(self): def get_instigation_log_records( instance: DagsterInstance, log_key: Sequence[str] ) -> Sequence[Mapping[str, Any]]: - if not isinstance(instance.compute_log_manager, CapturedLogManager): - return [] - log_data = instance.compute_log_manager.get_log_data(log_key) raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else "" diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index 442f3e1bcea7a..0ec330df6f9f9 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -48,7 +48,7 @@ from dagster._core.execution.plan.objects import StepFailureData, StepRetryData, StepSuccessData from dagster._core.execution.plan.outputs import StepOutputData from dagster._core.log_manager import DagsterLogManager -from dagster._core.storage.captured_log_manager import CapturedLogContext +from dagster._core.storage.compute_log_manager import CapturedLogContext from dagster._core.storage.dagster_run import DagsterRunStatus from dagster._serdes import NamedTupleSerializer, whitelist_for_serdes from dagster._serdes.serdes import EnumSerializer, UnpackContext, is_whitelisted_for_serdes_object diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py b/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py index 44091d2cd4eb3..d6ae3d168d91f 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_plan.py @@ -25,7 +25,6 @@ step_failure_event_from_exc_info, ) from dagster._core.execution.plan.plan import ExecutionPlan -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info @@ -37,7 +36,6 @@ def inner_plan_execution_iterator( check.inst_param(job_context, "pipeline_context", PlanExecutionContext) check.inst_param(execution_plan, "execution_plan", ExecutionPlan) compute_log_manager = job_context.instance.compute_log_manager - assert isinstance(compute_log_manager, CapturedLogManager) step_keys = [step.key for step in execution_plan.get_steps_to_execute_in_topo_order()] with execution_plan.start( retry_mode=job_context.retry_mode, diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index ab9a797043fdf..7f36a30e5809a 100644 --- 
a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -409,7 +409,7 @@ def __init__( from dagster._core.run_coordinator import RunCoordinator from dagster._core.scheduler import Scheduler from dagster._core.secrets import SecretsLoader - from dagster._core.storage.captured_log_manager import CapturedLogManager + from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.storage.event_log import EventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import RunStorage @@ -427,7 +427,7 @@ def __init__( if compute_log_manager: self._compute_log_manager = check.inst_param( - compute_log_manager, "compute_log_manager", CapturedLogManager + compute_log_manager, "compute_log_manager", ComputeLogManager ) self._compute_log_manager.register_instance(self) else: diff --git a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py b/python_modules/dagster/dagster/_core/storage/captured_log_manager.py deleted file mode 100644 index 39eaac5694351..0000000000000 --- a/python_modules/dagster/dagster/_core/storage/captured_log_manager.py +++ /dev/null @@ -1,358 +0,0 @@ -import os -from abc import ABC, abstractmethod -from contextlib import contextmanager -from enum import Enum -from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple - -from typing_extensions import Final, Self - -import dagster._check as check -from dagster._core.captured_log_api import LogLineCursor - -MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB - - -class ComputeIOType(Enum): - STDOUT = "stdout" - STDERR = "stderr" - - -class CapturedLogContext( - NamedTuple( - "_CapturedLogContext", - [ - ("log_key", Sequence[str]), - ("external_url", Optional[str]), - ("external_stdout_url", Optional[str]), - ("external_stderr_url", Optional[str]), - ], - ) -): - """Object representing the context in which logs are captured. Can be used by external logging - sidecar implementations to point the Dagster UI to an external url to view compute logs instead of a - Dagster-managed location. - """ - - def __new__( - cls, - log_key: Sequence[str], - external_stdout_url: Optional[str] = None, - external_stderr_url: Optional[str] = None, - external_url: Optional[str] = None, - ): - if external_url and (external_stdout_url or external_stderr_url): - check.failed( - "Cannot specify both `external_url` and one of" - " `external_stdout_url`/`external_stderr_url`" - ) - - return super(CapturedLogContext, cls).__new__( - cls, - log_key, - external_stdout_url=external_stdout_url, - external_stderr_url=external_stderr_url, - external_url=external_url, - ) - - -class CapturedLogData( - NamedTuple( - "_CapturedLogData", - [ - ("log_key", Sequence[str]), - ("stdout", Optional[bytes]), - ("stderr", Optional[bytes]), - ("cursor", Optional[str]), - ], - ) -): - """Object representing captured log data, either a partial chunk of the log data or the full - capture. Contains the raw bytes and optionally the cursor offset for the partial chunk. 
- """ - - def __new__( - cls, - log_key: Sequence[str], - stdout: Optional[bytes] = None, - stderr: Optional[bytes] = None, - cursor: Optional[str] = None, - ): - return super(CapturedLogData, cls).__new__(cls, log_key, stdout, stderr, cursor) - - -class CapturedLogMetadata( - NamedTuple( - "_CapturedLogMetadata", - [ - ("stdout_location", Optional[str]), - ("stderr_location", Optional[str]), - ("stdout_download_url", Optional[str]), - ("stderr_download_url", Optional[str]), - ], - ) -): - """Object representing metadata info for the captured log data, containing a display string for - the location of the log data and a URL for direct download of the captured log data. - """ - - def __new__( - cls, - stdout_location: Optional[str] = None, - stderr_location: Optional[str] = None, - stdout_download_url: Optional[str] = None, - stderr_download_url: Optional[str] = None, - ): - return super(CapturedLogMetadata, cls).__new__( - cls, - stdout_location=stdout_location, - stderr_location=stderr_location, - stdout_download_url=stdout_download_url, - stderr_download_url=stderr_download_url, - ) - - -class CapturedLogSubscription: - def __init__( - self, manager: "CapturedLogManager", log_key: Sequence[str], cursor: Optional[str] - ): - self._manager = manager - self._log_key = log_key - self._cursor = cursor - self._observer: Optional[Callable[[CapturedLogData], None]] = None - self.is_complete = False - - def __call__(self, observer: Optional[Callable[[CapturedLogData], None]]) -> Self: - self._observer = observer - self.fetch() - if self._manager.is_capture_complete(self._log_key): - self.complete() - return self - - @property - def log_key(self) -> Sequence[str]: - return self._log_key - - def dispose(self) -> None: - self._observer = None - self._manager.unsubscribe(self) - - def fetch(self) -> None: - if not self._observer: - return - - should_fetch = True - while should_fetch: - log_data = self._manager.get_log_data( - self._log_key, - self._cursor, - max_bytes=MAX_BYTES_CHUNK_READ, - ) - if not self._cursor or log_data.cursor != self._cursor: - self._observer(log_data) - self._cursor = log_data.cursor - should_fetch = _has_max_data(log_data.stdout) or _has_max_data(log_data.stderr) - - def complete(self) -> None: - self.is_complete = True - - -def _has_max_data(chunk: Optional[bytes]) -> bool: - # function is used as predicate but does not actually return a boolean - return chunk and len(chunk) >= MAX_BYTES_CHUNK_READ # type: ignore - - -class CapturedLogManager(ABC): - """Abstract base class for capturing the unstructured logs (stdout/stderr) in the current - process, stored / retrieved with a provided log_key. - """ - - @abstractmethod - @contextmanager - def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]: - """Context manager for capturing the stdout/stderr within the current process, and persisting - it under the given log key. - - Args: - log_key (List[String]): The log key identifying the captured logs - """ - - @abstractmethod - @contextmanager - def open_log_stream( - self, log_key: Sequence[str], io_type: ComputeIOType - ) -> Iterator[Optional[IO[bytes]]]: - """Context manager for providing an IO stream that enables the caller to write to a log stream - managed by the captured log manager, to be read later using the given log key. 
- - Args: - log_key (List[String]): The log key identifying the captured logs - """ - - @abstractmethod - def is_capture_complete(self, log_key: Sequence[str]) -> bool: - """Flag indicating when the log capture for a given log key has completed. - - Args: - log_key (List[String]): The log key identifying the captured logs - - Returns: - Boolean - """ - - @abstractmethod - def get_log_data( - self, - log_key: Sequence[str], - cursor: Optional[str] = None, - max_bytes: Optional[int] = None, - ) -> CapturedLogData: - """Returns a chunk of the captured stdout logs for a given log key. - - Args: - log_key (List[String]): The log key identifying the captured logs - cursor (Optional[str]): A cursor representing the position of the log chunk to fetch - max_bytes (Optional[int]): A limit on the size of the log chunk to fetch - - Returns: - CapturedLogData - """ - - @abstractmethod - def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata: - """Returns the metadata of the captured logs for a given log key, including - displayable information on where the logs are persisted. - - Args: - log_key (List[String]): The log key identifying the captured logs - - Returns: - CapturedLogMetadata - """ - - @abstractmethod - def delete_logs( - self, log_key: Optional[Sequence[str]] = None, prefix: Optional[Sequence[str]] = None - ) -> None: - """Deletes the captured logs for a given log key. - - Args: - log_key(Optional[List[String]]): The log key of the logs to delete - prefix(Optional[List[String]]): The prefix of the log keys to delete - """ - - @abstractmethod - def subscribe( - self, log_key: Sequence[str], cursor: Optional[str] = None - ) -> CapturedLogSubscription: - """Registers an observable object for log data. - - Args: - log_key (List[String]): The log key identifying the captured logs - cursor (Optional[String]): The string cursor marking the position within the log stream - Returns: - CapturedLogSubscription - """ - - def unsubscribe(self, subscription: CapturedLogSubscription) -> None: - """Deregisters an observable object from receiving log updates. - - Args: - subscription (CapturedLogSubscription): subscription object which manages when to send - back data to the subscriber - """ - pass - - def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]: - """Legacy adapter to translate run_id/key to captured log manager-based log_key.""" - return [run_id, "compute_logs", step_key] - - def get_log_keys_for_log_key_prefix( - self, log_key_prefix: Sequence[str], io_type: ComputeIOType - ) -> Sequence[Sequence[str]]: - """Returns the logs keys for a given log key prefix. This is determined by looking at the - directory defined by the log key prefix and creating a log_key for each file in the directory. 
- """ - raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix") - - def _get_log_lines_for_log_key( - self, log_key: Sequence[str], io_type: ComputeIOType - ) -> Sequence[str]: - """For a log key, gets the corresponding file, and splits the file into lines.""" - log_data = self.get_log_data(log_key) - if io_type == ComputeIOType.STDOUT: - raw_logs = log_data.stdout.decode("utf-8") if log_data.stdout else "" - else: - raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else "" - log_lines = raw_logs.split("\n") - - return log_lines - - def read_log_lines_for_log_key_prefix( - self, log_key_prefix: Sequence[str], cursor: Optional[str], io_type: ComputeIOType - ) -> Tuple[Sequence[str], Optional[LogLineCursor]]: - """For a given directory defined by log_key_prefix that contains files, read the logs from the files - as if they are a single continuous file. Reads env var DAGSTER_CAPTURED_LOG_CHUNK_SIZE lines at a time. - Returns the lines read and the next cursor. - - Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately. - If has_more_now if False, the process producing logs could still be running and dump more logs into the - directory at a later time. - """ - num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", "1000")) - # find all of the log_keys to read from and sort them in the order to be read - log_keys = sorted( - self.get_log_keys_for_log_key_prefix(log_key_prefix, io_type=io_type), - key=lambda x: "/".join(x), - ) - if len(log_keys) == 0: - return [], None - - log_cursor = LogLineCursor.parse(cursor) if cursor else None - if log_cursor is None: - log_key_to_fetch_idx = 0 - line_cursor = 0 - else: - log_key_to_fetch_idx = log_keys.index(log_cursor.log_key) - line_cursor = log_cursor.line - - if line_cursor == -1: - # line_cursor for -1 means the entirety of the file has been read, but the next file - # didn't exist yet. So we see if a new file has been added. 
- # if the next file doesn't exist yet, return - if log_key_to_fetch_idx + 1 >= len(log_keys): - return [], log_cursor - log_key_to_fetch_idx += 1 - line_cursor = 0 - - log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx], io_type=io_type) - records = [] - has_more = True - - while len(records) < num_lines: - remaining_log_lines = log_lines[line_cursor:] - remaining_lines_to_fetch = num_lines - len(records) - if remaining_lines_to_fetch < len(remaining_log_lines): - records.extend(remaining_log_lines[:remaining_lines_to_fetch]) - line_cursor += remaining_lines_to_fetch - else: - records.extend(remaining_log_lines) - line_cursor = -1 - - if line_cursor == -1: - # we've read the entirety of the file, update the cursor - if log_key_to_fetch_idx + 1 >= len(log_keys): - # no more files to process - has_more = False - break - log_key_to_fetch_idx += 1 - line_cursor = 0 - if len(records) < num_lines: - # we still need more records, so fetch the next file - log_lines = self._get_log_lines_for_log_key( - log_keys[log_key_to_fetch_idx], io_type=io_type - ) - - new_cursor = LogLineCursor( - log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more - ) - return records, new_cursor diff --git a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py index 4f1d3d01c661d..be04ce4a7eeb8 100644 --- a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py @@ -8,15 +8,14 @@ from typing import IO, Iterator, Optional, Sequence from dagster._core.instance import T_DagsterInstance -from dagster._core.storage.captured_log_manager import ( +from dagster._core.storage.compute_log_manager import ( CapturedLogContext, CapturedLogData, - CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, ComputeIOType, + ComputeLogManager, ) -from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, @@ -25,7 +24,7 @@ SUBSCRIPTION_POLLING_INTERVAL = 5 -class CloudStorageComputeLogManager(CapturedLogManager, ComputeLogManager[T_DagsterInstance]): +class CloudStorageComputeLogManager(ComputeLogManager[T_DagsterInstance]): """Abstract class that uses the local compute log manager to capture logs and stores them in remote cloud storage. 
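For context, a minimal end-to-end sketch of the merged interface that these subclasses implement, modeled on the `test_capture` case later in this patch; the temporary directory and log key are illustrative, and the sketch assumes a platform where Dagster's IO stream redirection is enabled:

```python
import sys
import tempfile

from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager

with tempfile.TemporaryDirectory() as tmpdir:
    manager = LocalComputeLogManager(tmpdir)
    log_key = ["arbitrary", "log", "key"]

    # capture_logs redirects this process's stdout/stderr for the duration
    # of the block and persists both streams under the given log key
    with manager.capture_logs(log_key) as context:
        print("HELLO WORLD")
        print("HELLO ERROR", file=sys.stderr)

    assert context.log_key == log_key
    assert manager.is_capture_complete(log_key)

    log_data = manager.get_log_data(log_key)
    assert log_data.stdout == b"HELLO WORLD\n"
    assert log_data.stderr == b"HELLO ERROR\n"
```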
""" diff --git a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/compute_log_manager.py index 2a7b17077c8cd..9a3a8979e8410 100644 --- a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/compute_log_manager.py @@ -1,14 +1,365 @@ -from abc import ABC +import os +from abc import ABC, abstractmethod +from contextlib import contextmanager +from enum import Enum +from typing import IO, Callable, Generator, Iterator, NamedTuple, Optional, Sequence, Tuple +from typing_extensions import Final, Self + +import dagster._check as check +from dagster._core.captured_log_api import LogLineCursor from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance -from .captured_log_manager import ComputeIOType as ComputeIOType +MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB + + +class ComputeIOType(Enum): + STDOUT = "stdout" + STDERR = "stderr" + + +class CapturedLogContext( + NamedTuple( + "_CapturedLogContext", + [ + ("log_key", Sequence[str]), + ("external_url", Optional[str]), + ("external_stdout_url", Optional[str]), + ("external_stderr_url", Optional[str]), + ], + ) +): + """Object representing the context in which logs are captured. Can be used by external logging + sidecar implementations to point the Dagster UI to an external url to view compute logs instead of a + Dagster-managed location. + """ + + def __new__( + cls, + log_key: Sequence[str], + external_stdout_url: Optional[str] = None, + external_stderr_url: Optional[str] = None, + external_url: Optional[str] = None, + ): + if external_url and (external_stdout_url or external_stderr_url): + check.failed( + "Cannot specify both `external_url` and one of" + " `external_stdout_url`/`external_stderr_url`" + ) + + return super(CapturedLogContext, cls).__new__( + cls, + log_key, + external_stdout_url=external_stdout_url, + external_stderr_url=external_stderr_url, + external_url=external_url, + ) + + +class CapturedLogData( + NamedTuple( + "_CapturedLogData", + [ + ("log_key", Sequence[str]), + ("stdout", Optional[bytes]), + ("stderr", Optional[bytes]), + ("cursor", Optional[str]), + ], + ) +): + """Object representing captured log data, either a partial chunk of the log data or the full + capture. Contains the raw bytes and optionally the cursor offset for the partial chunk. + """ + + def __new__( + cls, + log_key: Sequence[str], + stdout: Optional[bytes] = None, + stderr: Optional[bytes] = None, + cursor: Optional[str] = None, + ): + return super(CapturedLogData, cls).__new__(cls, log_key, stdout, stderr, cursor) + + +class CapturedLogMetadata( + NamedTuple( + "_CapturedLogMetadata", + [ + ("stdout_location", Optional[str]), + ("stderr_location", Optional[str]), + ("stdout_download_url", Optional[str]), + ("stderr_download_url", Optional[str]), + ], + ) +): + """Object representing metadata info for the captured log data, containing a display string for + the location of the log data and a URL for direct download of the captured log data. 
+ """ + + def __new__( + cls, + stdout_location: Optional[str] = None, + stderr_location: Optional[str] = None, + stdout_download_url: Optional[str] = None, + stderr_download_url: Optional[str] = None, + ): + return super(CapturedLogMetadata, cls).__new__( + cls, + stdout_location=stdout_location, + stderr_location=stderr_location, + stdout_download_url=stdout_download_url, + stderr_download_url=stderr_download_url, + ) + + +class CapturedLogSubscription: + def __init__( + self, + manager: "ComputeLogManager[T_DagsterInstance]", + log_key: Sequence[str], + cursor: Optional[str], + ): + self._manager = manager + self._log_key = log_key + self._cursor = cursor + self._observer: Optional[Callable[[CapturedLogData], None]] = None + self.is_complete = False + + def __call__(self, observer: Optional[Callable[[CapturedLogData], None]]) -> Self: + self._observer = observer + self.fetch() + if self._manager.is_capture_complete(self._log_key): + self.complete() + return self + + @property + def log_key(self) -> Sequence[str]: + return self._log_key + + def dispose(self) -> None: + self._observer = None + self._manager.unsubscribe(self) + + def fetch(self) -> None: + if not self._observer: + return + + should_fetch = True + while should_fetch: + log_data = self._manager.get_log_data( + self._log_key, + self._cursor, + max_bytes=MAX_BYTES_CHUNK_READ, + ) + if not self._cursor or log_data.cursor != self._cursor: + self._observer(log_data) + self._cursor = log_data.cursor + should_fetch = _has_max_data(log_data.stdout) or _has_max_data(log_data.stderr) + + def complete(self) -> None: + self.is_complete = True + + +def _has_max_data(chunk: Optional[bytes]) -> bool: + # function is used as predicate but does not actually return a boolean + return chunk and len(chunk) >= MAX_BYTES_CHUNK_READ # type: ignore class ComputeLogManager(ABC, MayHaveInstanceWeakref[T_DagsterInstance]): - """Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute - steps of pipeline solids. + """Abstract base class for capturing the unstructured logs (stdout/stderr) in the current + process, stored / retrieved with a provided log_key. """ + @abstractmethod + @contextmanager + def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]: + """Context manager for capturing the stdout/stderr within the current process, and persisting + it under the given log key. + + Args: + log_key (List[String]): The log key identifying the captured logs + """ + + @abstractmethod + @contextmanager + def open_log_stream( + self, log_key: Sequence[str], io_type: ComputeIOType + ) -> Iterator[Optional[IO[bytes]]]: + """Context manager for providing an IO stream that enables the caller to write to a log stream + managed by the captured log manager, to be read later using the given log key. + + Args: + log_key (List[String]): The log key identifying the captured logs + """ + + @abstractmethod + def is_capture_complete(self, log_key: Sequence[str]) -> bool: + """Flag indicating when the log capture for a given log key has completed. + + Args: + log_key (List[String]): The log key identifying the captured logs + + Returns: + Boolean + """ + + @abstractmethod + def get_log_data( + self, + log_key: Sequence[str], + cursor: Optional[str] = None, + max_bytes: Optional[int] = None, + ) -> CapturedLogData: + """Returns a chunk of the captured stdout logs for a given log key. 
+
+        Args:
+            log_key (List[String]): The log key identifying the captured logs
+            cursor (Optional[str]): A cursor representing the position of the log chunk to fetch
+            max_bytes (Optional[int]): A limit on the size of the log chunk to fetch
+
+        Returns:
+            CapturedLogData
+        """
+
+    @abstractmethod
+    def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata:
+        """Returns the metadata of the captured logs for a given log key, including
+        displayable information on where the logs are persisted.
+
+        Args:
+            log_key (List[String]): The log key identifying the captured logs
+
+        Returns:
+            CapturedLogMetadata
+        """
+
+    @abstractmethod
+    def delete_logs(
+        self, log_key: Optional[Sequence[str]] = None, prefix: Optional[Sequence[str]] = None
+    ) -> None:
+        """Deletes the captured logs for a given log key.
+
+        Args:
+            log_key(Optional[List[String]]): The log key of the logs to delete
+            prefix(Optional[List[String]]): The prefix of the log keys to delete
+        """
+
+    @abstractmethod
+    def subscribe(
+        self, log_key: Sequence[str], cursor: Optional[str] = None
+    ) -> CapturedLogSubscription:
+        """Registers an observable object for log data.
+
+        Args:
+            log_key (List[String]): The log key identifying the captured logs
+            cursor (Optional[String]): The string cursor marking the position within the log stream
+        Returns:
+            CapturedLogSubscription
+        """
+
+    def unsubscribe(self, subscription: CapturedLogSubscription) -> None:
+        """Deregisters an observable object from receiving log updates.
+
+        Args:
+            subscription (CapturedLogSubscription): subscription object which manages when to send
+                back data to the subscriber
+        """
+        pass
+
     def dispose(self):
         pass
+
+    def build_log_key_for_run(self, run_id: str, step_key: str) -> Sequence[str]:
+        """Legacy adapter to translate run_id/key to captured log manager-based log_key."""
+        return [run_id, "compute_logs", step_key]
+
+    def get_log_keys_for_log_key_prefix(
+        self, log_key_prefix: Sequence[str], io_type: ComputeIOType
+    ) -> Sequence[Sequence[str]]:
+        """Returns the log keys for a given log key prefix. This is determined by looking at the
+        directory defined by the log key prefix and creating a log_key for each file in the directory.
+        """
+        raise NotImplementedError("Must implement get_log_keys_for_log_key_prefix")
+
+    def _get_log_lines_for_log_key(
+        self, log_key: Sequence[str], io_type: ComputeIOType
+    ) -> Sequence[str]:
+        """For a log key, gets the corresponding file, and splits the file into lines."""
+        log_data = self.get_log_data(log_key)
+        if io_type == ComputeIOType.STDOUT:
+            raw_logs = log_data.stdout.decode("utf-8") if log_data.stdout else ""
+        else:
+            raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else ""
+        log_lines = raw_logs.split("\n")
+
+        return log_lines
+
+    def read_log_lines_for_log_key_prefix(
+        self, log_key_prefix: Sequence[str], cursor: Optional[str], io_type: ComputeIOType
+    ) -> Tuple[Sequence[str], Optional[LogLineCursor]]:
+        """For a given directory defined by log_key_prefix that contains files, read the logs from the files
+        as if they are a single continuous file. Reads env var DAGSTER_CAPTURED_LOG_CHUNK_SIZE lines at a time.
+        Returns the lines read and the next cursor.
+
+        Note that the has_more_now attribute of the cursor indicates if there are more logs that can be read immediately.
+        If has_more_now is False, the process producing logs could still be running and dump more logs into the
+        directory at a later time.
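A usage sketch of the cursor protocol described above, modeled on the `read_log_lines_for_log_key_prefix` tests later in this patch; the directory, prefix, chunk size, and polling loop are illustrative, not part of the diff:

```python
import os
import tempfile

from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager

os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100"  # lines per read

with tempfile.TemporaryDirectory() as tmpdir:
    manager = LocalComputeLogManager(tmpdir)
    log_key_prefix = ["backfill", "some_backfill_id"]

    cursor = None
    while True:
        lines, next_cursor = manager.read_log_lines_for_log_key_prefix(
            log_key_prefix, cursor=cursor, io_type=ComputeIOType.STDERR
        )
        for line in lines:
            print(line)
        if next_cursor is None or not next_cursor.has_more_now:
            # the producer may still append files later; a caller can re-poll
            # with next_cursor.to_string() to resume where this read stopped
            break
        cursor = next_cursor.to_string()
```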
+ """ + num_lines = int(os.getenv("DAGSTER_CAPTURED_LOG_CHUNK_SIZE", "1000")) + # find all of the log_keys to read from and sort them in the order to be read + log_keys = sorted( + self.get_log_keys_for_log_key_prefix(log_key_prefix, io_type=io_type), + key=lambda x: "/".join(x), + ) + if len(log_keys) == 0: + return [], None + + log_cursor = LogLineCursor.parse(cursor) if cursor else None + if log_cursor is None: + log_key_to_fetch_idx = 0 + line_cursor = 0 + else: + log_key_to_fetch_idx = log_keys.index(log_cursor.log_key) + line_cursor = log_cursor.line + + if line_cursor == -1: + # line_cursor for -1 means the entirety of the file has been read, but the next file + # didn't exist yet. So we see if a new file has been added. + # if the next file doesn't exist yet, return + if log_key_to_fetch_idx + 1 >= len(log_keys): + return [], log_cursor + log_key_to_fetch_idx += 1 + line_cursor = 0 + + log_lines = self._get_log_lines_for_log_key(log_keys[log_key_to_fetch_idx], io_type=io_type) + records = [] + has_more = True + + while len(records) < num_lines: + remaining_log_lines = log_lines[line_cursor:] + remaining_lines_to_fetch = num_lines - len(records) + if remaining_lines_to_fetch < len(remaining_log_lines): + records.extend(remaining_log_lines[:remaining_lines_to_fetch]) + line_cursor += remaining_lines_to_fetch + else: + records.extend(remaining_log_lines) + line_cursor = -1 + + if line_cursor == -1: + # we've read the entirety of the file, update the cursor + if log_key_to_fetch_idx + 1 >= len(log_keys): + # no more files to process + has_more = False + break + log_key_to_fetch_idx += 1 + line_cursor = 0 + if len(records) < num_lines: + # we still need more records, so fetch the next file + log_lines = self._get_log_lines_for_log_key( + log_keys[log_key_to_fetch_idx], io_type=io_type + ) + + new_cursor = LogLineCursor( + log_key=log_keys[log_key_to_fetch_idx], line=line_cursor, has_more_now=has_more + ) + return records, new_cursor diff --git a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py index c9aab7a4d01e4..761dd7a92d819 100644 --- a/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/local_compute_log_manager.py @@ -23,15 +23,14 @@ from dagster._utils import ensure_dir, ensure_file, touch_file from dagster._utils.security import non_secure_md5_hash_str -from .captured_log_manager import ( +from .compute_log_manager import ( CapturedLogContext, CapturedLogData, - CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, ComputeIOType, + ComputeLogManager, ) -from .compute_log_manager import ComputeLogManager DEFAULT_WATCHDOG_POLLING_TIMEOUT: Final = 2.5 @@ -43,7 +42,7 @@ MAX_FILENAME_LENGTH: Final = 255 -class LocalComputeLogManager(CapturedLogManager, ComputeLogManager, ConfigurableClass): +class LocalComputeLogManager(ComputeLogManager, ConfigurableClass): """Stores copies of stdout & stderr for each compute step locally on disk.""" def __init__( diff --git a/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py index 173857583e13d..5903356a70c6f 100644 --- a/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/noop_compute_log_manager.py @@ -4,19 +4,18 @@ from typing_extensions import Self import dagster._check as check -from 
dagster._core.storage.captured_log_manager import ( +from dagster._core.storage.compute_log_manager import ( CapturedLogContext, CapturedLogData, - CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, ComputeIOType, + ComputeLogManager, ) -from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._serdes import ConfigurableClass, ConfigurableClassData -class NoOpComputeLogManager(CapturedLogManager, ComputeLogManager, ConfigurableClass): +class NoOpComputeLogManager(ComputeLogManager, ConfigurableClass): """When enabled for a Dagster instance, stdout and stderr will not be available for any step.""" def __init__(self, inst_data: Optional[ConfigurableClassData] = None): diff --git a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py index 6be4ab2c593f9..9fed514cf7a2f 100644 --- a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py +++ b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_run_status_sensors.py @@ -19,7 +19,6 @@ from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR from dagster._core.remote_representation import CodeLocation, ExternalRepository from dagster._core.scheduler.instigation import SensorInstigatorData, TickStatus -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.test_utils import ( create_test_daemon_workspace_context, environ, @@ -1747,5 +1746,4 @@ def test_logging_run_status_sensor( assert records record = records[0] assert record[LOG_RECORD_METADATA_ATTR]["orig_message"] == f"run succeeded: {run.run_id}" - assert isinstance(instance.compute_log_manager, CapturedLogManager) instance.compute_log_manager.delete_logs(log_key=tick.log_key) diff --git a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py index cdbc51515353a..884a2eae00633 100644 --- a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py +++ b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py @@ -79,7 +79,6 @@ SensorInstigatorData, TickStatus, ) -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.test_utils import ( BlockingThreadPoolExecutor, create_test_daemon_workspace_context, @@ -3055,7 +3054,6 @@ def test_sensor_logging_on_tick_failure( assert len(records) == 1 assert records[0][LOG_RECORD_METADATA_ATTR]["orig_message"] == "hello hello" - assert isinstance(instance.compute_log_manager, CapturedLogManager) instance.compute_log_manager.delete_logs(log_key=tick.log_key) diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index cd1823910ca86..171ae60134a75 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -56,7 +56,7 @@ InProcessCodeLocationOrigin, RemoteRepositoryOrigin, ) -from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import ( IN_PROGRESS_RUN_STATUSES, DagsterRun, @@ -2639,8 +2639,6 @@ def override_backfill_storage_setting(self): cm = instance.compute_log_manager - assert isinstance(cm, CapturedLogManager) - 
logs, cursor = cm.read_log_lines_for_log_key_prefix( ["backfill", backfill.backfill_id], cursor=None, io_type=ComputeIOType.STDERR ) diff --git a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py index 60d57a9117344..38f060af391fd 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py +++ b/python_modules/dagster/dagster_tests/execution_tests/engine_tests/test_multiprocessing.py @@ -24,7 +24,6 @@ from dagster._core.execution import execution_result from dagster._core.execution.api import execute_job from dagster._core.instance import DagsterInstance -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.mem_io_manager import mem_io_manager from dagster._core.test_utils import instance_for_test from dagster._utils import safe_tempfile_path, segfault @@ -424,7 +423,6 @@ def test_crash_multiprocessing(): if event.event_type == DagsterEventType.LOGS_CAPTURED ] event = capture_events[0] - assert isinstance(instance.compute_log_manager, CapturedLogManager) log_key = instance.compute_log_manager.build_log_key_for_run( result.run_id, event.logs_captured_data.file_key ) diff --git a/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py b/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py index 94d1453ac7f7e..1437d145cb004 100644 --- a/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py +++ b/python_modules/dagster/dagster_tests/logging_tests/test_stdout.py @@ -15,7 +15,7 @@ from dagster._core.execution.compute_logs import should_disable_io_stream_redirect from dagster._core.instance import DagsterInstance from dagster._core.instance.ref import InstanceRef -from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.dagster_run import DagsterRun from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, @@ -121,10 +121,9 @@ def test_compute_log_to_disk_multiprocess(): @pytest.mark.skipif( should_disable_io_stream_redirect(), reason="compute logs disabled for win / py3.6+" ) -def test_captured_log_manager(): +def test_compute_log_manager(): with instance_for_test() as instance: manager = instance.compute_log_manager - assert isinstance(manager, CapturedLogManager) spew_job = define_job() result = spew_job.execute_in_process(instance=instance) @@ -249,7 +248,6 @@ def execute_inner(step_key: str, dagster_run: DagsterRun, instance_ref: Instance def inner_step(instance: DagsterInstance, dagster_run: DagsterRun, step_key: str) -> None: - assert isinstance(instance.compute_log_manager, CapturedLogManager) log_key = [dagster_run.run_id, "compute_logs", step_key] with instance.compute_log_manager.capture_logs(log_key): time.sleep(0.1) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py index 2b82898f27c60..2b02df51b9711 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_captured_log_manager.py @@ -7,7 +7,7 @@ import pytest from dagster import job, op from dagster._core.events import DagsterEventType -from dagster._core.storage.captured_log_manager import CapturedLogContext, 
ComputeIOType +from dagster._core.storage.compute_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager from dagster._core.test_utils import instance_for_test @@ -15,7 +15,7 @@ from dagster._time import get_current_datetime from typing_extensions import Self -from .utils.captured_log_manager import TestCapturedLogManager +from .utils.compute_log_manager import TestComputeLogManager def test_compute_log_manager_instance(): @@ -24,11 +24,11 @@ def test_compute_log_manager_instance(): assert instance.compute_log_manager._instance # noqa: SLF001 -class TestLocalCapturedLogManager(TestCapturedLogManager): +class TestLocalComputeLogManager(TestComputeLogManager): __test__ = True - @pytest.fixture(name="captured_log_manager") - def captured_log_manager(self): + @pytest.fixture(name="compute_log_manager") + def compute_log_manager(self): with tempfile.TemporaryDirectory() as tmpdir_path: return LocalComputeLogManager(tmpdir_path) @@ -56,7 +56,7 @@ def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, ) -def test_external_captured_log_manager(): +def test_external_compute_log_manager(): @op def my_op(): print("hello out") # noqa: T201 @@ -69,7 +69,7 @@ def my_job(): with instance_for_test( overrides={ "compute_logs": { - "module": "dagster_tests.storage_tests.test_captured_log_manager", + "module": "dagster_tests.storage_tests.test_compute_log_manager", "class": "ExternalTestComputeLogManager", }, }, diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py index a416d9e93424b..53c878db2d70b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_compute_log_manager.py @@ -7,28 +7,23 @@ from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher import DefaultRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.captured_log_manager import ( +from dagster._core.storage.compute_log_manager import ( CapturedLogContext, CapturedLogData, - CapturedLogManager, CapturedLogMetadata, CapturedLogSubscription, ComputeIOType, + ComputeLogManager, ) -from dagster._core.storage.compute_log_manager import ComputeLogManager from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import SqliteRunStorage from dagster._core.test_utils import environ, instance_for_test +from .utils.compute_log_manager import TestComputeLogManager -def test_compute_log_manager_instance(): - with instance_for_test() as instance: - assert instance.compute_log_manager - assert instance.compute_log_manager._instance # noqa: SLF001 - -class BrokenCapturedLogManager(CapturedLogManager, ComputeLogManager): +class BrokenComputeLogManager(ComputeLogManager): def __init__(self, fail_on_setup=False, fail_on_teardown=False): self._fail_on_setup = check.opt_bool_param(fail_on_setup, "fail_on_setup") self._fail_on_teardown = check.opt_bool_param(fail_on_teardown, "fail_on_teardown") @@ -76,7 +71,7 @@ def unsubscribe(self, subscription: CapturedLogSubscription): @contextmanager -def broken_captured_log_manager_instance(fail_on_setup=False, 
fail_on_teardown=False): +def broken_compute_log_manager_instance(fail_on_setup=False, fail_on_teardown=False): with tempfile.TemporaryDirectory() as temp_dir: with environ({"DAGSTER_HOME": temp_dir}): yield DagsterInstance( @@ -84,7 +79,7 @@ def broken_captured_log_manager_instance(fail_on_setup=False, fail_on_teardown=F local_artifact_storage=LocalArtifactStorage(temp_dir), run_storage=SqliteRunStorage.from_local(temp_dir), event_storage=SqliteEventLogStorage(temp_dir), - compute_log_manager=BrokenCapturedLogManager( + compute_log_manager=BrokenComputeLogManager( fail_on_setup=fail_on_setup, fail_on_teardown=fail_on_teardown ), run_coordinator=DefaultRunCoordinator(), @@ -136,7 +131,7 @@ def boo_job(): def test_broken_compute_log_manager(): - with broken_captured_log_manager_instance(fail_on_setup=True) as instance: + with broken_compute_log_manager_instance(fail_on_setup=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_setup_exception(yay_result) @@ -145,7 +140,7 @@ def test_broken_compute_log_manager(): assert not boo_result.success assert _has_setup_exception(boo_result) - with broken_captured_log_manager_instance(fail_on_teardown=True) as instance: + with broken_compute_log_manager_instance(fail_on_teardown=True) as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert _has_teardown_exception(yay_result) @@ -155,7 +150,7 @@ def test_broken_compute_log_manager(): assert not boo_result.success assert _has_teardown_exception(boo_result) - with broken_captured_log_manager_instance() as instance: + with broken_compute_log_manager_instance() as instance: yay_result = yay_job.execute_in_process(instance=instance) assert yay_result.success assert not _has_setup_exception(yay_result) @@ -165,3 +160,205 @@ def test_broken_compute_log_manager(): assert not boo_result.success assert not _has_setup_exception(boo_result) assert not _has_teardown_exception(boo_result) + + +import os +import sys +from contextlib import contextmanager +from typing import Any, Generator, Mapping, Sequence + +import pytest +from dagster import job, op +from dagster._core.events import DagsterEventType +from dagster._core.storage.compute_log_manager import CapturedLogContext, ComputeIOType +from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager +from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager +from dagster._serdes import ConfigurableClassData +from dagster._time import get_current_datetime +from typing_extensions import Self + + +def test_compute_log_manager_instance(): + with instance_for_test() as instance: + assert instance.compute_log_manager + assert instance.compute_log_manager._instance # noqa: SLF001 + + +class TestLocalComputeLogManager(TestComputeLogManager): + __test__ = True + + @pytest.fixture(name="compute_log_manager") + def compute_log_manager(self): + with tempfile.TemporaryDirectory() as tmpdir_path: + return LocalComputeLogManager(tmpdir_path) + + +class ExternalTestComputeLogManager(NoOpComputeLogManager): + """Test compute log manager that does not actually capture logs, but generates an external url + to be shown within the Dagster UI. 
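To make the external-URL contract concrete, a small sketch exercising `CapturedLogContext` as defined earlier in this diff; the URLs and log key are placeholders:

```python
from dagster._core.storage.compute_log_manager import CapturedLogContext

# Per-stream external URLs may be supplied together...
ctx = CapturedLogContext(
    log_key=["run_id", "compute_logs", "my_step"],
    external_stdout_url="https://logs.example.com/stdout",
    external_stderr_url="https://logs.example.com/stderr",
)
assert ctx.external_url is None

# ...but combining the aggregate external_url with a per-stream URL fails
# the check in CapturedLogContext.__new__
try:
    CapturedLogContext(
        log_key=["run_id", "compute_logs", "my_step"],
        external_url="https://logs.example.com/all",
        external_stdout_url="https://logs.example.com/stdout",
    )
except Exception:  # dagster._check raises a check error here
    print("external_url cannot be combined with per-stream URLs")
```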
+ """ + + @classmethod + def from_config_value( + cls, inst_data: ConfigurableClassData, config_value: Mapping[str, Any] + ) -> Self: + return cls(inst_data=inst_data, **config_value) + + def enabled(self, _dagster_run, _step_key): + return True + + @contextmanager + def capture_logs(self, log_key: Sequence[str]) -> Generator[CapturedLogContext, None, None]: + yield CapturedLogContext( + log_key=log_key, + external_stdout_url="https://fake.com/stdout", + external_stderr_url="https://fake.com/stderr", + ) + + +def test_external_compute_log_manager(): + @op + def my_op(): + print("hello out") # noqa: T201 + print("hello error", file=sys.stderr) # noqa: T201 + + @job + def my_job(): + my_op() + + with instance_for_test( + overrides={ + "compute_logs": { + "module": "dagster_tests.storage_tests.test_compute_log_manager", + "class": "ExternalTestComputeLogManager", + }, + }, + ) as instance: + result = my_job.execute_in_process(instance=instance) + assert result.success + assert result.run_id + captured_log_entries = instance.all_logs( + result.run_id, of_type=DagsterEventType.LOGS_CAPTURED + ) + assert len(captured_log_entries) == 1 + entry = captured_log_entries[0] + assert ( + entry.dagster_event.logs_captured_data.external_stdout_url == "https://fake.com/stdout" + ) + assert ( + entry.dagster_event.logs_captured_data.external_stderr_url == "https://fake.com/stderr" + ) + + +def test_get_log_keys_for_log_key_prefix(): + with tempfile.TemporaryDirectory() as tmpdir_path: + cm = LocalComputeLogManager(tmpdir_path) + evaluation_time = get_current_datetime() + log_key_prefix = ["test_log_bucket", evaluation_time.strftime("%Y%m%d_%H%M%S")] + + def write_log_file(file_id: int): + full_log_key = [*log_key_prefix, f"{file_id}"] + with cm.open_log_stream(full_log_key, ComputeIOType.STDERR) as f: + f.write("foo") + + for i in range(4): + write_log_file(i) + + log_keys = cm.get_log_keys_for_log_key_prefix(log_key_prefix, io_type=ComputeIOType.STDERR) + assert sorted(log_keys) == [ + [*log_key_prefix, "0"], + [*log_key_prefix, "1"], + [*log_key_prefix, "2"], + [*log_key_prefix, "3"], + ] + + +def test_read_log_lines_for_log_key_prefix(): + """Tests that we can read a sequence of files in a bucket as if they are a single file.""" + with tempfile.TemporaryDirectory() as tmpdir_path: + cm = LocalComputeLogManager(tmpdir_path) + evaluation_time = get_current_datetime() + log_key_prefix = ["test_log_bucket", evaluation_time.strftime("%Y%m%d_%H%M%S")] + + all_logs = [] + + def write_log_file(file_id: int): + full_log_key = [*log_key_prefix, f"{file_id}"] + with cm.open_log_stream(full_log_key, ComputeIOType.STDERR) as f: + num_lines = 10 + for j in range(num_lines): + msg = f"file: {file_id}, line: {j}" + all_logs.append(msg) + f.write(msg) + if j < num_lines - 1: + f.write("\n") + + for i in range(4): + write_log_file(i) + + all_logs_iter = iter(all_logs) + + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "10" + # read the entirety of the first file + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, cursor=None, io_type=ComputeIOType.STDERR + ) + assert len(log_lines) == 10 + assert cursor.has_more_now + assert cursor.log_key == [*log_key_prefix, "1"] + assert cursor.line == 0 + for ll in log_lines: + assert ll == next(all_logs_iter) + + # read half of the next log file + os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "5" + log_lines, cursor = cm.read_log_lines_for_log_key_prefix( + log_key_prefix, cursor=cursor.to_string(), io_type=ComputeIOType.STDERR + ) + assert 
len(log_lines) == 5
+        assert cursor.has_more_now
+        assert cursor.log_key == [*log_key_prefix, "1"]
+        assert cursor.line == 5
+        for ll in log_lines:
+            assert ll == next(all_logs_iter)
+
+        # read the next ten lines, five will be in the second file, five will be in the third
+        os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "10"
+        log_lines, cursor = cm.read_log_lines_for_log_key_prefix(
+            log_key_prefix, cursor=cursor.to_string(), io_type=ComputeIOType.STDERR
+        )
+        assert len(log_lines) == 10
+        assert cursor.has_more_now
+        assert cursor.log_key == [*log_key_prefix, "2"]
+        assert cursor.line == 5
+        for ll in log_lines:
+            assert ll == next(all_logs_iter)
+
+        # read the remaining 15 lines, but request 20
+        os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "20"
+        log_lines, cursor = cm.read_log_lines_for_log_key_prefix(
+            log_key_prefix, cursor=cursor.to_string(), io_type=ComputeIOType.STDERR
+        )
+        assert len(log_lines) == 15
+        assert not cursor.has_more_now
+        assert cursor.log_key == [*log_key_prefix, "3"]
+        # processed up to the end of the file, but there is not another file to process so cursor should be -1
+        assert cursor.line == -1
+        for ll in log_lines:
+            assert ll == next(all_logs_iter)
+
+        # write a final log file
+
+        write_log_file(4)
+
+        os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "15"
+        log_lines, cursor = cm.read_log_lines_for_log_key_prefix(
+            log_key_prefix, cursor=cursor.to_string(), io_type=ComputeIOType.STDERR
+        )
+        assert len(log_lines) == 10
+        assert not cursor.has_more_now
+        assert cursor.log_key == [*log_key_prefix, "4"]
+        # processed up to the end of the file, but there is not another file to process so cursor should be -1
+        assert cursor.line == -1
+        for ll in log_lines:
+            assert ll == next(all_logs_iter)
diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py b/python_modules/dagster/dagster_tests/storage_tests/utils/compute_log_manager.py
similarity index 70%
rename from python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py
rename to python_modules/dagster/dagster_tests/storage_tests/utils/compute_log_manager.py
index 0ceded16131a4..9066ee4428b56 100644
--- a/python_modules/dagster/dagster_tests/storage_tests/utils/captured_log_manager.py
+++ b/python_modules/dagster/dagster_tests/storage_tests/utils/compute_log_manager.py
@@ -5,31 +5,31 @@
 import pytest
 from dagster._core.execution.compute_logs import should_disable_io_stream_redirect
-from dagster._core.storage.captured_log_manager import ComputeIOType
+from dagster._core.storage.compute_log_manager import ComputeIOType
 from dagster._time import get_current_datetime


-class TestCapturedLogManager:
+class TestComputeLogManager:
     """You can extend this class to easily run this set of tests on any compute log manager.
     When extending, you simply need to override the `compute_log_manager` fixture and return your
-    implementation of `CapturedLogManager`.
+    implementation of `ComputeLogManager`.
For example: ``` - class TestMyComputeLogManagerImplementation(TestCapturedLogManager): + class TestMyComputeLogManagerImplementation(TestComputeLogManager): __test__ = True - @pytest.fixture(scope='function', name='captured_log_manager') - def captured_log_manager(self): - return MyCapturedLogManagerImplementation() + @pytest.fixture(scope='function', name='compute_log_manager') + def compute_log_manager(self): + return MyComputeLogManagerImplementation() ``` """ __test__ = False - @pytest.fixture(name="captured_log_manager") - def captured_log_manager(self): + @pytest.fixture(name="compute_log_manager") + def compute_log_manager(self): yield @pytest.fixture(name="write_manager") @@ -43,24 +43,24 @@ def read_manager(self): @pytest.mark.skipif( should_disable_io_stream_redirect(), reason="compute logs disabled for win / py3.6+" ) - def test_capture(self, captured_log_manager): + def test_capture(self, compute_log_manager): now = get_current_datetime() log_key = ["arbitrary", "log", "key", now.strftime("%Y_%m_%d__%H_%M_%S")] - with captured_log_manager.capture_logs(log_key) as context: + with compute_log_manager.capture_logs(log_key) as context: print("HELLO WORLD") # noqa: T201 print("HELLO ERROR", file=sys.stderr) # noqa: T201 - assert not captured_log_manager.is_capture_complete(log_key) + assert not compute_log_manager.is_capture_complete(log_key) assert context.log_key == log_key - assert captured_log_manager.is_capture_complete(log_key) + assert compute_log_manager.is_capture_complete(log_key) - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout == b"HELLO WORLD\n" assert log_data.stderr == b"HELLO ERROR\n" assert log_data.cursor - log_metadata = captured_log_manager.get_log_metadata(log_key) + log_metadata = compute_log_manager.get_log_metadata(log_key) assert log_metadata.stdout_location assert log_metadata.stderr_location assert log_metadata.stdout_download_url @@ -69,23 +69,23 @@ def test_capture(self, captured_log_manager): @pytest.mark.skipif( should_disable_io_stream_redirect(), reason="compute logs disabled for win / py3.6+" ) - def test_long_key(self, captured_log_manager): + def test_long_key(self, compute_log_manager): log_key = ["".join(random.choice(string.ascii_lowercase) for x in range(300))] - with captured_log_manager.capture_logs(log_key) as context: + with compute_log_manager.capture_logs(log_key) as context: print("HELLO WORLD") # noqa: T201 print("HELLO ERROR", file=sys.stderr) # noqa: T201 - assert not captured_log_manager.is_capture_complete(log_key) + assert not compute_log_manager.is_capture_complete(log_key) assert context.log_key == log_key - assert captured_log_manager.is_capture_complete(log_key) + assert compute_log_manager.is_capture_complete(log_key) - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout == b"HELLO WORLD\n" assert log_data.stderr == b"HELLO ERROR\n" assert log_data.cursor - log_metadata = captured_log_manager.get_log_metadata(log_key) + log_metadata = compute_log_manager.get_log_metadata(log_key) assert log_metadata.stdout_location assert log_metadata.stderr_location assert log_metadata.stdout_download_url @@ -154,53 +154,53 @@ def test_complete_checks(self, write_manager, read_manager): assert write_manager.is_capture_complete(log_key) assert read_manager.is_capture_complete(log_key) - def test_log_stream(self, captured_log_manager): + def test_log_stream(self, 
compute_log_manager): log_key = ["some", "log", "key"] - with captured_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: + with compute_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: write_stream.write("hello hello") - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout == b"hello hello" - def test_delete_logs(self, captured_log_manager): + def test_delete_logs(self, compute_log_manager): log_key = ["some", "log", "key"] other_log_key = ["other", "log", "key"] - with captured_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: + with compute_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: write_stream.write("hello hello") - with captured_log_manager.open_log_stream( + with compute_log_manager.open_log_stream( other_log_key, ComputeIOType.STDOUT ) as write_stream: write_stream.write("hello hello") - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout == b"hello hello" - other_log_data = captured_log_manager.get_log_data(other_log_key) + other_log_data = compute_log_manager.get_log_data(other_log_key) assert other_log_data.stdout == b"hello hello" - captured_log_manager.delete_logs(log_key=log_key) + compute_log_manager.delete_logs(log_key=log_key) - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout is None - other_log_data = captured_log_manager.get_log_data(other_log_key) + other_log_data = compute_log_manager.get_log_data(other_log_key) assert other_log_data.stdout == b"hello hello" - def test_delete_log_prefix(self, captured_log_manager): + def test_delete_log_prefix(self, compute_log_manager): log_key = ["some", "log", "key"] other_log_key = ["some", "log", "other_key"] - with captured_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: + with compute_log_manager.open_log_stream(log_key, ComputeIOType.STDOUT) as write_stream: write_stream.write("hello hello") - with captured_log_manager.open_log_stream( + with compute_log_manager.open_log_stream( other_log_key, ComputeIOType.STDOUT ) as write_stream: write_stream.write("hello hello") - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout == b"hello hello" - other_log_data = captured_log_manager.get_log_data(other_log_key) + other_log_data = compute_log_manager.get_log_data(other_log_key) assert other_log_data.stdout == b"hello hello" - captured_log_manager.delete_logs(prefix=["some", "log"]) + compute_log_manager.delete_logs(prefix=["some", "log"]) - log_data = captured_log_manager.get_log_data(log_key) + log_data = compute_log_manager.get_log_data(log_key) assert log_data.stdout is None - other_log_data = captured_log_manager.get_log_data(other_log_key) + other_log_data = compute_log_manager.get_log_data(other_log_key) assert other_log_data.stdout is None diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py index f330e3594fd92..e6e2cd7ad3b22 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py +++ 
b/python_modules/libraries/dagster-airflow/dagster_airflow_tests/test_dagster_pipeline_factory/test_tags.py @@ -7,7 +7,7 @@ from airflow.utils.dates import days_ago from dagster import DagsterEventType from dagster._core.instance import AIRFLOW_EXECUTION_DATE_STR -from dagster._core.storage.captured_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py b/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py index 7608e0f938964..dc43278a370b0 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/s3/compute_log_manager.py @@ -12,12 +12,11 @@ _check as check, ) from dagster._config.config_type import Noneable -from dagster._core.storage.captured_log_manager import CapturedLogContext from dagster._core.storage.cloud_storage_compute_log_manager import ( CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py index 8be9056d6f4a7..835092d7c715c 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py @@ -8,7 +8,7 @@ from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher import DefaultRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.captured_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.local_compute_log_manager import IO_TYPE_EXTENSION from dagster._core.storage.root import LocalArtifactStorage @@ -18,7 +18,7 @@ from dagster_aws.s3 import S3ComputeLogManager ensure_dagster_tests_import() -from dagster_tests.storage_tests.test_captured_log_manager import TestCapturedLogManager +from dagster_tests.storage_tests.test_compute_log_manager import TestComputeLogManager HELLO_WORLD = "Hello World" SEPARATOR = os.linesep if (os.name == "nt" and sys.version_info < (3,)) else "\n" @@ -244,11 +244,11 @@ def write_log_file(file_id: int, io_type: ComputeIOType): ] -class TestS3ComputeLogManager(TestCapturedLogManager): +class TestS3ComputeLogManager(TestComputeLogManager): __test__ = True - @pytest.fixture(name="captured_log_manager") - def captured_log_manager(self, mock_s3_bucket): + @pytest.fixture(name="compute_log_manager") + def compute_log_manager(self, mock_s3_bucket): with tempfile.TemporaryDirectory() as temp_dir: yield S3ComputeLogManager( bucket=mock_s3_bucket.name, prefix="my_prefix", local_dir=temp_dir diff --git a/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py 
b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py index dfc8bf65e3bc3..493ebcc64bd5a 100644 --- a/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-azure/dagster_azure_tests/blob_tests/test_compute_log_manager.py @@ -8,7 +8,7 @@ from dagster._core.instance import DagsterInstance, InstanceRef, InstanceType from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.captured_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.local_compute_log_manager import IO_TYPE_EXTENSION from dagster._core.storage.root import LocalArtifactStorage @@ -18,7 +18,7 @@ from dagster_azure.blob import AzureBlobComputeLogManager, FakeBlobServiceClient ensure_dagster_tests_import() -from dagster_tests.storage_tests.test_captured_log_manager import TestCapturedLogManager +from dagster_tests.storage_tests.test_compute_log_manager import TestComputeLogManager HELLO_WORLD = "Hello World" SEPARATOR = os.linesep if (os.name == "nt" and sys.version_info < (3,)) else "\n" @@ -226,11 +226,11 @@ def write_log_file(file_id: int, io_type: ComputeIOType): ] -class TestAzureComputeLogManager(TestCapturedLogManager): +class TestAzureComputeLogManager(TestComputeLogManager): __test__ = True - @pytest.fixture(name="captured_log_manager") - def captured_log_manager( + @pytest.fixture(name="compute_log_manager") + def compute_log_manager( self, blob_client, storage_account, @@ -280,8 +280,8 @@ def write_manager( ) @pytest.fixture(name="read_manager") - def read_manager(self, captured_log_manager): - yield captured_log_manager + def read_manager(self, compute_log_manager): + yield compute_log_manager @mock.patch("dagster_azure.blob.compute_log_manager.DefaultAzureCredential") diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py index ab009391f5f7a..d8f973ccf36d4 100644 --- a/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py +++ b/python_modules/libraries/dagster-gcp/dagster_gcp/gcs/compute_log_manager.py @@ -11,12 +11,11 @@ _check as check, ) from dagster._config.config_type import Noneable -from dagster._core.storage.captured_log_manager import CapturedLogContext from dagster._core.storage.cloud_storage_compute_log_manager import ( CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import CapturedLogContext, ComputeIOType from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py index 981b0f2d3d392..b3153deff5434 100644 --- a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py +++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/gcs_tests/test_compute_log_manager.py @@ -10,7 +10,7 @@ from dagster._core.instance.ref import InstanceRef from dagster._core.launcher import 
DefaultRunLauncher from dagster._core.run_coordinator import DefaultRunCoordinator -from dagster._core.storage.captured_log_manager import ComputeIOType +from dagster._core.storage.compute_log_manager import ComputeIOType from dagster._core.storage.event_log import SqliteEventLogStorage from dagster._core.storage.root import LocalArtifactStorage from dagster._core.storage.runs import SqliteRunStorage @@ -20,7 +20,7 @@ from google.cloud import storage ensure_dagster_tests_import() -from dagster_tests.storage_tests.test_captured_log_manager import TestCapturedLogManager +from dagster_tests.storage_tests.test_compute_log_manager import TestComputeLogManager HELLO_WORLD = "Hello World" SEPARATOR = os.linesep if (os.name == "nt" and sys.version_info < (3,)) else "\n" @@ -337,11 +337,11 @@ def _return_mocked_blob(*args, **kwargs): @pytest.mark.integration -class TestGCSComputeLogManager(TestCapturedLogManager): +class TestGCSComputeLogManager(TestComputeLogManager): __test__ = True - @pytest.fixture(name="captured_log_manager") - def captured_log_manager(self, gcs_bucket): + @pytest.fixture(name="compute_log_manager") + def compute_log_manager(self, gcs_bucket): with tempfile.TemporaryDirectory() as temp_dir: yield GCSComputeLogManager(bucket=gcs_bucket, prefix="my_prefix", local_dir=temp_dir)
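As the renamed `utils/compute_log_manager.py` docstring above describes, each of the vendored suites (local, S3, Azure, GCS) plugs into the shared tests by overriding a single `compute_log_manager` fixture. A minimal sketch of what that looks like for a third-party implementation, assuming a hypothetical `MyComputeLogManager` living in `my_package.log_manager` (both names are illustrative, not part of this patch):

```
import tempfile

import pytest
from dagster_tests.storage_tests.utils.compute_log_manager import TestComputeLogManager

# Hypothetical implementation under test; substitute your own ComputeLogManager subclass.
from my_package.log_manager import MyComputeLogManager


class TestMyComputeLogManager(TestComputeLogManager):
    # The base class sets __test__ = False so pytest skips it; re-enable collection here.
    __test__ = True

    @pytest.fixture(name="compute_log_manager")
    def compute_log_manager(self):
        # Yield from inside the context manager, as the S3 and GCS fixtures above do,
        # so the local staging directory survives until the test finishes.
        with tempfile.TemporaryDirectory() as temp_dir:
            yield MyComputeLogManager(local_dir=temp_dir)
```

Managers that stage logs remotely can additionally override the `write_manager` and `read_manager` fixtures, as the Azure suite does, to exercise capture and retrieval through separate instances.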