-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Train] Update run status and actor status for train runs. #46395
Changes from 21 commits
5ce0cc1
8057587
54c1106
1ed6114
8312b2b
28f2437
bf0cdeb
423d650
5478d35
cc44bb4
e09f818
1abe4f0
ef866e8
7687e61
1e6fb9c
c7caa36
2820020
e129ab4
578f746
15af2bc
e350ef4
31ab9a7
17c4c1c
a17415a
cd0965f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
import ray | ||
import ray.dashboard.optional_utils as dashboard_optional_utils | ||
import ray.dashboard.utils as dashboard_utils | ||
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc | ||
from ray.dashboard.modules.actor.actor_head import actor_table_data_to_dict | ||
from ray.dashboard.modules.job.common import JobInfoStorageClient | ||
from ray.dashboard.modules.job.utils import find_jobs_by_job_ids | ||
from ray.util.annotations import DeveloperAPI | ||
|
@@ -20,10 +22,9 @@ def __init__(self, dashboard_head): | |
super().__init__(dashboard_head) | ||
self._train_stats_actor = None | ||
self._job_info_client = None | ||
self._gcs_actor_info_stub = None | ||
|
||
# TODO(aguo): Update this to a "v2" path since I made a backwards-incompatible | ||
# change. Will do so after the API is more stable. | ||
@routes.get("/api/train/runs") | ||
@routes.get("/api/train/v2/runs") | ||
@dashboard_optional_utils.init_ray_and_catch_exceptions() | ||
@DeveloperAPI | ||
async def get_train_runs(self, req: Request) -> Response: | ||
|
@@ -57,6 +58,7 @@ async def get_train_runs(self, req: Request) -> Response: | |
else: | ||
try: | ||
train_runs = await stats_actor.get_all_train_runs.remote() | ||
await self._add_actor_status(train_runs) | ||
# Sort train runs in reverse chronological order | ||
train_runs = sorted( | ||
train_runs.values(), | ||
|
@@ -92,6 +94,44 @@ async def get_train_runs(self, req: Request) -> Response: | |
content_type="application/json", | ||
) | ||
|
||
async def _add_actor_status(self, train_runs): | ||
try: | ||
from ray.train._internal.state.schema import ActorStatusEnum, RunStatusEnum | ||
except ImportError: | ||
logger.exception( | ||
"Train is not installed. Please run `pip install ray[train]` " | ||
"when setting up Ray on your cluster." | ||
) | ||
|
||
actor_status_table = {} | ||
try: | ||
logger.info("Getting all actor info from GCS.") | ||
request = gcs_service_pb2.GetAllActorInfoRequest() | ||
reply = await self._gcs_actor_info_stub.GetAllActorInfo(request, timeout=5) | ||
woshiyyya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if reply.status.code == 0: | ||
for message in reply.actor_table_data: | ||
actor_table_data = actor_table_data_to_dict(message) | ||
actor_status_table[actor_table_data["actorId"]] = actor_table_data[ | ||
"state" | ||
] | ||
except Exception: | ||
logger.exception("Error Getting all actor info from GCS.") | ||
|
||
for train_run in train_runs.values(): | ||
for worker_info in train_run.workers: | ||
worker_info.status = actor_status_table.get(worker_info.actor_id, None) | ||
|
||
# If the controller died but the run status is not updated, | ||
# mark the train run as aborted | ||
controller_actor_status = actor_status_table.get( | ||
train_run.controller_actor_id, None | ||
) | ||
if ( | ||
controller_actor_status == ActorStatusEnum.DEAD | ||
and train_run.run_status == RunStatusEnum.STARTED | ||
): | ||
train_run.run_status = RunStatusEnum.ABORTED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Feels a bit messy to update the run status here. If we keep this here I'd at least rename the method to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is doing some post processing to handle some abnormally terminated cases. Let me update the function name here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be a |
||
|
||
@staticmethod | ||
def is_minimal_module(): | ||
return False | ||
|
@@ -102,6 +142,11 @@ async def run(self, server): | |
self._dashboard_head.gcs_aio_client | ||
) | ||
|
||
gcs_channel = self._dashboard_head.aiogrpc_gcs_channel | ||
self._gcs_actor_info_stub = gcs_service_pb2_grpc.ActorInfoGcsServiceStub( | ||
gcs_channel | ||
) | ||
|
||
async def get_train_stats_actor(self): | ||
""" | ||
Gets the train stats actor and caches it as an instance variable. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,25 @@ | ||
from enum import Enum | ||
from typing import List, Optional | ||
|
||
from ray._private.pydantic_compat import BaseModel, Field | ||
from ray.dashboard.modules.job.pydantic_models import JobDetails | ||
from ray.util.annotations import DeveloperAPI | ||
|
||
|
||
@DeveloperAPI | ||
class RunStatusEnum(str, Enum): | ||
STARTED = "STARTED" | ||
FINISHED = "FINISHED" | ||
ERRORED = "ERRORED" | ||
ABORTED = "ABORTED" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the difference between ERRORED and ABORTED? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only mark it as We mark it as |
||
|
||
|
||
@DeveloperAPI | ||
class ActorStatusEnum(str, Enum): | ||
woshiyyya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DEAD = "DEAD" | ||
ALIVE = "ALIVE" | ||
|
||
|
||
@DeveloperAPI | ||
class TrainWorkerInfo(BaseModel): | ||
"""Metadata of a Ray Train worker.""" | ||
|
@@ -21,6 +36,9 @@ class TrainWorkerInfo(BaseModel): | |
gpu_ids: List[int] = Field( | ||
description="A list of GPU ids allocated to that worker." | ||
) | ||
status: Optional[ActorStatusEnum] = Field( | ||
description="The status of the train worker actor. It can be ALIVE or DEAD." | ||
) | ||
|
||
|
||
@DeveloperAPI | ||
|
@@ -46,9 +64,21 @@ class TrainRunInfo(BaseModel): | |
datasets: List[TrainDatasetInfo] = Field( | ||
description="A List of dataset info for this Train run." | ||
) | ||
run_status: RunStatusEnum = Field( | ||
woshiyyya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
description="The current status of the train run. It can be one of the " | ||
"following: STARTED, FINISHED, or ERRORED." | ||
woshiyyya marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
status_detail: str = Field( | ||
description="Detailed information about the current run status, " | ||
"such as error messages." | ||
) | ||
Comment on lines
+77
to
+80
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the purpose of this? We only ever have one "User Error" message right now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for tracking the error reason for now, and can be extend to track the details of current run status in the future(e.g. scaling up / down/ recovering when doing elastic training. |
||
start_time_ms: int = Field( | ||
description="The UNIX timestamp of the start time of this Train run." | ||
) | ||
end_time_ms: Optional[int] = Field( | ||
description="The UNIX timestamp of the end time of this Train run. " | ||
"If null, the Train run has not ended yet." | ||
) | ||
|
||
|
||
@DeveloperAPI | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,6 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from typing import Dict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from typing import Any, Dict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ray | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from ray.data import Dataset | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -23,16 +23,19 @@ class TrainRunStateManager: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def __init__(self, state_actor) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.state_actor = state_actor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.train_run_info_dict = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def register_train_run( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
run_id: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
job_id: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
run_name: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
run_status: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller_actor_id: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
datasets: Dict[str, Dataset], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
worker_group: WorkerGroup, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
start_time_ms: float, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
status_detail: str = "", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Collect Train Run Info and report to StateActor.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -82,14 +85,22 @@ def collect_train_worker_info(): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for ds_name, ds in datasets.items() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
train_run_info = TrainRunInfo( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.train_run_info_dict = dict( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
id=run_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
job_id=job_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
name=run_name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
controller_actor_id=controller_actor_id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workers=worker_info_list, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
datasets=dataset_info_list, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
start_time_ms=start_time_ms, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
run_status=run_status, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
status_detail=status_detail, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
train_run_info = TrainRunInfo(**self.train_run_info_dict) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ray.get(self.state_actor.register_train_run.remote(train_run_info)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def update_train_run_info(self, updates: Dict[str, Any]) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Update specific fields of a registered TrainRunInfo instance.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
self.train_run_info_dict.update(updates) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
train_run_info = TrainRunInfo(**self.train_run_info_dict) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ray.get(self.state_actor.register_train_run.remote(train_run_info)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can cause problems if not careful in the future e.g. if somewhere we call Something like this:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point! Let me update this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this in this method since it's already checked at the start of
get_train_runs
? Also if this ever does happen it'll just error out when trying to use these imports later in the code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed