[Data] Remove DatasetLogger (ray-project#44400)
Ray Data uses a DatasetLogger wrapper class for logging. Some downsides of this implementation are:

- You need to add a redundant get_logger call whenever you log (e.g., logger.get_logger(log_to_stdout=False).info instead of logger.info)
- There's a layer of indirection to the logging module (creating DatasetLogger instead of calling logging.getLogger directly)
- Logging configuration is tightly coupled to the DatasetLogger implementation

To simplify our code, this PR removes DatasetLogger and replaces it with the built-in logging.Logger object. The logger is appropriately configured so that the logging behavior doesn't change.
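For illustration, here is a minimal sketch of the pattern change (the module and call site below are hypothetical, not taken from this commit): wrapper-based calls like logger.get_logger(log_to_stdout=False).info(...) become plain standard-library calls, with file-only messages demoted to DEBUG so they still stay out of stdout under the default configuration.

```python
import logging

# Modules now create a standard-library logger directly instead of
# wrapping it in a DatasetLogger.
logger = logging.getLogger(__name__)

def run_step(options):
    # Hypothetical call site, for illustration only.
    # Before: logger.get_logger(log_to_stdout=False).info("Execution config: %s", options)
    # After: messages meant only for the log file are logged at DEBUG.
    logger.debug("Execution config: %s", options)
```

As the __init__.py hunk below shows, configure_logging() runs once when ray.data is imported, so handlers and levels are in place before these loggers are used.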

---------

Signed-off-by: Balaji Veeramani <[email protected]>
bveeramani authored Apr 9, 2024
1 parent 9fb9d75 commit 5e2a276
Showing 31 changed files with 426 additions and 423 deletions.
9 changes: 9 additions & 0 deletions bazel/conftest.py
@@ -13,3 +13,12 @@ def shutdown_ray():
def preserve_block_order():
ray.data.context.DataContext.get_current().execution_options.preserve_order = True
yield


@pytest.fixture(autouse=True)
def disable_start_message():
context = ray.data.context.DataContext.get_current()
original_value = context.print_on_execution_start
context.print_on_execution_start = False
yield
context.print_on_execution_start = original_value
13 changes: 11 additions & 2 deletions python/ray/data/BUILD
@@ -250,9 +250,9 @@ py_test(
)

py_test(
name = "test_logger",
name = "test_logging",
size = "small",
srcs = ["tests/test_logger.py"],
srcs = ["tests/test_logging.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)
@@ -353,6 +353,15 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_exceptions",
size = "small",
srcs = ["tests/test_exceptions.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)


py_test(
name = "test_execution_optimizer",
size = "medium",
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -10,6 +10,7 @@
ExecutionResources,
NodeIdStr,
)
from ray.data._internal.logging import configure_logging
from ray.data._internal.progress_bar import set_progress_bars
from ray.data.context import DataContext, DatasetContext
from ray.data.dataset import Dataset, Schema
@@ -62,6 +63,7 @@
_cached_fn = None
_cached_cls = None

configure_logging()

try:
import pyarrow as pa
111 changes: 0 additions & 111 deletions python/ray/data/_internal/dataset_logger.py

This file was deleted.

@@ -1,10 +1,10 @@
import collections
import logging
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Union

import ray
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -20,7 +20,7 @@
from ray.data.context import DataContext
from ray.types import ObjectRef

logger = DatasetLogger(__name__)
logger = logging.getLogger(__name__)

# Higher values here are better for prefetching and locality. It's ok for this to be
# fairly high since streaming backpressure prevents us from overloading actors.
@@ -123,9 +123,7 @@ def start(self, options: ExecutionOptions):
# situations where the scheduler is unable to schedule downstream operators
# due to lack of available actors, causing an initial "pileup" of objects on
# upstream operators, leading to a spike in memory usage prior to steady state.
logger.get_logger(log_to_stdout=False).info(
f"{self._name}: Waiting for {len(refs)} pool actors to start..."
)
logger.debug(f"{self._name}: Waiting for {len(refs)} pool actors to start...")
try:
ray.get(refs, timeout=DEFAULT_WAIT_FOR_MIN_ACTORS_SEC)
except ray.exceptions.GetTimeoutError:
@@ -291,7 +289,7 @@ def shutdown(self):
min_workers = self._autoscaling_policy.min_workers
if len(self._output_metadata) < min_workers:
# The user created a stream that has too few blocks to begin with.
logger.get_logger().warning(
logger.warning(
"To ensure full parallelization across an actor pool of size "
f"{min_workers}, the Dataset should consist of at least "
f"{min_workers} distinct blocks. Consider increasing "
6 changes: 3 additions & 3 deletions python/ray/data/_internal/execution/resource_manager.py
@@ -1,11 +1,11 @@
import logging
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional

import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
@@ -25,7 +25,7 @@
from ray.data._internal.execution.streaming_executor_state import Topology


logger = DatasetLogger(__name__)
logger = logging.getLogger(__name__)
DEBUG_RESOURCE_MANAGER = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"


@@ -344,7 +344,7 @@ def print_warning_if_idle_for_too_long(
" `DataContext.get_current().execution_options.exclude_resources`."
" This message will only print once."
)
logger.get_logger(log_to_stdout=True).warning(msg)
logger.warning(msg)

def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
super().__init__(resource_manager)
47 changes: 20 additions & 27 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -1,9 +1,9 @@
import logging
import threading
import time
import uuid
from typing import Dict, Iterator, List, Optional

from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
@@ -30,11 +30,12 @@
select_operator_to_run,
update_operator_states,
)
from ray.data._internal.logging import get_log_path
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.stats import DatasetStats, StatsManager
from ray.data.context import DataContext

logger = DatasetLogger(__name__)
logger = logging.getLogger(__name__)

# Force a progress bar update after this many events processed. This avoids the
# progress bar seeming to stall for very large scale workloads.
@@ -102,22 +103,16 @@ def execute(
self._start_time = time.perf_counter()

if not isinstance(dag, InputDataBuffer):
stdout_logger = logger.get_logger()
log_path = logger.get_datasets_log_path()
message = "Starting execution of Dataset."
if log_path is not None:
message += f" Full log is in {log_path}"
stdout_logger.info(message)
stdout_logger.info("Execution plan of Dataset: %s\n", dag)
logger.get_logger(log_to_stdout=False).info(
"Execution config: %s", self._options
)
if not self._options.verbose_progress:
logger.get_logger(log_to_stdout=False).info(
"Tip: For detailed progress reporting, run "
"`ray.data.DataContext.get_current()."
"execution_options.verbose_progress = True`"
)
context = DataContext.get_current()
if context.print_on_execution_start:
message = "Starting execution of Dataset."
log_path = get_log_path()
if log_path is not None:
message += f" Full log is in {log_path}"
logger.info(message)
logger.info(f"Execution plan of Dataset: {dag}")

logger.debug("Execution config: %s", self._options)

# Setup the streaming DAG topology and start the runner thread.
self._topology, _ = build_streaming_topology(dag, self._options)
@@ -171,7 +166,7 @@ def shutdown(self, execution_completed: bool = True):
with self._shutdown_lock:
if not self._execution_started or self._shutdown:
return
logger.get_logger(log_to_stdout=False).debug(f"Shutting down {self}.")
logger.debug(f"Shutting down {self}.")
_num_shutdown += 1
self._shutdown = True
# Give the scheduling loop some time to finish processing.
@@ -190,9 +185,8 @@ def shutdown(self, execution_completed: bool = True):
stats_summary_string = self._final_stats.to_summary().to_string(
include_parent=False
)
logger.get_logger(log_to_stdout=context.enable_auto_log_stats).info(
stats_summary_string,
)
if context.enable_auto_log_stats:
logger.info(stats_summary_string)
# Close the progress bars from top to bottom to avoid them jumping
# around in the console after completion.
if self._global_info:
@@ -325,7 +319,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
f"Operator {op} completed. "
f"Operator Metrics:\n{op._metrics.as_dict()}"
)
logger.get_logger(log_to_stdout=False).info(log_str)
logger.debug(log_str)
self._has_op_completed[op] = True

# Keep going until all operators run to completion.
@@ -436,13 +430,12 @@ def _debug_dump_topology(topology: Topology, resource_manager: ResourceManager)
topology: The topology to debug.
resource_manager: The resource manager for this topology.
"""
logger.get_logger(log_to_stdout=False).info("Execution Progress:")
logger.debug("Execution Progress:")
for i, (op, state) in enumerate(topology.items()):
logger.get_logger(log_to_stdout=False).info(
logger.debug(
f"{i}: {state.summary_str(resource_manager)}, "
f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
)
logger.get_logger(log_to_stdout=False).info("")


def _log_op_metrics(topology: Topology) -> None:
Expand All @@ -454,4 +447,4 @@ def _log_op_metrics(topology: Topology) -> None:
log_str = "Operator Metrics:\n"
for op in topology:
log_str += f"{op.name}: {op.metrics.as_dict()}\n"
logger.get_logger(log_to_stdout=False).info(log_str)
logger.debug(log_str)
@@ -3,6 +3,7 @@
This is split out from streaming_executor.py to facilitate better unit testing.
"""

import logging
import math
import threading
import time
@@ -11,7 +12,6 @@
from typing import Dict, List, Optional, Tuple

import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
@@ -35,7 +35,7 @@
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.progress_bar import ProgressBar

logger = DatasetLogger(__name__)
logger = logging.getLogger(__name__)

# Holds the full execution state of the streaming topology. It's a dict mapping each
# operator to tracked streaming exec state.
@@ -436,14 +436,14 @@ def process_completed_tasks(
" Ignoring this exception with remaining"
f" max_errored_blocks={remaining}."
)
logger.get_logger().error(error_message, exc_info=e)
logger.error(error_message, exc_info=e)
else:
error_message += (
" Dataset execution will now abort."
" To ignore this exception and continue, set"
" DataContext.max_errored_blocks."
)
logger.get_logger().error(error_message)
logger.error(error_message)
raise e from None
else:
assert isinstance(task, MetadataOpTask)