merge captured log manager / compute log manager (#23531)
## Summary & Motivation
Previously we asserted that the `compute_log_manager` conformed to the
`CapturedLogManager` interface.

This PR explicitly moves the `CapturedLogManager` interface into the
`ComputeLogManager` abstract class. This is purely for cognitive and typing
simplicity, so that callers can use `instance.compute_log_manager` everywhere
without first checking that it implements `CapturedLogManager` (see the sketch
below).

There is an open question as to whether this should be treated as a breaking
change (for `1.9`).
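
A rough before/after sketch of the call-site simplification; the `read_stderr_logs` helper below is hypothetical and exists only to illustrate the pattern, while `get_log_data` and the `log_key` shape come from the existing captured-log API used elsewhere in this diff:

```python
from typing import Optional, Sequence

from dagster._core.instance import DagsterInstance


def read_stderr_logs(instance: DagsterInstance, log_key: Sequence[str]) -> Optional[str]:
    # Before this change, call sites had to narrow the type first, e.g.:
    #
    #   from dagster._core.storage.captured_log_manager import CapturedLogManager
    #   if not isinstance(instance.compute_log_manager, CapturedLogManager):
    #       return None
    #
    # With the interfaces merged, the captured-log methods live on the
    # ComputeLogManager abstract class, so the call is direct:
    log_data = instance.compute_log_manager.get_log_data(log_key)
    return log_data.stderr.decode("utf-8") if log_data.stderr else None
```

The same direct access applies to `subscribe`, `open_log_stream`, and `get_log_metadata`, all of which previously sat behind an `isinstance` guard.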

## How I Tested These Changes
BK (Buildkite)
prha authored Aug 22, 2024
1 parent a64102b commit d307d1d
Showing 30 changed files with 659 additions and 504 deletions.
4 changes: 0 additions & 4 deletions docs/sphinx/sections/api/apidocs/internals.rst
@@ -148,10 +148,6 @@ See also: :py:class:`dagster_postgres.PostgresEventLogStorage` and :py:class:`da
Compute log manager
-------------------

.. currentmodule:: dagster._core.storage.captured_log_manager

.. autoclass:: CapturedLogManager

.. currentmodule:: dagster._core.storage.compute_log_manager

.. autoclass:: ComputeLogManager
@@ -24,7 +24,6 @@
EngineEventData,
)
from dagster._core.instance import DagsterInstance
from dagster._core.storage.captured_log_manager import CapturedLogData, CapturedLogManager
from dagster._core.storage.dagster_run import CANCELABLE_RUN_STATUSES
from dagster._core.workspace.permissions import Permissions
from dagster._utils.error import serializable_error_info_from_exc_info
@@ -46,6 +45,8 @@
)

if TYPE_CHECKING:
from dagster._core.storage.compute_log_manager import CapturedLogData

from dagster_graphql.schema.logs.compute_logs import GrapheneCapturedLogs
from dagster_graphql.schema.pipelines.subscription import (
GraphenePipelineRunLogsSubscriptionFailure,
@@ -315,13 +316,10 @@ async def gen_captured_log_data(
instance = graphene_info.context.instance

compute_log_manager = instance.compute_log_manager
if not isinstance(compute_log_manager, CapturedLogManager):
return

subscription = compute_log_manager.subscribe(log_key, cursor)

loop = asyncio.get_event_loop()
queue: asyncio.Queue[CapturedLogData] = asyncio.Queue()
queue: asyncio.Queue["CapturedLogData"] = asyncio.Queue()

def _enqueue(new_event):
loop.call_soon_threadsafe(queue.put_nowait, new_event)
@@ -1,7 +1,5 @@
from typing import TYPE_CHECKING, Sequence

from dagster._core.storage.captured_log_manager import CapturedLogManager

from dagster_graphql.schema.util import ResolveInfo

if TYPE_CHECKING:
@@ -13,9 +11,6 @@ def get_captured_log_metadata(
) -> "GrapheneCapturedLogsMetadata":
from ..schema.logs.compute_logs import GrapheneCapturedLogsMetadata

if not isinstance(graphene_info.context.instance.compute_log_manager, CapturedLogManager):
return GrapheneCapturedLogsMetadata()

metadata = graphene_info.context.instance.compute_log_manager.get_log_metadata(log_key)
return GrapheneCapturedLogsMetadata(
stdoutDownloadUrl=metadata.stdout_download_url,
@@ -18,7 +18,7 @@
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.external import ExternalPartitionSet
from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
@@ -651,9 +648,6 @@ def resolve_logEvents(self, graphene_info: ResolveInfo, cursor: Optional[str] =

instance = graphene_info.context.instance

if not isinstance(instance.compute_log_manager, CapturedLogManager):
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

if not instance.backfill_log_storage_enabled():
return GrapheneInstigationEventConnection(events=[], cursor="", hasMore=False)

@@ -1,5 +1,5 @@
import graphene
from dagster._core.storage.captured_log_manager import CapturedLogData
from dagster._core.storage.compute_log_manager import CapturedLogData

from dagster_graphql.schema.util import non_null_list

5 changes: 2 additions & 3 deletions python_modules/dagster-graphql/dagster_graphql/schema/util.py
@@ -1,7 +1,7 @@
from typing import cast

import graphene
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.workspace.context import WorkspaceRequestContext


@@ -15,6 +15,5 @@ def non_null_list(of_type):
return graphene.NonNull(graphene.List(graphene.NonNull(of_type)))


def get_compute_log_manager(graphene_info: ResolveInfo) -> CapturedLogManager:
assert isinstance(graphene_info.context.instance.compute_log_manager, CapturedLogManager)
def get_compute_log_manager(graphene_info: ResolveInfo) -> ComputeLogManager:
return graphene_info.context.instance.compute_log_manager
@@ -9,8 +9,8 @@
from dagster import __version__ as dagster_version
from dagster._annotations import deprecated
from dagster._core.debug import DebugRunPayload
from dagster._core.storage.captured_log_manager import ComputeIOType
from dagster._core.storage.cloud_storage_compute_log_manager import CloudStorageComputeLogManager
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster._core.storage.runs.sql_run_storage import SqlRunStorage
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
@@ -8,7 +8,7 @@
from dagster import _seven
from dagster._core.instance import DagsterInstance
from dagster._core.log_manager import LOG_RECORD_METADATA_ATTR
from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType
from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogManager
from dagster._core.utils import coerce_valid_log_level
from dagster._utils.log import create_console_logger

@@ -47,7 +47,7 @@ def emit(self, record: logging.LogRecord):


class CapturedLogHandler(logging.Handler):
"""Persist logging records to an IO stream controlled by the CapturedLogManager."""
"""Persist logging records to an IO stream controlled by the ComputeLogManager."""

def __init__(self, write_stream: IO):
self._write_stream = write_stream
@@ -105,7 +105,7 @@ def __enter__(self):
if (
self._log_key
and self._instance
and isinstance(self._instance.compute_log_manager, CapturedLogManager)
and isinstance(self._instance.compute_log_manager, ComputeLogManager)
):
write_stream = self._exit_stack.enter_context(
self._instance.compute_log_manager.open_log_stream(
@@ -147,9 +147,6 @@ def has_captured_logs(self):
def get_instigation_log_records(
instance: DagsterInstance, log_key: Sequence[str]
) -> Sequence[Mapping[str, Any]]:
if not isinstance(instance.compute_log_manager, CapturedLogManager):
return []

log_data = instance.compute_log_manager.get_log_data(log_key)
raw_logs = log_data.stderr.decode("utf-8") if log_data.stderr else ""

2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/events/__init__.py
@@ -48,7 +48,7 @@
from dagster._core.execution.plan.objects import StepFailureData, StepRetryData, StepSuccessData
from dagster._core.execution.plan.outputs import StepOutputData
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.captured_log_manager import CapturedLogContext
from dagster._core.storage.compute_log_manager import CapturedLogContext
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._serdes.serdes import EnumSerializer, UnpackContext, is_whitelisted_for_serdes_object
@@ -25,7 +25,6 @@
step_failure_event_from_exc_info,
)
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info


@@ -37,7 +36,6 @@ def inner_plan_execution_iterator(
check.inst_param(job_context, "pipeline_context", PlanExecutionContext)
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
compute_log_manager = job_context.instance.compute_log_manager
assert isinstance(compute_log_manager, CapturedLogManager)
step_keys = [step.key for step in execution_plan.get_steps_to_execute_in_topo_order()]
with execution_plan.start(
retry_mode=job_context.retry_mode,
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -409,7 +409,7 @@ def __init__(
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.scheduler import Scheduler
from dagster._core.secrets import SecretsLoader
from dagster._core.storage.captured_log_manager import CapturedLogManager
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
@@ -427,7 +427,7 @@

if compute_log_manager:
self._compute_log_manager = check.inst_param(
compute_log_manager, "compute_log_manager", CapturedLogManager
compute_log_manager, "compute_log_manager", ComputeLogManager
)
self._compute_log_manager.register_instance(self)
else:

1 comment on commit d307d1d

@github-actions

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-n4oemr5kv-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit d307d1d.
This pull request is being automatically deployed with vercel-action
