diff --git a/nvflare/app_common/tie/cli_applet.py b/nvflare/app_common/tie/cli_applet.py index ae9392b74e..e54b184e2c 100644 --- a/nvflare/app_common/tie/cli_applet.py +++ b/nvflare/app_common/tie/cli_applet.py @@ -76,6 +76,8 @@ def stop(self, timeout=0.0) -> int: if not mgr: raise RuntimeError("no process manager to stop") + self.logger.info(f"stopping applet: {timeout=}") + if timeout > 0: # wait for the applet to stop by itself start = time.time() @@ -84,10 +86,13 @@ def stop(self, timeout=0.0) -> int: if rc is not None: # already stopped self.logger.info(f"applet stopped ({rc=}) after {time.time() - start} seconds") - break + return rc time.sleep(0.1) rc = mgr.stop() + + self.logger.info(f"applet stopped: {rc=}") + if rc is None: self.logger.warning(f"killed the applet process after waiting {timeout} seconds") return -9 diff --git a/nvflare/app_common/tie/connector.py b/nvflare/app_common/tie/connector.py index 8afa86aedb..e319bd0af2 100644 --- a/nvflare/app_common/tie/connector.py +++ b/nvflare/app_common/tie/connector.py @@ -25,7 +25,7 @@ from nvflare.app_common.tie.applet import Applet from nvflare.app_common.tie.defs import Constant from nvflare.fuel.f3.cellnet.fqcn import FQCN -from nvflare.fuel.utils.validation_utils import check_object_type +from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number class Connector(ABC, FLComponent): @@ -35,9 +35,11 @@ class Connector(ABC, FLComponent): The Connector class defines commonly required methods for all Connector implementations. """ - def __init__(self): + def __init__(self, monitor_interval: float = 0.5): """Constructor of Connector""" FLComponent.__init__(self) + check_positive_number("monitor_interval", monitor_interval) + self.monitor_interval = monitor_interval self.abort_signal = None self.applet = None self.engine = None @@ -138,8 +140,7 @@ def _monitor(self, fl_ctx: FLContext, connector_stopped_cb): while True: if self.abort_signal.triggered: # asked to abort - self.stop(fl_ctx) - return + break stopped, rc = self._is_stopped() if stopped: @@ -147,7 +148,7 @@ def _monitor(self, fl_ctx: FLContext, connector_stopped_cb): connector_stopped_cb(rc, fl_ctx) return - time.sleep(0.1) + time.sleep(self.monitor_interval) def monitor(self, fl_ctx: FLContext, connector_stopped_cb): """Called by Controller/Executor to monitor the health of the connector. diff --git a/nvflare/app_common/tie/controller.py b/nvflare/app_common/tie/controller.py index 0ebb39b93e..25b085d1af 100644 --- a/nvflare/app_common/tie/controller.py +++ b/nvflare/app_common/tie/controller.py @@ -216,18 +216,13 @@ def start_controller(self, fl_ctx: FLContext): def _trigger_stop(self, fl_ctx: FLContext, error=None): # first trigger the abort_signal to tell all components (mainly the controller's control_flow and connector) # that check this signal to abort. - if self.abort_signal: - self.abort_signal.trigger(value=True) + self.abort_signal.trigger(value=True) # if there is error, call system_panic to terminate the job with proper status. # if no error, the job will end normally. if error: self.system_panic(reason=error, fl_ctx=fl_ctx) - def _is_stopped(self): - # check whether the abort signal is triggered - return self.abort_signal and self.abort_signal.triggered - def _update_client_status(self, fl_ctx: FLContext, op=None, client_done=False): """Update the status of the requesting client. @@ -303,7 +298,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext) """ self.log_debug(fl_ctx, f"_handle_app_request {topic}") op = request.get_header(Constant.MSG_KEY_OP) - if self._is_stopped(): + if self.abort_signal and self.abort_signal.triggered: self.log_warning(fl_ctx, f"dropped app request ({op=}) since server is already stopped") return make_reply(ReturnCode.SERVICE_UNAVAILABLE) @@ -317,7 +312,7 @@ def _handle_app_request(self, topic: str, request: Shareable, fl_ctx: FLContext) self._trigger_stop(fl_ctx, process_error) return make_reply(ReturnCode.EXECUTION_EXCEPTION) - self.log_info(fl_ctx, f"received reply for app request '{op=}'") + self.log_debug(fl_ctx, f"received reply for app request '{op=}'") reply.set_header(Constant.MSG_KEY_OP, op) return reply @@ -458,7 +453,7 @@ def control_flow(self, abort_signal: Signal, fl_ctx: FLContext): # monitor client health # we periodically check job status until all clients are done or the system is stopped self.log_info(fl_ctx, "Waiting for clients to finish ...") - while not self._is_stopped(): + while not abort_signal.triggered: done = self._check_job_status(fl_ctx) if done: break @@ -468,8 +463,8 @@ def _app_stopped(self, rc, fl_ctx: FLContext): # This CB is called when app server is stopped error = None if rc != 0: - self.log_error(fl_ctx, f"App Server stopped abnormally with code {rc}") - error = "App server abnormal stop" + error = f"App server abnormally stopped: {rc=}" + self.log_error(fl_ctx, error) # the app server could stop at any moment, we trigger the abort_signal in case it is checked by any # other components diff --git a/nvflare/app_common/tie/defs.py b/nvflare/app_common/tie/defs.py index b06bb69042..3274a8acf2 100644 --- a/nvflare/app_common/tie/defs.py +++ b/nvflare/app_common/tie/defs.py @@ -39,5 +39,6 @@ class Constant: EXIT_CODE_CANT_START = 101 EXIT_CODE_FATAL_ERROR = 102 + EXIT_CODE_FAILED = 103 APP_CTX_FL_CONTEXT = "tie.fl_context" diff --git a/nvflare/app_common/tie/executor.py b/nvflare/app_common/tie/executor.py index f40bca9898..d86ebc4f93 100644 --- a/nvflare/app_common/tie/executor.py +++ b/nvflare/app_common/tie/executor.py @@ -138,6 +138,9 @@ def handle_event(self, event_type: str, fl_ctx: FLContext): self._notify_client_done(Constant.EXIT_CODE_FATAL_ERROR, fl_ctx) elif event_type == EventType.END_RUN: self.abort_signal.trigger(True) + if self.connector: + self.logger.info(f"stopping connector {type(self.connector)}") + self.connector.stop(fl_ctx) def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: if task_name == self.configure_task_name: diff --git a/nvflare/app_common/tie/process_mgr.py b/nvflare/app_common/tie/process_mgr.py index 2feb29e97b..98bd584a2f 100644 --- a/nvflare/app_common/tie/process_mgr.py +++ b/nvflare/app_common/tie/process_mgr.py @@ -175,20 +175,24 @@ def stop(self) -> int: Returns: the exit code of the process. If killed, returns -9. """ + self.logger.info(f"stopping process: {self.cmd_desc.cmd}") rc = self.poll() if rc is None: # process is still alive try: + self.logger.info(f"still running - killing process: {self.cmd_desc.cmd}") self.process.kill() rc = -9 except: # ignore kill error pass + else: + self.logger.info(f"already done: {rc=}") # close the log file if any with self.file_lock: if self.log_file: - self.logger.debug("closed subprocess log file!") + self.logger.info("closed subprocess log file!") self.log_file.close() self.log_file = None return rc @@ -207,3 +211,29 @@ def start_process(cmd_desc: CommandDescriptor, fl_ctx: FLContext) -> ProcessMana mgr = ProcessManager(cmd_desc) mgr.start(fl_ctx) return mgr + + +def run_command(cmd_desc: CommandDescriptor) -> str: + env = os.environ.copy() + if cmd_desc.env: + env.update(cmd_desc.env) + + command_seq = shlex.split(cmd_desc.cmd) + p = subprocess.Popen( + command_seq, + stderr=subprocess.STDOUT, + cwd=cmd_desc.cwd, + env=env, + stdout=subprocess.PIPE, + ) + + output = [] + while True: + line = p.stdout.readline() + if not line: + break + + assert isinstance(line, bytes) + line = line.decode("utf-8") + output.append(line) + return "".join(output) diff --git a/nvflare/app_opt/flower/applet.py b/nvflare/app_opt/flower/applet.py index 137f8b451e..eab9a11068 100644 --- a/nvflare/app_opt/flower/applet.py +++ b/nvflare/app_opt/flower/applet.py @@ -11,17 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os +import json +import os.path +import threading import time +import tomli +import tomli_w + from nvflare.apis.fl_context import FLContext from nvflare.apis.workspace import Workspace from nvflare.app_common.tie.applet import Applet from nvflare.app_common.tie.cli_applet import CLIApplet from nvflare.app_common.tie.defs import Constant as TieConstant -from nvflare.app_common.tie.process_mgr import CommandDescriptor, ProcessManager, start_process +from nvflare.app_common.tie.process_mgr import CommandDescriptor, ProcessManager, run_command, start_process from nvflare.app_opt.flower.defs import Constant -from nvflare.fuel.f3.drivers.net_utils import get_open_tcp_port from nvflare.fuel.utils.grpc_utils import create_channel from nvflare.security.logging import secure_format_exception @@ -43,7 +47,8 @@ def get_command(self, ctx: dict) -> CommandDescriptor: Returns: CLI command for starting client app and name of log file. """ - addr = ctx.get(Constant.APP_CTX_SERVER_ADDR) + superlink_addr = ctx.get(Constant.APP_CTX_SUPERLINK_ADDR) + clientapp_api_addr = ctx.get(Constant.APP_CTX_CLIENTAPP_API_ADDR) fl_ctx = ctx.get(Constant.APP_CTX_FL_CONTEXT) if not isinstance(fl_ctx, FLContext): self.logger.error(f"expect APP_CTX_FL_CONTEXT to be FLContext but got {type(fl_ctx)}") @@ -56,11 +61,19 @@ def get_command(self, ctx: dict) -> CommandDescriptor: raise RuntimeError("invalid workspace") job_id = fl_ctx.get_job_id() - custom_dir = ws.get_app_custom_dir(job_id) app_dir = ws.get_app_dir(job_id) - if not os.path.isabs(custom_dir): - custom_dir = os.path.relpath(custom_dir, app_dir) - cmd = f"flower-supernode --insecure --grpc-adapter --superlink {addr} {custom_dir}" + + """ Example: + flower-supernode --insecure --grpc-adapter + --superlink 127.0.0.1:9092 + --clientappio-api-address 127.0.0.1:9094 + """ + + cmd = ( + f"flower-supernode --insecure --grpc-adapter " + f"--superlink {superlink_addr} " + f"--clientappio-api-address {clientapp_api_addr}" + ) # use app_dir as the cwd for flower's client app. # this is necessary for client_api to be used with the flower client app for metrics logging @@ -76,22 +89,30 @@ def __init__( self, database: str, superlink_ready_timeout: float, - server_app_args: list = None, + superlink_grace_period=1.0, ): """Constructor of FlowerServerApplet. Args: database: database spec to be used by the server app superlink_ready_timeout: how long to wait for the superlink process to become ready - server_app_args: an optional list that contains additional command args passed to flower server app + superlink_grace_period: how long to wait before stopping superlink after stopping the app """ Applet.__init__(self) - self._app_process_mgr = None self._superlink_process_mgr = None self.database = database self.superlink_ready_timeout = superlink_ready_timeout - self.server_app_args = server_app_args + self.superlink_grace_period = superlink_grace_period + self.run_id = None + self.last_check_status = None + self.last_check_time = None + self.last_check_stopped = False + self.flower_app_dir = None + self.flower_run_finished = False + self.flower_run_stopped = False # have we issued 'flwr stop'? + self.flower_run_rc = None self._start_error = False + self.stop_lock = threading.Lock() def _start_process(self, name: str, cmd_desc: CommandDescriptor, fl_ctx: FLContext) -> ProcessManager: self.logger.info(f"starting {name}: {cmd_desc.cmd}") @@ -101,6 +122,56 @@ def _start_process(self, name: str, cmd_desc: CommandDescriptor, fl_ctx: FLConte self.logger.error(f"exception starting applet: {secure_format_exception(ex)}") self._start_error = True + def _modify_flower_app_config(self, exec_api_addr: str): + """Currently the exec-api-address must be specified in pyproject.toml to be able to submit to the + superlink with "flwr run" command. + + Args: + exec_api_addr: + + Returns: + + """ + config_file = os.path.join(self.flower_app_dir, "pyproject.toml") + if not os.path.isfile(config_file): + raise RuntimeError(f"invalid flower app: missing {config_file}") + + with open(config_file, mode="rb") as fp: + config = tomli.load(fp) + + # add or modify address + tool = config.get("tool") + if not tool: + tool = {} + config["tool"] = tool + + flwr = tool.get("flwr") + if not flwr: + flwr = {} + tool["flwr"] = flwr + + fed = flwr.get("federations") + if not fed: + fed = {} + flwr["federations"] = fed + + default_mode = fed.get("default") + if not default_mode: + default_mode = "local-poc" + fed["default"] = default_mode + + mode_config = fed.get(default_mode) + if not mode_config: + mode_config = {} + fed[default_mode] = mode_config + + mode_config["address"] = exec_api_addr + mode_config["insecure"] = True + + # recreate the app config + with open(config_file, mode="wb") as fp: + tomli_w.dump(config, fp) + def start(self, app_ctx: dict): """Start the applet. @@ -119,12 +190,9 @@ def start(self, app_ctx: dict): """ # try to start superlink first - driver_port = get_open_tcp_port(resources={}) - if not driver_port: - raise RuntimeError("failed to get a port for Flower driver") - driver_addr = f"127.0.0.1:{driver_port}" - - server_addr = app_ctx.get(Constant.APP_CTX_SERVER_ADDR) + serverapp_api_addr = app_ctx.get(Constant.APP_CTX_SERVERAPP_API_ADDR) + fleet_api_addr = app_ctx.get(Constant.APP_CTX_FLEET_API_ADDR) + exec_api_addr = app_ctx.get(Constant.APP_CTX_EXEC_API_ADDR) fl_ctx = app_ctx.get(Constant.APP_CTX_FL_CONTEXT) if not isinstance(fl_ctx, FLContext): self.logger.error(f"expect APP_CTX_FL_CONTEXT to be FLContext but got {type(fl_ctx)}") @@ -137,15 +205,25 @@ def start(self, app_ctx: dict): raise RuntimeError("invalid workspace") custom_dir = ws.get_app_custom_dir(fl_ctx.get_job_id()) + self.flower_app_dir = custom_dir + + self._modify_flower_app_config(exec_api_addr) db_arg = "" if self.database: db_arg = f"--database {self.database}" + """ Example: + flower-superlink --insecure --fleet-api-type grpc-adapter + --serverappio-api-address 127.0.0.1:9091 + --fleet-api-address 127.0.0.1:9092 + --exec-api-address 127.0.0.1:9093 + """ superlink_cmd = ( f"flower-superlink --insecure --fleet-api-type grpc-adapter {db_arg} " - f"--fleet-api-address {server_addr} " - f"--driver-api-address {driver_addr}" + f"--serverappio-api-address {serverapp_api_addr} " + f"--fleet-api-address {fleet_api_addr} " + f"--exec-api-address {exec_api_addr}" ) cmd_desc = CommandDescriptor(cmd=superlink_cmd, log_file_name="superlink_log.txt", stdout_msg_prefix="FLWR-SL") @@ -154,35 +232,57 @@ def start(self, app_ctx: dict): if not self._superlink_process_mgr: raise RuntimeError("cannot start superlink process") - # wait until superlink's port is ready before starting server app - # note: the server app will connect to driver_addr, not server_addr + # wait until superlink's fleet_api_addr is ready before starting server app + # note: the server app will connect to serverapp_api_addr, not fleet_api_addr start_time = time.time() create_channel( - server_addr=driver_addr, + server_addr=fleet_api_addr, grpc_options=None, ready_timeout=self.superlink_ready_timeout, test_only=True, ) self.logger.info(f"superlink is ready for server app in {time.time() - start_time} seconds") - # start the server app - args_str = "" - if self.server_app_args: - args_str = " ".join(self.server_app_args) + # submitting the server app using "flwr run" command + # flwr_run_cmd = f"flwr run --format json -c 'address={exec_api_addr}' {custom_dir}" + flwr_run_cmd = f"flwr run --format json {self.flower_app_dir}" + run_info = self._run_flower_command(flwr_run_cmd) + run_id = run_info.get("run-id") + if not run_id: + raise RuntimeError(f"invalid result from command '{flwr_run_cmd}': missing run-id") + + self.logger.info(f"submitted Flower App and got run id {run_id}") + self.run_id = run_id + + def _run_flower_command(self, command: str): + self.logger.info(f"running flower command: {command}") + cmd_desc = CommandDescriptor(cmd=command) + reply = run_command(cmd_desc) + if not isinstance(reply, str): + raise RuntimeError(f"failed to run command '{command}': expect reply to be str but got {type(reply)}") + + self.logger.info(f"flower command {command}: {reply=}") + # the reply must be a json str + try: + result = json.loads(reply) + except Exception as ex: + err = f"invalid result from command '{command}': {secure_format_exception(ex)}" + self.logger.error(err) + raise RuntimeError(err) + + if not isinstance(result, dict): + err = f"invalid result from command '{command}': expect dict but got {type(result)}" + self.logger.error(err) + raise RuntimeError(err) - app_cmd = f"flower-server-app --insecure --superlink {driver_addr} {args_str} {custom_dir}" - cmd_desc = CommandDescriptor( - cmd=app_cmd, - log_file_name="server_app_log.txt", - stdout_msg_prefix="FLWR-SA", - ) + success = result.get("success", False) + if not success: + err = f"failed command '{command}': {success=}" + self.logger.error(err) + raise RuntimeError(err) - self._app_process_mgr = self._start_process(name="server_app", cmd_desc=cmd_desc, fl_ctx=fl_ctx) - if not self._app_process_mgr: - # stop the superlink - self._superlink_process_mgr.stop() - self._superlink_process_mgr = None - raise RuntimeError("cannot start server_app process") + self.logger.info(f"result of {command}: {result}") + return result @staticmethod def _stop_process(p: ProcessManager) -> int: @@ -193,7 +293,7 @@ def _stop_process(p: ProcessManager) -> int: return p.stop() def stop(self, timeout=0.0) -> int: - """Stop the server applet's superlink and server app processes. + """Stop the server applet's superlink. Args: timeout: how long to wait before forcefully stopping (kill) the process. @@ -203,14 +303,25 @@ def stop(self, timeout=0.0) -> int: Returns: """ - rc = self._stop_process(self._app_process_mgr) - self._app_process_mgr = None + with self.stop_lock: + if self.run_id and not self.flower_run_stopped and not self.flower_run_finished: + # stop the server app + # we may not be able to issue 'flwr stop' more than once! + self.flower_run_stopped = True + flwr_stop_cmd = f"flwr stop --format json {self.run_id} {self.flower_app_dir}" + try: + self._run_flower_command(flwr_stop_cmd) + except Exception as ex: + # ignore exception + self.logger.error(f"exception running '{flwr_stop_cmd}': {secure_format_exception(ex)}") + + # wait a while to let superlink and supernodes gracefully stop the app + time.sleep(self.superlink_grace_period) - self._stop_process(self._superlink_process_mgr) - self._superlink_process_mgr = None - - # return the rc of the server app! - return rc + # stop the superlink + self._stop_process(self._superlink_process_mgr) + self._superlink_process_mgr = None + return 0 @staticmethod def _is_process_stopped(p: ProcessManager): @@ -223,6 +334,50 @@ def _is_process_stopped(p: ProcessManager): else: return True, 0 + def _check_flower_run_status(self): + # check whether the app is finished + flwr_ls_cmd = f"flwr ls --format json {self.flower_app_dir}" + try: + run_info = self._run_flower_command(flwr_ls_cmd) + except Exception as ex: + self.logger.error(f"exception running '{flwr_ls_cmd}': {secure_format_exception(ex)}") + return True, TieConstant.EXIT_CODE_FATAL_ERROR + + runs = run_info.get("runs") + if not runs: + # the app is no longer there + return True, 0 + + if not isinstance(runs, list): + self.logger.error(f"invalid result from command '{flwr_ls_cmd}': expect run list but got {type(runs)}") + return True, TieConstant.EXIT_CODE_FATAL_ERROR + + run = runs[0] + if not isinstance(run, dict): + self.logger.error(f"invalid result from command '{flwr_ls_cmd}': expect run to be dict but got {type(run)}") + return True, TieConstant.EXIT_CODE_FATAL_ERROR + + status = run.get("status") + if not status: + self.logger.error(f"invalid result from command '{flwr_ls_cmd}': missing status from {run}") + return True, TieConstant.EXIT_CODE_FATAL_ERROR + + if not isinstance(status, str): + self.logger.error(f"invalid result from command '{flwr_ls_cmd}': bad status value '{status}'") + return True, TieConstant.EXIT_CODE_FATAL_ERROR + + if status.startswith("finished"): + self.logger.info(f"Flower Run {self.run_id} finished: {status=}") + self.flower_run_finished = True + if status.endswith("completed"): + rc = 0 + else: + rc = TieConstant.EXIT_CODE_FAILED + self.flower_run_rc = rc + return True, rc + else: + return False, 0 + def is_stopped(self) -> (bool, int): """Check whether the server applet is already stopped @@ -234,21 +389,17 @@ def is_stopped(self) -> (bool, int): if self._start_error: return True, TieConstant.EXIT_CODE_CANT_START - # check server app - app_stopped, app_rc = self._is_process_stopped(self._app_process_mgr) - if app_stopped: - self._app_process_mgr = None - superlink_stopped, superlink_rc = self._is_process_stopped(self._superlink_process_mgr) if superlink_stopped: self._superlink_process_mgr = None + return True, superlink_rc - if app_stopped or superlink_stopped: - self.stop() + if self.flower_run_finished: + return True, self.flower_run_rc - if app_stopped: - return True, app_rc - elif superlink_stopped: - return True, superlink_rc - else: - return False, 0 + with self.stop_lock: + if not self.last_check_time or time.time() - self.last_check_time >= 2.0: + self.last_check_stopped, self.last_check_status = self._check_flower_run_status() + self.last_check_time = time.time() + + return self.last_check_stopped, self.last_check_status diff --git a/nvflare/app_opt/flower/controller.py b/nvflare/app_opt/flower/controller.py index 3739038258..9762cfc54f 100644 --- a/nvflare/app_opt/flower/controller.py +++ b/nvflare/app_opt/flower/controller.py @@ -17,7 +17,7 @@ from nvflare.app_common.tie.defs import Constant as TieConstant from nvflare.app_opt.flower.applet import FlowerServerApplet from nvflare.app_opt.flower.connectors.grpc_server_connector import GrpcServerConnector -from nvflare.fuel.utils.validation_utils import check_object_type, check_positive_number +from nvflare.fuel.utils.validation_utils import check_positive_number from .defs import Constant @@ -27,8 +27,9 @@ def __init__( self, num_rounds=1, database: str = "", - server_app_args: list = None, superlink_ready_timeout: float = 10.0, + superlink_grace_period: float = 2.0, + monitor_interval: float = 0.5, configure_task_name=TieConstant.CONFIG_TASK_NAME, configure_task_timeout=TieConstant.CONFIG_TASK_TIMEOUT, start_task_name=TieConstant.START_TASK_NAME, @@ -43,8 +44,8 @@ def __init__( Args: num_rounds: number of rounds. Not used in this version. database: database name - server_app_args: additional server app CLI args superlink_ready_timeout: how long to wait for the superlink to become ready before starting server app + superlink_grace_period: how long to wait before stopping superlink after stopping the app configure_task_name: name of the config task configure_task_timeout: max time allowed for config task to complete start_task_name: name of the start task @@ -66,26 +67,27 @@ def __init__( ) check_positive_number("superlink_ready_timeout", superlink_ready_timeout) - - if server_app_args: - check_object_type("server_app_args", server_app_args, list) + check_positive_number("superlink_grace_period", superlink_grace_period) + check_positive_number("monitor_interval", monitor_interval) self.num_rounds = num_rounds self.database = database - self.server_app_args = server_app_args + self.superlink_grace_period = superlink_grace_period self.superlink_ready_timeout = superlink_ready_timeout self.int_client_grpc_options = int_client_grpc_options + self.monitor_interval = monitor_interval def get_connector(self, fl_ctx: FLContext): return GrpcServerConnector( int_client_grpc_options=self.int_client_grpc_options, + monitor_interval=self.monitor_interval, ) def get_applet(self, fl_ctx: FLContext): return FlowerServerApplet( database=self.database, superlink_ready_timeout=self.superlink_ready_timeout, - server_app_args=self.server_app_args, + superlink_grace_period=self.superlink_grace_period, ) def get_client_config_params(self, fl_ctx: FLContext) -> dict: diff --git a/nvflare/app_opt/flower/defs.py b/nvflare/app_opt/flower/defs.py index f9011c8ee8..f7c55c2011 100644 --- a/nvflare/app_opt/flower/defs.py +++ b/nvflare/app_opt/flower/defs.py @@ -39,8 +39,13 @@ class Constant: MAX_CLIENT_OP_INTERVAL = TieConstant.MAX_CLIENT_OP_INTERVAL WORKFLOW_PROGRESS_TIMEOUT = TieConstant.WORKFLOW_PROGRESS_TIMEOUT - APP_CTX_SERVER_ADDR = "flower_server_addr" - APP_CTX_PORT = "flower_port" + APP_CTX_SERVERAPP_API_ADDR = "flower_serverapp_api_addr" + APP_CTX_FLEET_API_ADDR = "flower_fleet_api_addr" + APP_CTX_EXEC_API_ADDR = "flower_exec_api_addr" + + APP_CTX_CLIENTAPP_API_ADDR = "flower_clientapp_api_addr" + APP_CTX_SUPERLINK_ADDR = "flower_superlink_addr" + APP_CTX_CLIENT_NAME = "flower_client_name" APP_CTX_NUM_ROUNDS = "flower_num_rounds" APP_CTX_FL_CONTEXT = TieConstant.APP_CTX_FL_CONTEXT diff --git a/nvflare/app_opt/flower/flower_job.py b/nvflare/app_opt/flower/flower_job.py index adce76bc26..54848f7125 100644 --- a/nvflare/app_opt/flower/flower_job.py +++ b/nvflare/app_opt/flower/flower_job.py @@ -77,7 +77,6 @@ def __init__( controller = FlowerController( database=database, - server_app_args=server_app_args, superlink_ready_timeout=superlink_ready_timeout, configure_task_timeout=configure_task_timeout, start_task_timeout=start_task_timeout, diff --git a/nvflare/app_opt/flower/grpc_client.py b/nvflare/app_opt/flower/grpc_client.py index b1600d1844..621e11ffde 100644 --- a/nvflare/app_opt/flower/grpc_client.py +++ b/nvflare/app_opt/flower/grpc_client.py @@ -72,7 +72,7 @@ def send_request(self, request: pb2.MessageContainer): Returns: a pb2.MessageContainer object """ - self.logger.info(f"sending {len(request.grpc_message_content)} bytes: {request.grpc_message_name=}") + self.logger.debug(f"sending {len(request.grpc_message_content)} bytes: {request.grpc_message_name=}") try: result = self.stub.SendReceive(request) except Exception as ex: