Skip to content

Commit

Permalink
Job launcher (NVIDIA#3049)
Browse files Browse the repository at this point in the history
* WIP: implement the ProcessJobLaunch.

* WIP: working ProcessJobLauncher impelentation.

* Added k8s_launcher implementation.

* Added logger for k8s_launcher.py

* Added empty launcher_map check.

* Added more config for K8sJobLauncher.

* renamed RunProcessKey.JOB_LAUNCHER.

* Separated out the JobHandleSpec.

* Support for the launcher deploy image.

* Changed the _get_job_launcher logic.

* add more handled for the deployment_map change.

* Added logging for job launcher.

* Fixed extract_job_image usage.

* Added job_meta for launch_job.

* codestyle fix.

* refactoried.

* extract to use constants.

* Change the JobLauncherSpec API signiture.

* Added can_launch() to JobLauncherSpec.

* refactor.

* removed duplicate const.

* removed no use import.

* changed to raise NotImplementedError().

* refactored.

* Changed to use event to get the job launcher.

* updated K8sJobLauncher.

* codestyle fix.

* removed no use import.

* JobReturnCode standard.

* fixed the _get_job_launcher() condition logic.
  • Loading branch information
yhwen authored Nov 1, 2024
1 parent 80f9e82 commit dd256fe
Show file tree
Hide file tree
Showing 23 changed files with 658 additions and 90 deletions.
1 change: 1 addition & 0 deletions nvflare/apis/event_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ class EventType(object):

AUTHORIZE_COMMAND_CHECK = "_authorize_command_check"
BEFORE_BUILD_COMPONENT = "_before_build_component"
GET_JOB_LAUNCHER = "_get_job_launcher"
8 changes: 7 additions & 1 deletion nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class FLContextKey(object):
AUTHORIZATION_REASON = "_authorization_reason"
DISCONNECTED_CLIENT_NAME = "_disconnected_client_name"
RECONNECTED_CLIENT_NAME = "_reconnected_client_name"
SITE_OBJ = "_site_obj_"
JOB_LAUNCHER = "_job_launcher"

CLIENT_REGISTER_DATA = "_client_register_data"
SECURITY_ITEMS = "_security_items"
Expand Down Expand Up @@ -324,7 +326,7 @@ class SnapshotKey(object):
class RunProcessKey(object):
LISTEN_PORT = "_listen_port"
CONNECTION = "_conn"
CHILD_PROCESS = "_child_process"
JOB_HANDLE = "_job_launcher"
STATUS = "_status"
JOB_ID = "_job_id"
PARTICIPANTS = "_participants"
Expand Down Expand Up @@ -356,6 +358,10 @@ class JobConstants:
CLIENT_JOB_CONFIG = "config_fed_client.json"
META_FILE = "meta.json"
META = "meta"
SITES = "sites"
JOB_IMAGE = "image"
JOB_ID = "job_id"
JOB_LAUNCHER = "job_launcher"


class WorkspaceConstants:
Expand Down
76 changes: 76 additions & 0 deletions nvflare/apis/job_launcher_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod

from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.fuel.common.exit_codes import ProcessExitCode


class JobReturnCode(ProcessExitCode):
SUCCESS = 0
EXECUTION_ERROR = 1
ABORTED = 9
UNKNOWN = 127


def add_launcher(launcher, fl_ctx: FLContext):
job_launcher: list = fl_ctx.get_prop(FLContextKey.JOB_LAUNCHER, [])
job_launcher.append(launcher)
fl_ctx.set_prop(FLContextKey.JOB_LAUNCHER, job_launcher, private=True, sticky=False)


class JobHandleSpec:
@abstractmethod
def terminate(self):
"""To terminate the job run.
Returns: the job run return code.
"""
raise NotImplementedError()

@abstractmethod
def poll(self):
"""To get the return code of the job run.
Returns: return_code
"""
raise NotImplementedError()

@abstractmethod
def wait(self):
"""To wait until the job run complete.
Returns: returns until the job run complete.
"""
raise NotImplementedError()


class JobLauncherSpec(FLComponent):
@abstractmethod
def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
"""To launch a job run.
Args:
job_meta: job meta data
fl_ctx: FLContext
Returns: boolean to indicates the job launch success or fail.
"""
raise NotImplementedError()
4 changes: 2 additions & 2 deletions nvflare/apis/server_engine_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ def restore_components(self, snapshot: RunSnapshot, fl_ctx: FLContext):
pass

@abstractmethod
def start_client_job(self, job_id, client_sites, fl_ctx: FLContext):
def start_client_job(self, job, client_sites, fl_ctx: FLContext):
"""To send the start client run commands to the clients
Args:
client_sites: client sites
job_id: job_id
job: job object
fl_ctx: FLContext
Returns:
Expand Down
13 changes: 13 additions & 0 deletions nvflare/app_common/job_launcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
120 changes: 120 additions & 0 deletions nvflare/app_common/job_launcher/process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shlex
import subprocess
import sys

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.apis.workspace import Workspace
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path, extract_job_image

JOB_RETURN_CODE_MAPPING = {0: JobReturnCode.SUCCESS, 1: JobReturnCode.EXECUTION_ERROR, 9: JobReturnCode.ABORTED}


class ProcessHandle(JobHandleSpec):
def __init__(self, process):
super().__init__()

self.process = process
self.logger = logging.getLogger(self.__class__.__name__)

def terminate(self):
if self.process:
try:
os.killpg(os.getpgid(self.process.pid), 9)
self.logger.debug("kill signal sent")
except:
pass

self.process.terminate()

def poll(self):
if self.process:
return JOB_RETURN_CODE_MAPPING.get(self.process.poll(), JobReturnCode.EXECUTION_ERROR)
else:
return JobReturnCode.UNKNOWN

def wait(self):
if self.process:
self.process.wait()


class ProcessJobLauncher(JobLauncherSpec):
def __init__(self):
super().__init__()

self.logger = logging.getLogger(self.__class__.__name__)

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

new_env = os.environ.copy()
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = job_meta.get(JobMetaKey.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command_options = ""
for t in args.set:
command_options += " " + t
command = (
f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m "
+ args.workspace
+ " -w "
+ (workspace_obj.get_startup_kit_dir())
+ " -t "
+ client.token
+ " -d "
+ client.ssid
+ " -n "
+ job_id
+ " -c "
+ client.client_name
+ " -p "
+ str(client.cell.get_internal_listener_url())
+ " -g "
+ service.get("target")
+ " -scheme "
+ service.get("scheme", "grpc")
+ " -s fed_client.json "
" --set" + command_options + " print_conf=True"
)
# use os.setsid to create new process group ID
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)

self.logger.info("Worker child process ID: {}".format(process.pid))

return ProcessHandle(process)

def handle_event(self, event_type: str, fl_ctx: FLContext):
if event_type == EventType.GET_JOB_LAUNCHER:
job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if not job_image:
add_launcher(self, fl_ctx)
5 changes: 4 additions & 1 deletion nvflare/app_common/job_schedulers/job_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.apis.job_scheduler_spec import DispatchInfo, JobSchedulerSpec
from nvflare.apis.server_engine_spec import ServerEngineSpec
from nvflare.private.fed.utils.fed_utils import extract_participants

SCHEDULE_RESULT_OK = 0 # the job is scheduled
SCHEDULE_RESULT_NO_RESOURCE = 1 # job is not scheduled due to lack of resources
Expand Down Expand Up @@ -109,7 +110,9 @@ def _try_job(self, job: Job, fl_ctx: FLContext) -> (int, Optional[Dict[str, Disp
applicable_sites = []
sites_to_app = {}
for app_name in job.deploy_map:
for site_name in job.deploy_map[app_name]:
deployments = job.deploy_map[app_name]
deployments = extract_participants(deployments)
for site_name in deployments:
if site_name.upper() == ALL_SITES:
# deploy_map: {"app_name": ["ALL_SITES"]} will be treated as deploying to all online clients
applicable_sites = online_site_names
Expand Down
13 changes: 13 additions & 0 deletions nvflare/app_opt/job_launcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading

0 comments on commit dd256fe

Please sign in to comment.