[Jobs] Revisit Ray Job execution and monitoring #45120

Open · wants to merge 123 commits into base: master
Conversation

@alexeykudinkin (Contributor) commented May 3, 2024

Why are these changes needed?

Context

Motivations for this refactoring are multiple:

  • Consolidating all of the job management in one place (JobSupervisor; previously spread between JobManager and JobSupervisor)

  • Decoupling management and monitoring of the job execution from the execution of the job driver itself (previously coupled inside JobSupervisor)

These steps are necessary to be able to:

  • Perform job management and monitoring exclusively from the head node
  • Run actual job drivers on any node (worker or head)

Changes

With the stated goals in mind, the following are the primary changes (with the rest just facilitating this migration):

  1. All of the job management and monitoring is consolidated inside JobSupervisor (always running on the head node)
  2. Actual job driver execution is performed by a (descendant) JobExecutor actor, which can run on any node (see the sketch below)
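A minimal sketch of this split, using simplified stand-ins for the real classes (the actor names, signatures, and the `run`/`run_job` methods here are illustrative assumptions, not the PR's actual code):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(num_cpus=0)
class JobExecutor:
    """Runs the job driver (an entrypoint subprocess) on whichever node it lands on."""

    def run(self, entrypoint: str) -> int:
        import subprocess

        # Run the entrypoint and return its exit code.
        return subprocess.run(entrypoint, shell=True).returncode


@ray.remote(num_cpus=0)
class JobSupervisor:
    """Owns job management and monitoring; intended to always live on the head node."""

    def run_job(self, entrypoint: str) -> int:
        # The executor uses Ray's default scheduling, so it may land on any node.
        executor = JobExecutor.remote()
        return ray.get(executor.run.remote(entrypoint))


if __name__ == "__main__":
    ray.init()
    # Soft-pin the supervisor to the current (head) node, mirroring the PR's goal.
    supervisor = JobSupervisor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=ray.get_runtime_context().get_node_id(), soft=True
        )
    ).remote()
    print(ray.get(supervisor.run_job.remote("echo hello")))
```

The key point is placement: the supervisor is soft-pinned to the head node via NodeAffinitySchedulingStrategy, while the executor falls back to Ray's default scheduling and can land on any node.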

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

```python
            finally:
                self.monitored_jobs.remove(job_id)

    async def _monitor_job_internal(
```
This moved to JobSupervisor._monitor_job_internal

```python
        self._logger = logging.getLogger(f"{__name__}.supervisor-{job_id}")
        self._configure_logger()

    def _configure_logger(self) -> None:
```
Has been promoted to a module-level function so it can be reused
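For illustration, a module-level helper along these lines could be shared by both actors (the name `_configure_supervisor_logger` and the handler details are assumptions, not the PR's implementation):

```python
import logging


def _configure_supervisor_logger(job_id: str, log_file: str) -> logging.Logger:
    """Create a per-job logger that supervisor and executor code can both reuse."""
    logger = logging.getLogger(f"{__name__}.supervisor-{job_id}")
    handler = logging.FileHandler(log_file)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s -- %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```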

Comment on lines -352 to -390
```python
            polling_task.cancel()
            if sys.platform == "win32" and self._win32_job_object:
                win32job.TerminateJobObject(self._win32_job_object, -1)
            elif sys.platform != "win32":
                stop_signal = os.environ.get("RAY_JOB_STOP_SIGNAL", "SIGTERM")
                if stop_signal not in self.VALID_STOP_SIGNALS:
                    self._logger.warning(
                        f"{stop_signal} not a valid stop signal. Terminating "
                        "job with SIGTERM."
                    )
                    stop_signal = "SIGTERM"

                job_process = psutil.Process(child_pid)
                proc_to_kill = [job_process] + job_process.children(recursive=True)

                # Send stop signal and wait for job to terminate gracefully,
                # otherwise SIGKILL job forcefully after timeout.
                self._kill_processes(proc_to_kill, getattr(signal, stop_signal))
                try:
                    stop_job_wait_time = int(
                        os.environ.get(
                            "RAY_JOB_STOP_WAIT_TIME_S",
                            self.DEFAULT_RAY_JOB_STOP_WAIT_TIME_S,
                        )
                    )
                    poll_job_stop_task = create_task(self._poll_all(proc_to_kill))
                    await asyncio.wait_for(poll_job_stop_task, stop_job_wait_time)
                    self._logger.info(
                        f"Job {self._job_id} has been terminated gracefully "
                        f"with {stop_signal}."
                    )
                except asyncio.TimeoutError:
                    self._logger.warning(
                        f"Attempt to gracefully terminate job {self._job_id} "
                        f"through {stop_signal} has timed out after "
                        f"{stop_job_wait_time} seconds. Job is now being "
                        "force-killed with SIGKILL."
                    )
                    self._kill_processes(proc_to_kill, signal.SIGKILL)
```
Extracted as stop_process
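A rough sketch of what such an extracted helper could look like, distilled from the removed block above (the real `stop_process` signature and defaults in the PR may differ):

```python
import asyncio
import os
import signal

import psutil


async def stop_process(pid: int, timeout_s: int = 3) -> None:
    """Send the stop signal to the process tree, escalating to SIGKILL after timeout_s."""
    parent = psutil.Process(pid)
    procs = [parent] + parent.children(recursive=True)

    # Respect the configured stop signal, defaulting to SIGTERM.
    stop_signal = getattr(signal, os.environ.get("RAY_JOB_STOP_SIGNAL", "SIGTERM"))
    for proc in procs:
        try:
            proc.send_signal(stop_signal)
        except psutil.NoSuchProcess:
            pass

    # Wait for graceful exit, then force-kill any survivors.
    _, alive = await asyncio.get_running_loop().run_in_executor(
        None, psutil.wait_procs, procs, timeout_s
    )
    for proc in alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass
```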

```python
    # Timeout to finalize job status after job driver exiting
    JOB_STATUS_FINALIZATION_TIMEOUT_S = 60

    def __init__(
```
Most important changes in JobSupervisor:

  • JobSupervisor took over the whole job management lifecycle from JobManager
  • Actual driver execution has been moved to JobRunner

Comment on lines -101 to -125
```python
    def _get_actor_for_job(self, job_id: str) -> Optional[ActorHandle]:
        try:
            return ray.get_actor(
                JOB_ACTOR_NAME_TEMPLATE.format(job_id=job_id),
                namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE,
            )
        except ValueError:  # Ray returns ValueError for nonexistent actor.
            return None
```
Extracted to utils (renamed to _get_supervisor_actor_for_job)

Comment on lines 73 to 82
```python
        self._log_client = JobLogStorageClient()
        self._supervisor_actor_cls = ray.remote(JobSupervisor)
        self.monitored_jobs = set()
        try:
            self.event_logger = get_event_logger(Event.SourceType.JOBS, logs_dir)
        except Exception:
            self.event_logger = None

        self._recover_running_jobs_event = asyncio.Event()
        run_background_task(self._recover_running_jobs())
```
Monitoring moved to JobSupervisor

Comment on lines -84 to -116
```python
    async def _recover_running_jobs(self):
        """Recovers all running jobs from the status client.

        For each job, we will spawn a coroutine to monitor it.
        Each will be added to self._running_jobs and reconciled.
        """
        try:
            all_jobs = await self._job_info_client.get_all_jobs()
            for job_id, job_info in all_jobs.items():
                if not job_info.status.is_terminal():
                    run_background_task(self._monitor_job(job_id))
        finally:
            # This event is awaited in `submit_job` to avoid race conditions between
            # recovery and new job submission, so it must always get set even if there
            # are exceptions.
            self._recover_running_jobs_event.set()
```
Deleted

Comment on lines +420 to +450
```python
    def _get_driver_scheduling_strategy(
        self, resources_specified: bool
    ) -> SchedulingStrategyT:
        """Get the scheduling strategy for the job.

        If resources_specified is true, or if the environment variable is set to
        allow the job's driver (entrypoint) to run on worker nodes, we will use Ray's
        default actor placement strategy. Otherwise, we will force the job to use the
        head node.

        Args:
            resources_specified: Whether the job specified any resources
                (CPUs, GPUs, or custom resources).

        Returns:
            The scheduling strategy to use for the job.
        """
        if resources_specified:
            return "DEFAULT"
        elif os.environ.get(RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, "0") == "1":
            self._logger.info(
                f"({self._job_id}) {RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR} was set to 1. "
                "Using Ray's default actor scheduling strategy for the job "
                "driver instead of running it on the head node."
            )
            return "DEFAULT"

        # If the user did not specify any resources or set the driver on worker nodes
        # env var, we will run the driver on the head node.
        #
        # NOTE: This is preserved for compatibility reasons
        return NodeAffinitySchedulingStrategy(
            node_id=ray.worker.global_worker.current_node_id.hex(), soft=True
        )
```
Moved from JobManager (no logic changes)
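For context, this is how the two "DEFAULT" branches get triggered from the submission side (a hedged example using the public JobSubmissionClient API; the dashboard address and script name are placeholders):

```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")

# Branch 1: entrypoint resources specified -> resources_specified is True, so the
# driver is scheduled with Ray's default strategy and may run on any node.
client.submit_job(
    entrypoint="python my_script.py",
    entrypoint_num_cpus=1,
)

# Branch 2: no resources specified. If RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES=1 is set
# on the cluster, the default strategy is used as well; otherwise the driver is
# soft-pinned to the head node via NodeAffinitySchedulingStrategy.
client.submit_job(entrypoint="python my_script.py")
```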

Comment on lines +455 to +494
```python
    def _get_runner_runtime_env(
        self,
        *,
        user_runtime_env: Dict[str, Any],
        submission_id: str,
        entrypoint_resources_specified: bool,
    ) -> Dict[str, Any]:
        """Configure and return the runtime_env for the supervisor actor.

        Args:
            user_runtime_env: The runtime_env specified by the user.
            entrypoint_resources_specified: Whether the user specified resources in the
                submit_job() call. If so, we will skip the workaround introduced
                in #24546 for GPU detection and just use the user's resource
                requests, so that the behavior matches that of the user specifying
                resources for any other actor.

        Returns:
            The runtime_env for the supervisor actor.
        """
        # Make a copy to avoid mutating passed runtime_env.
        runtime_env = (
            copy.deepcopy(user_runtime_env) if user_runtime_env is not None else {}
        )

        # NOTE(edoakes): Can't use .get(, {}) here because we need to handle the case
        # where env_vars is explicitly set to `None`.
        env_vars = runtime_env.get("env_vars")
        if env_vars is None:
            env_vars = {}

        env_vars[ray_constants.RAY_WORKER_NICENESS] = "0"

        if not entrypoint_resources_specified:
            # Don't set CUDA_VISIBLE_DEVICES for the supervisor actor so the
            # driver can use GPUs if it wants to. This will be removed from
            # the driver's runtime_env so it isn't inherited by tasks & actors.
            env_vars[ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR] = "1"
        runtime_env["env_vars"] = env_vars

        if os.getenv(RAY_STREAM_RUNTIME_ENV_LOG_TO_JOB_DRIVER_LOG_ENV_VAR, "0") == "1":
            config = runtime_env.get("config")
            # Empty fields may be set to None, so we need to check for None explicitly.
```
Moved from JobManager (_get_supervisor_runtime_env previously), no logic changes
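A small worked example of the transformation, based on the code above (illustrative only, not a test from the PR; constant names come from the quoted snippet):

```python
from ray._private import ray_constants

user_runtime_env = {"pip": ["requests"], "env_vars": {"FOO": "1"}}

# With entrypoint_resources_specified=False, the resulting runner runtime_env keeps
# the user's fields and adds two env vars on top of them:
expected_env_vars = {
    "FOO": "1",
    ray_constants.RAY_WORKER_NICENESS: "0",
    # Keeps CUDA_VISIBLE_DEVICES unset for the supervisor so the driver can see GPUs.
    ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR: "1",
}
```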

Comment on lines -296 to -330
```python
        curr_info = await self._job_info_client.get_info(self._job_id)
        if curr_info is None:
            raise RuntimeError(f"Status could not be retrieved for job {self._job_id}.")
        curr_status = curr_info.status
        curr_message = curr_info.message
        if curr_status == JobStatus.RUNNING:
            raise RuntimeError(
                f"Job {self._job_id} is already in RUNNING state. "
                f"JobSupervisor.run() should only be called once. "
            )
        if curr_status != JobStatus.PENDING:
            raise RuntimeError(
                f"Job {self._job_id} is not in PENDING state. "
                f"Current status is {curr_status} with message {curr_message}."
            )

        if _start_signal_actor:
            # Block in PENDING state until start signal received.
            await _start_signal_actor.wait.remote()

        driver_agent_http_address = (
            "http://"
            f"{ray.worker.global_worker.node.node_ip_address}:"
            f"{ray.worker.global_worker.node.dashboard_agent_listen_port}"
        )
        driver_node_id = ray.worker.global_worker.current_node_id.hex()

        await self._job_info_client.put_status(
            self._job_id,
            JobStatus.RUNNING,
            jobinfo_replace_kwargs={
                "driver_agent_http_address": driver_agent_http_address,
                "driver_node_id": driver_node_id,
            },
        )
```
All state management has been lifted up to JobSupervisor (to consolidate ownership scope and avoid race conditions)

Comment on lines 958 to 990
```python
            os.environ.update(self._get_driver_env_vars())

            self._logger.info(f"({self._job_id}) Executing job driver's entrypoint")

            log_path = self._log_client.get_log_file_path(self._job_id)
            child_process = self._exec_entrypoint(log_path)
            child_pid = child_process.pid

            # Execute job's entrypoint in the subprocess
            self._driver_process = self._exec_entrypoint(log_path)

        except Exception as e:
            self._logger.error(
                f"({self._job_id}) Got unexpected exception while executing job's entrypoint: {repr(e)}",
                exc_info=e,
            )

    async def join(self) -> JobExecutionResult:
        """
        Joins job execution blocking until either of the following conditions is true

        1. Job driver has completed (subprocess exited)
        2. Job has been interrupted (due to Ray job being stopped)

        NOTE: This method should only be called after `JobExecutor.start`
        """

        try:
            assert self._driver_process, "Job's driver is not running"

            child_process = self._driver_process

            # Block until either of the following occurs:
            # - Process executing job's entrypoint completes (exits, returning specific exit-code)
```
Changes

  • Splitting the job running sequence into 2 methods, so that the job is annotated as RUNNING only if we're able to successfully start the entrypoint (see the sketch below)
  • No logic changes otherwise
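A minimal sketch of the resulting start/join split, with simplified types (this is an assumption-laden illustration, not the PR's actual JobExecutor):

```python
import asyncio
import subprocess
from dataclasses import dataclass


@dataclass
class JobExecutionResult:
    exit_code: int


class ExecutorSketch:
    def __init__(self, entrypoint: str):
        self._entrypoint = entrypoint
        self._driver_process = None

    def start(self) -> None:
        # Launch the driver subprocess; the job is reported RUNNING only once this succeeds.
        self._driver_process = subprocess.Popen(self._entrypoint, shell=True)

    async def join(self) -> JobExecutionResult:
        # Block until the driver exits (or the job is interrupted).
        assert self._driver_process, "Job's driver is not running"
        loop = asyncio.get_running_loop()
        exit_code = await loop.run_in_executor(None, self._driver_process.wait)
        return JobExecutionResult(exit_code=exit_code)
```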

@alexeykudinkin changed the title from "[WIP] Revisit Ray Job management" to "[Jobs] Revisit Ray Job execution and monitoring" on May 18, 2024
@alexeykudinkin requested a review from edoakes on May 18, 2024 01:48
@edoakes (Contributor) commented May 20, 2024

My understanding is that there are two primary issues we are trying to solve here:

  1. Always run the control plane/supervising logic on the head node to avoid issues if worker nodes are terminated (which should be assumed to happen frequently).
  2. Enable running job drivers on worker nodes if requested.

The minimal change we could make to address the above is to simply run the JobManager only on the head node (in the API server) but allow the spawned JobSupervisor actors to optionally run on worker nodes. This would require very little code change.

What is the reasoning for adding an additional component in the middle here (what you have renamed to the JobSupervisor)? The PR description says that this is necessary to satisfy (1) and (2) above but I don't follow. I also don't see how this is "consolidating all of the job management in one place" -- if anything, it is less consolidated now as there is a 3rd component in the mix which complicates edge case handling (e.g., if the JobSupervisor and/or the JobExecutor fail to start or exit unexpectedly).

Independent of what we decide as the end state, this is a huge refactor on code that has had a lot of subtle issues ironed out over time (and is currently very stable), so I'm not comfortable with merging a complete overhaul in one shot (the risk/reward ratio is out of whack). So let's discuss how to make the changes more incrementally.

@alexeykudinkin

@edoakes made the changes we've discussed regarding running JobSupervisor in-process in a separate PR to make review easier:

#45664

We can review these independently, then I'll squash them into one.

Commits:

  • Appropriately report job-run status from the runner; handle exceptions in the JobSupervisor
  • Properly handle job being stopped
  • …e_info` method; wire in job driver's Dashboard Agent details into `JobInfo`
  • Tidying up
  • …fy whether job driver is running
  • …as job-status in GCS as RUNNING