Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job launcher server side #3055

Merged
merged 50 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
21b5092
WIP: implement the ProcessJobLaunch.
yhwen Oct 10, 2024
4c15158
WIP: working ProcessJobLauncher implementation.
yhwen Oct 14, 2024
610f460
Added k8s_launcher implementation.
yhwen Oct 14, 2024
048df0d
Added logger for k8s_launcher.py
yhwen Oct 14, 2024
4296f96
Added empty launcher_map check.
yhwen Oct 14, 2024
0dfae5e
Added more config for K8sJobLauncher.
yhwen Oct 16, 2024
044b769
renamed RunProcessKey.JOB_LAUNCHER.
yhwen Oct 16, 2024
02c71e7
Separated out the JobHandleSpec.
yhwen Oct 17, 2024
b14dc75
Support for the launcher deploy image.
yhwen Oct 18, 2024
c88e00b
Changed the _get_job_launcher logic.
yhwen Oct 18, 2024
a7721ed
add more handled for the deployment_map change.
yhwen Oct 18, 2024
9a7e89e
Added logging for job launcher.
yhwen Oct 18, 2024
0c57c7b
Fixed extract_job_image usage.
yhwen Oct 18, 2024
68c025d
Added job_meta for launch_job.
yhwen Oct 18, 2024
24469b4
codestyle fix.
yhwen Oct 18, 2024
1d2fece
refactored.
yhwen Oct 19, 2024
ceaeb8e
extract to use constants.
yhwen Oct 21, 2024
b974d9a
Change the JobLauncherSpec API signature.
yhwen Oct 22, 2024
765357b
Added can_launch() to JobLauncherSpec.
yhwen Oct 22, 2024
1d7ebc7
refactor.
yhwen Oct 23, 2024
5103592
Merged from main.
yhwen Oct 23, 2024
dc94e01
removed duplicate const.
yhwen Oct 23, 2024
7f1a9d9
removed no use import.
yhwen Oct 23, 2024
ae8a2dc
changed to raise NotImplementedError().
yhwen Oct 24, 2024
604a29c
Added job launcher support for server side.
yhwen Oct 24, 2024
f923e97
refactored.
yhwen Oct 25, 2024
037c7bb
Changed to use event to get the job launcher.
yhwen Oct 25, 2024
e6123b1
updated K8sJobLauncher.
yhwen Oct 25, 2024
90f8687
codestyle fix.
yhwen Oct 25, 2024
1aa8d39
removed no use import.
yhwen Oct 25, 2024
8153dd3
JobReturnCode standard.
yhwen Oct 28, 2024
baecfff
fixed the _get_job_launcher() condition logic.
yhwen Oct 29, 2024
85ecd10
Merge branch 'job_launcher' into job_launcher_server
yhwen Oct 30, 2024
5d4146a
refactored.
yhwen Oct 30, 2024
a84d7a3
Merge branch 'main' into job_launcher
YuanTingHsieh Oct 30, 2024
ac73549
refactored.
yhwen Oct 30, 2024
01bb0c2
Added ClientK8sJobLauncher.
yhwen Oct 31, 2024
90ad1cb
Added ServerK8sJobLauncher.
yhwen Oct 31, 2024
23b0b92
Fixed missing args.
yhwen Oct 31, 2024
1d94eb4
Merge branch 'main' into job_launcher
YuanTingHsieh Oct 31, 2024
0e148a9
Fixed get_set_list().
yhwen Oct 31, 2024
a0ec82d
refactored.
yhwen Nov 1, 2024
b51ef76
fixed the missing client_name in the workspace object.
yhwen Nov 1, 2024
98d2c10
Merge branch 'job_launcher' into job_launcher_server
yhwen Nov 1, 2024
83c0f85
Merge branch 'main' into job_launcher_server
yhwen Nov 1, 2024
49b30e4
codestyle fix.
yhwen Nov 1, 2024
5e8c70f
removed no use constant.
yhwen Nov 1, 2024
b116410
use a new fl_ctx for get_job_launcher to solve the thread safe issue.
yhwen Nov 12, 2024
ef336c6
changed to use with engine.new_context() as job_launcher_ctx: pattern.
yhwen Nov 14, 2024
6fbc51e
Merge branch 'main' into job_launcher_server
yhwen Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class FLContextKey(object):
RECONNECTED_CLIENT_NAME = "_reconnected_client_name"
SITE_OBJ = "_site_obj_"
JOB_LAUNCHER = "_job_launcher"
SNAPSHOT = "job_snapshot"

CLIENT_REGISTER_DATA = "_client_register_data"
SECURITY_ITEMS = "_security_items"
Expand Down
66 changes: 66 additions & 0 deletions nvflare/app_common/job_launcher/client_process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path


class ClientProcessJobLauncher(ProcessJobLauncher):
    """Process launcher that builds the client-side worker process command."""

    def get_command(self, launch_data, fl_ctx) -> (str, dict):
        """Build the command line and environment for the client worker process.

        Args:
            launch_data: job launch data; the job id is read from it.
            fl_ctx: FLContext providing the workspace, args, site object and
                server config.

        Returns:
            A (command string, environment dict) pair.

        Raises:
            RuntimeError: if the server config is missing from the FL context,
                or its "service" entry is not a dict.
        """
        env = os.environ.copy()
        workspace: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        cmd_args = fl_ctx.get_prop(FLContextKey.ARGS)
        site = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        job_id = launch_data.get(JobConstants.JOB_ID)

        server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
        if not server_config:
            raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
        service = server_config[0].get("service", {})
        if not isinstance(service, dict):
            raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

        # Make the job's custom code importable in the child process.
        custom_dir = workspace.get_app_custom_dir(job_id)
        if custom_dir != "":
            add_custom_dir_to_path(custom_dir, env)

        # Every "--set" option is forwarded, each preceded by a single space.
        set_options = "".join(" " + opt for opt in cmd_args.set)

        # str.join only accepts strings, just like the "+" concatenation it replaces.
        pieces = [
            f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m ",
            cmd_args.workspace,
            " -w ",
            workspace.get_startup_kit_dir(),
            " -t ",
            site.token,
            " -d ",
            site.ssid,
            " -n ",
            job_id,
            " -c ",
            site.client_name,
            " -p ",
            str(site.cell.get_internal_listener_url()),
            " -g ",
            service.get("target"),
            " -scheme ",
            service.get("scheme", "grpc"),
            " -s fed_client.json ",
            " --set",
            set_options,
            " print_conf=True",
        ]
        return "".join(pieces), env
64 changes: 18 additions & 46 deletions nvflare/app_common/job_launcher/process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import os
import shlex
import subprocess
import sys
from abc import abstractmethod

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.apis.workspace import Workspace
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path, extract_job_image
from nvflare.private.fed.utils.fed_utils import extract_job_image

JOB_RETURN_CODE_MAPPING = {0: JobReturnCode.SUCCESS, 1: JobReturnCode.EXECUTION_ERROR, 9: JobReturnCode.ABORTED}

Expand Down Expand Up @@ -64,51 +62,11 @@ def __init__(self):

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

new_env = os.environ.copy()
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = job_meta.get(JobMetaKey.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command_options = ""
for t in args.set:
command_options += " " + t
command = (
f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m "
+ args.workspace
+ " -w "
+ (workspace_obj.get_startup_kit_dir())
+ " -t "
+ client.token
+ " -d "
+ client.ssid
+ " -n "
+ job_id
+ " -c "
+ client.client_name
+ " -p "
+ str(client.cell.get_internal_listener_url())
+ " -g "
+ service.get("target")
+ " -scheme "
+ service.get("scheme", "grpc")
+ " -s fed_client.json "
" --set" + command_options + " print_conf=True"
)
command, new_env = self.get_command(job_meta, fl_ctx)
# use os.setsid to create new process group ID
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)

self.logger.info("Worker child process ID: {}".format(process.pid))
self.logger.info("Launch the job in process ID: {}".format(process.pid))

return ProcessHandle(process)

Expand All @@ -118,3 +76,17 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if not job_image:
add_launcher(self, fl_ctx)

@abstractmethod
def get_command(self, launch_data, fl_ctx) -> (str, dict):
    """Generate the command used to launch the job in a sub-process.

    Args:
        launch_data: job launcher data
        fl_ctx: FLContext

    Returns:
        launch command, environment dict
    """
    pass
71 changes: 71 additions & 0 deletions nvflare/app_common/job_launcher/server_process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path


class ServerProcessJobLauncher(ProcessJobLauncher):
    """Process launcher that builds the server-side runner process command."""

    def get_command(self, launch_data, fl_ctx) -> (str, dict):
        """Build the command line and environment for the server runner process.

        Args:
            launch_data: job launch data; the job id is read from it.
            fl_ctx: FLContext providing the workspace, args, site object and
                the snapshot flag.

        Returns:
            A (command string, environment dict) pair.
        """
        env = os.environ.copy()

        workspace: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        cmd_args = fl_ctx.get_prop(FLContextKey.ARGS)
        site = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        job_id = launch_data.get(JobConstants.JOB_ID)
        # Defaults to False when no snapshot flag was placed in the context.
        snapshot_flag = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False)

        job_app_dir = workspace.get_app_dir(job_id)
        site_cell = site.cell
        state = site.server_state

        # Make the job's custom code importable in the child process.
        custom_dir = workspace.get_app_custom_dir(job_id)
        if custom_dir != "":
            add_custom_dir_to_path(custom_dir, env)

        # Every "--set" option is forwarded, each preceded by a single space.
        set_options = "".join(" " + opt for opt in cmd_args.set)

        # str.join only accepts strings, just like the "+" concatenation it replaces.
        pieces = [
            sys.executable,
            " -m nvflare.private.fed.app.server.runner_process -m ",
            cmd_args.workspace,
            " -s fed_server.json -r ",
            job_app_dir,
            " -n ",
            str(job_id),
            " -p ",
            str(site_cell.get_internal_listener_url()),
            " -u ",
            str(site_cell.get_root_url_for_child()),
            " --host ",
            str(state.host),
            " --port ",
            str(state.service_port),
            " --ssid ",
            str(state.ssid),
            " --ha_mode ",
            str(site.ha_mode),
            " --set",
            set_options,
            " print_conf=True restore_snapshot=",
            str(snapshot_flag),
        ]

        return "".join(pieces), env
129 changes: 104 additions & 25 deletions nvflare/app_opt/job_launcher/k8s_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import logging
import time
from abc import abstractmethod
from enum import Enum

from kubernetes import config
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(self, job_id: str, api_instance: core_v1_api, job_config: dict, nam
"imagePullPolicy": "Always",
}
]
self.container_args_python_args_list = ["-u", "-m", "nvflare.private.fed.app.client.worker_process"]
self.container_args_python_args_list = ["-u", "-m", job_config.get("command")]
self.container_args_module_args_dict = {
"-m": None,
"-w": None,
Expand Down Expand Up @@ -218,39 +219,19 @@ def __init__(

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = job_meta.get(JobConstants.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

self.logger.info(f"K8sJobLauncher start to launch job: {job_id} for client: {client.client_name}")
args = fl_ctx.get_prop(FLContextKey.ARGS)
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
self.logger.info(f"launch job use image: {job_image}")
job_config = {
"name": job_id,
"image": job_image,
"container_name": f"container-{job_id}",
"command": self.get_command(),
"volume_mount_list": [{"name": self.workspace, "mountPath": self.mount_path}],
"volume_list": [{"name": self.workspace, "hostPath": {"path": self.root_hostpath, "type": "Directory"}}],
"module_args": {
"-m": args.workspace,
"-w": (workspace_obj.get_startup_kit_dir()),
"-t": client.token,
"-d": client.ssid,
"-n": job_id,
"-c": client.client_name,
"-p": "tcp://parent-pod:8004",
"-g": service.get("target"),
"-scheme": service.get("scheme", "grpc"),
"-s": "fed_client.json",
},
"set_list": args.set,
"module_args": self.get_module_args(job_id, fl_ctx),
"set_list": self.get_set_list(args, fl_ctx),
}

self.logger.info(f"launch job with k8s_launcher. Job_id:{job_id}")
Expand All @@ -273,3 +254,101 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if job_image:
add_launcher(self, fl_ctx)

@abstractmethod
def get_command(self):
    """Get the run command of the launcher.

    The returned value is stored as the "command" entry of the job config
    built in launch_job.

    Returns: the command for the launcher process

    """
    pass

@abstractmethod
def get_module_args(self, job_id, fl_ctx: FLContext):
    """Get the args used to run the launcher.

    The returned value is stored as the "module_args" entry of the job config
    built in launch_job.

    Args:
        job_id: run job_id
        fl_ctx: FLContext

    Returns: the module args, keyed by command-line option

    """
    pass

@abstractmethod
def get_set_list(self, args, fl_ctx: FLContext):
    """Get the command set_list.

    The returned value is stored as the "set_list" entry of the job config
    built in launch_job.

    Args:
        args: command args
        fl_ctx: FLContext

    Returns: set_list command options

    """
    pass


class ClientK8sJobLauncher(K8sJobLauncher):
    """K8s job launcher that starts the client-side worker process."""

    def get_command(self):
        """Return the Python module to run for a client job."""
        return "nvflare.private.fed.app.client.worker_process"

    def get_module_args(self, job_id, fl_ctx: FLContext):
        """Build the module arguments for the client worker process.

        Args:
            job_id: id of the job to launch
            fl_ctx: FLContext providing the workspace, args, site object and
                server config

        Returns:
            dict mapping each command-line option to its value

        Raises:
            RuntimeError: if the server config is missing from the FL context,
                or its "service" entry is not a dict.
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
        if not server_config:
            raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
        service = server_config[0].get("service", {})
        if not isinstance(service, dict):
            raise RuntimeError(f"expect server config data to be dict but got {type(service)}")
        self.logger.info(f"K8sJobLauncher start to launch job: {job_id} for client: {client.client_name}")

        return {
            "-m": args.workspace,
            "-w": (workspace_obj.get_startup_kit_dir()),
            "-t": client.token,
            "-d": client.ssid,
            "-n": job_id,
            "-c": client.client_name,
            "-p": str(client.cell.get_internal_listener_url()),
            "-g": service.get("target"),
            "-scheme": service.get("scheme", "grpc"),
            "-s": "fed_client.json",
        }

    def get_set_list(self, args, fl_ctx: FLContext):
        """Return the "--set" options for the client worker process.

        Args:
            args: command args; args.set holds the configured options
            fl_ctx: FLContext (not used here)

        Returns:
            a new list with the configured options followed by "print_conf=True"
        """
        # Return a copy: appending to args.set in place would add another
        # "print_conf=True" to the shared args on every launch.
        return [*args.set, "print_conf=True"]


class ServerK8sJobLauncher(K8sJobLauncher):
    """K8s job launcher that starts the server-side runner process."""

    def get_command(self):
        """Return the Python module to run for a server job."""
        return "nvflare.private.fed.app.server.runner_process"

    def get_module_args(self, job_id, fl_ctx: FLContext):
        """Build the module arguments for the server runner process.

        Args:
            job_id: id of the job to launch
            fl_ctx: FLContext providing the workspace, args and site object

        Returns:
            dict mapping each command-line option to its value
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        server = fl_ctx.get_prop(FLContextKey.SITE_OBJ)

        return {
            "-m": args.workspace,
            "-s": "fed_server.json",
            # The app dir is per job, so pass job_id as the process-based
            # server launcher does.
            "-r": workspace_obj.get_app_dir(job_id),
            "-n": str(job_id),
            "-p": str(server.cell.get_internal_listener_url()),
            "-u": str(server.cell.get_root_url_for_child()),
            "--host": str(server.server_state.host),
            "--port": str(server.server_state.service_port),
            "--ssid": str(server.server_state.ssid),
            "--ha_mode": str(server.ha_mode),
        }

    def get_set_list(self, args, fl_ctx: FLContext):
        """Return the "--set" options for the server runner process.

        Args:
            args: command args; args.set holds the configured options
            fl_ctx: FLContext; the snapshot flag is read from it

        Returns:
            a new list with the configured options followed by
            "print_conf=True" and "restore_snapshot=<flag>"
        """
        restore_snapshot = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False)
        # Return a copy: appending to args.set in place would leave stale
        # "restore_snapshot=..." entries on the shared args after each launch.
        return [*args.set, "print_conf=True", "restore_snapshot=" + str(restore_snapshot)]
Loading
Loading