Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
yanchengnv committed Feb 10, 2025
1 parent e9c5749 commit 6f30478
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 90 deletions.
7 changes: 6 additions & 1 deletion nvflare/app_common/tie/cli_applet.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ def stop(self, timeout=0.0) -> int:
if not mgr:
raise RuntimeError("no process manager to stop")

self.logger.info(f"stopping applet: {timeout=}")

if timeout > 0:
# wait for the applet to stop by itself
start = time.time()
Expand All @@ -84,10 +86,13 @@ def stop(self, timeout=0.0) -> int:
if rc is not None:
# already stopped
self.logger.info(f"applet stopped ({rc=}) after {time.time() - start} seconds")
break
return rc
time.sleep(0.1)

rc = mgr.stop()

self.logger.info(f"applet stopped: {rc=}")

if rc is None:
self.logger.warning(f"killed the applet process after waiting {timeout} seconds")
return -9
Expand Down
11 changes: 6 additions & 5 deletions nvflare/app_common/tie/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from nvflare.app_common.tie.applet import Applet
from nvflare.app_common.tie.defs import Constant
from nvflare.fuel.f3.cellnet.fqcn import FQCN
from nvflare.fuel.utils.validation_utils import check_object_type
from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number


class Connector(ABC, FLComponent):
Expand All @@ -35,9 +35,11 @@ class Connector(ABC, FLComponent):
The Connector class defines commonly required methods for all Connector implementations.
"""

def __init__(self):
def __init__(self, monitor_interval: float = 0.5):
"""Constructor of Connector"""
FLComponent.__init__(self)
check_positive_number("monitor_interval", monitor_interval)
self.monitor_interval = monitor_interval
self.abort_signal = None
self.applet = None
self.engine = None
Expand Down Expand Up @@ -138,16 +140,15 @@ def _monitor(self, fl_ctx: FLContext, connector_stopped_cb):
while True:
if self.abort_signal.triggered:
# asked to abort
self.stop(fl_ctx)
return
break

stopped, rc = self._is_stopped()
if stopped:
# connector already stopped - notify the caller
connector_stopped_cb(rc, fl_ctx)
return

time.sleep(0.1)
time.sleep(self.monitor_interval)

def monitor(self, fl_ctx: FLContext, connector_stopped_cb):
"""Called by Controller/Executor to monitor the health of the connector.
Expand Down
17 changes: 6 additions & 11 deletions nvflare/app_common/tie/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,13 @@ def start_controller(self, fl_ctx: FLContext):
def _trigger_stop(self, fl_ctx: FLContext, error=None):
# first trigger the abort_signal to tell all components (mainly the controller's control_flow and connector)
# that check this signal to abort.
if self.abort_signal:
self.abort_signal.trigger(value=True)
self.abort_signal.trigger(value=True)

# if there is error, call system_panic to terminate the job with proper status.
# if no error, the job will end normally.
if error:
self.system_panic(reason=error, fl_ctx=fl_ctx)

def _is_stopped(self):
# check whether the abort signal is triggered
return self.abort_signal and self.abort_signal.triggered

def _update_client_status(self, fl_ctx: FLContext, op=None, client_done=False):
"""Update the status of the requesting client.
Expand Down Expand Up @@ -303,7 +298,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext)
"""
self.log_debug(fl_ctx, f"_handle_app_request {topic}")
op = request.get_header(Constant.MSG_KEY_OP)
if self._is_stopped():
if self.abort_signal and self.abort_signal.triggered:
self.log_warning(fl_ctx, f"dropped app request ({op=}) since server is already stopped")
return make_reply(ReturnCode.SERVICE_UNAVAILABLE)

Expand All @@ -317,7 +312,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext)
self._trigger_stop(fl_ctx, process_error)
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

self.log_info(fl_ctx, f"received reply for app request '{op=}'")
self.log_debug(fl_ctx, f"received reply for app request '{op=}'")
reply.set_header(Constant.MSG_KEY_OP, op)
return reply

Expand Down Expand Up @@ -458,7 +453,7 @@ def control_flow(self, abort_signal: Signal, fl_ctx: FLContext):
# monitor client health
# we periodically check job status until all clients are done or the system is stopped
self.log_info(fl_ctx, "Waiting for clients to finish ...")
while not self._is_stopped():
while not abort_signal.triggered:
done = self._check_job_status(fl_ctx)
if done:
break
Expand All @@ -468,8 +463,8 @@ def _app_stopped(self, rc, fl_ctx: FLContext):
# This CB is called when app server is stopped
error = None
if rc != 0:
self.log_error(fl_ctx, f"App Server stopped abnormally with code {rc}")
error = "App server abnormal stop"
error = f"App server abnormally stopped: {rc=}"
self.log_error(fl_ctx, error)

# the app server could stop at any moment, we trigger the abort_signal in case it is checked by any
# other components
Expand Down
1 change: 1 addition & 0 deletions nvflare/app_common/tie/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ class Constant:

EXIT_CODE_CANT_START = 101
EXIT_CODE_FATAL_ERROR = 102
EXIT_CODE_FAILED = 103

APP_CTX_FL_CONTEXT = "tie.fl_context"
3 changes: 3 additions & 0 deletions nvflare/app_common/tie/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
self._notify_client_done(Constant.EXIT_CODE_FATAL_ERROR, fl_ctx)
elif event_type == EventType.END_RUN:
self.abort_signal.trigger(True)
if self.connector:
self.logger.info(f"stopping connector {type(self.connector)}")
self.connector.stop(fl_ctx)

def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
if task_name == self.configure_task_name:
Expand Down
32 changes: 31 additions & 1 deletion nvflare/app_common/tie/process_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,24 @@ def stop(self) -> int:
Returns: the exit code of the process. If killed, returns -9.
"""
self.logger.info(f"stopping process: {self.cmd_desc.cmd}")
rc = self.poll()
if rc is None:
# process is still alive
try:
self.logger.info(f"still running - killing process: {self.cmd_desc.cmd}")
self.process.kill()
rc = -9
except:
# ignore kill error
pass
else:
self.logger.info(f"already done: {rc=}")

# close the log file if any
with self.file_lock:
if self.log_file:
self.logger.debug("closed subprocess log file!")
self.logger.info("closed subprocess log file!")
self.log_file.close()
self.log_file = None
return rc
Expand All @@ -207,3 +211,29 @@ def start_process(cmd_desc: CommandDescriptor, fl_ctx: FLContext) -> ProcessMana
mgr = ProcessManager(cmd_desc)
mgr.start(fl_ctx)
return mgr


def run_command(cmd_desc: CommandDescriptor) -> str:
env = os.environ.copy()
if cmd_desc.env:
env.update(cmd_desc.env)

command_seq = shlex.split(cmd_desc.cmd)
p = subprocess.Popen(
command_seq,
stderr=subprocess.STDOUT,
cwd=cmd_desc.cwd,
env=env,
stdout=subprocess.PIPE,
)

output = []
while True:
line = p.stdout.readline()
if not line:
break

assert isinstance(line, bytes)
line = line.decode("utf-8")
output.append(line)
return "".join(output)
Loading

0 comments on commit 6f30478

Please sign in to comment.