From 2dad0bde7d8f8d6f451154c5f7e8ef454ba056fc Mon Sep 17 00:00:00 2001 From: Yan Cheng <58191769+yanchengnv@users.noreply.github.com> Date: Fri, 17 Nov 2023 15:37:36 -0500 Subject: [PATCH] Support sys vars for job config and support parameterized template in job config (#2145) * support sys vars for job config * address pr reviews * support os env vars used in job config * support os env vars used in job config --------- Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com> --- nvflare/apis/fl_constant.py | 19 ++++ nvflare/fuel/f3/cellnet/core_cell.py | 3 + nvflare/fuel/flare_api/flare_api.py | 44 +++++++- nvflare/fuel/hci/server/authz.py | 3 - nvflare/fuel/utils/config_service.py | 2 + nvflare/fuel/utils/wfconf.py | 104 +++++++++++++++++- nvflare/private/defs.py | 1 + .../private/fed/app/client/worker_process.py | 1 + nvflare/private/fed/client/client_engine.py | 9 +- nvflare/private/fed/client/client_executor.py | 6 + .../private/fed/client/client_json_config.py | 26 ++++- .../fed/client/client_req_processors.py | 3 +- nvflare/private/fed/client/sys_cmd.py | 28 +++++ nvflare/private/fed/server/cmd_utils.py | 4 +- .../private/fed/server/server_json_config.py | 16 +++ nvflare/private/fed/server/sys_cmd.py | 21 ++++ nvflare/private/fed_json_config.py | 11 +- nvflare/private/json_configer.py | 29 ++++- nvflare/security/security.py | 1 + 19 files changed, 304 insertions(+), 27 deletions(-) diff --git a/nvflare/apis/fl_constant.py b/nvflare/apis/fl_constant.py index fa8dbd9a7e..9e73faebe0 100644 --- a/nvflare/apis/fl_constant.py +++ b/nvflare/apis/fl_constant.py @@ -172,6 +172,7 @@ class FLContextKey(object): COMPONENT_NODE = "__component_node__" CONFIG_CTX = "__config_ctx__" FILTER_DIRECTION = "__filter_dir__" + ROOT_URL = "__root_url__" # the URL for accessing the FL Server class ReservedTopic(object): @@ -213,6 +214,7 @@ class AdminCommandNames(object): AUX_COMMAND = "aux_command" SYS_INFO = "sys_info" REPORT_RESOURCES = "report_resources" + REPORT_ENV = "report_env" SHOW_SCOPES = "show_scopes" CALL = "call" SHELL_PWD = "pwd" @@ -334,6 +336,8 @@ class SystemComponents(object): APP_DEPLOYER = "app_deployer" DEFAULT_APP_DEPLOYER = "default_app_deployer" JOB_META_VALIDATOR = "job_meta_validator" + FED_CLIENT = "fed_client" + RUN_MANAGER = "run_manager" class JobConstants: @@ -428,3 +432,18 @@ class ConfigVarName: RUNNER_SYNC_TIMEOUT = "runner_sync_timeout" MAX_RUNNER_SYNC_TRIES = "max_runner_sync_tries" + + +class SystemVarName: + """ + These vars are automatically generated by FLARE and can be referenced in job config (config_fed_client and + config_fed_server). For example, you can reference SITE_NAME as "{SITE_NAME}" in your config. + + To avoid potential conflict with user-defined var names, these var names are in UPPER CASE. + """ + + SITE_NAME = "SITE_NAME" # name of client site or server + WORKSPACE = "WORKSPACE" # directory of the workspace + JOB_ID = "JOB_ID" # Job ID + ROOT_URL = "ROOT_URL" # the URL of the Service Provider (server) + SECURE_MODE = "SECURE_MODE" # whether the system is running in secure mode diff --git a/nvflare/fuel/f3/cellnet/core_cell.py b/nvflare/fuel/f3/cellnet/core_cell.py index 0648f94033..2e4f9695a1 100644 --- a/nvflare/fuel/f3/cellnet/core_cell.py +++ b/nvflare/fuel/f3/cellnet/core_cell.py @@ -2053,3 +2053,6 @@ def _is_my_sub(self, candidate_info: FqcnInfo) -> int: if candidate_info.is_root and not candidate_info.is_on_server: return self.SUB_TYPE_CLIENT return self.SUB_TYPE_NONE + + def is_secure(self): + return self.secure diff --git a/nvflare/fuel/flare_api/flare_api.py b/nvflare/fuel/flare_api/flare_api.py index e3ba0f2b85..69843e7c61 100644 --- a/nvflare/fuel/flare_api/flare_api.py +++ b/nvflare/fuel/flare_api/flare_api.py @@ -158,11 +158,10 @@ def _do_command(self, command: str, enforce_meta=True): if not isinstance(result, dict): raise InternalError(f"result from server must be dict but got {type(result)}") - # check meta status first + # Check meta status if available + # There are still some commands that do not return meta. But for commands that do return meta, we will check + # its meta status first. meta = result.get(ResultKey.META, None) - if enforce_meta and not meta: - raise InternalError("missing meta from result") - if meta: if not isinstance(meta, dict): raise InternalError(f"meta must be dict but got {type(meta)}") @@ -194,6 +193,8 @@ def _do_command(self, command: str, enforce_meta=True): elif cmd_status != MetaStatusValue.OK: raise InternalError(f"{cmd_status}: {info}") + # Then check API Status. There are cases that a command does not return meta or ran into errors before + # setting meta. Even if the command does return meta, still need to make sure APIStatus is good. status = result.get(ResultKey.STATUS, None) if not status: raise InternalError("missing status in result") @@ -212,6 +213,10 @@ def _do_command(self, command: str, enforce_meta=True): details = result.get(ResultKey.DETAILS, "") raise RuntimeError(f"runtime error encountered: {status}: {details}") + if enforce_meta and not meta: + raise InternalError("missing meta from result") + + # both API Status and Meta are okay return result @staticmethod @@ -260,6 +265,7 @@ def submit_job(self, job_definition_path: str) -> str: if not os.path.isdir(job_definition_path): if os.path.isdir(os.path.join(self.upload_dir, job_definition_path)): job_definition_path = os.path.join(self.upload_dir, job_definition_path) + job_definition_path = os.path.abspath(job_definition_path) else: raise InvalidJobDefinition(f"job_definition_path '{job_definition_path}' is not a valid folder") @@ -802,6 +808,36 @@ def get_connected_client_list(self) -> List[ClientInfo]: sys_info = self.get_system_info() return sys_info.client_info + def get_client_env(self, client_names=None): + """Get running environment values for specified clients. The env includes values of client name, + workspace directory, root url of the FL server, and secure mode or not. + + These values can be used for 3rd-party system configuration (e.g. CellPipe to connect to the FLARE system). + + Args: + client_names: clients to get env from. None means all clients. + + Returns: list of env info for specified clients. + + Raises: InvalidTarget exception, if no clients are connected or an invalid client name is specified + + """ + if not client_names: + command = AdminCommandNames.REPORT_ENV + else: + if isinstance(client_names, str): + client_names = [client_names] + elif not isinstance(client_names, list): + raise ValueError(f"client_names must be str or list of str but got {type(client_names)}") + command = AdminCommandNames.REPORT_ENV + " " + " ".join(client_names) + + result = self._do_command(command) + meta = result[ResultKey.META] + client_envs = meta.get(MetaKey.CLIENTS) + if not client_envs: + raise RuntimeError(f"missing {MetaKey.CLIENTS} from meta") + return client_envs + def monitor_job( self, job_id: str, timeout: float = 0.0, poll_interval: float = 2.0, cb=None, *cb_args, **cb_kwargs ) -> MonitorReturnCode: diff --git a/nvflare/fuel/hci/server/authz.py b/nvflare/fuel/hci/server/authz.py index 000947eae7..e69af60998 100644 --- a/nvflare/fuel/hci/server/authz.py +++ b/nvflare/fuel/hci/server/authz.py @@ -62,9 +62,6 @@ def pre_command(self, conn: Connection, args: List[str]): return True if return_code == PreAuthzReturnCode.ERROR: - conn.append_error( - "Authorization error", meta=make_meta(MetaStatusValue.NOT_AUTHORIZED, "Authorization error") - ) return False # authz required - the command name is the name of the right to be checked! diff --git a/nvflare/fuel/utils/config_service.py b/nvflare/fuel/utils/config_service.py index e124470587..dbb4a6c97e 100644 --- a/nvflare/fuel/utils/config_service.py +++ b/nvflare/fuel/utils/config_service.py @@ -49,6 +49,8 @@ def search_file(file_basename: str, dirs: List[str]) -> Union[None, str]: Returns: the full path of the file, if found; None if not found """ + if isinstance(dirs, str): + dirs = [dirs] for d in dirs: f = find_file_in_dir(file_basename, d) if f: diff --git a/nvflare/fuel/utils/wfconf.py b/nvflare/fuel/utils/wfconf.py index 328e967ab3..f2356c5469 100644 --- a/nvflare/fuel/utils/wfconf.py +++ b/nvflare/fuel/utils/wfconf.py @@ -21,6 +21,7 @@ from nvflare.fuel.common.excepts import ConfigError from nvflare.security.logging import secure_format_exception +from .argument_utils import parse_vars from .class_utils import ModuleScanner, get_class, instantiate_class from .dict_utils import extract_first_level_primitive, merge_dict from .json_scanner import JsonObjectProcessor, JsonScanner, Node @@ -38,10 +39,22 @@ def __init__(self): class _EnvUpdater(JsonObjectProcessor): def __init__(self, vs, element_filter=None): JsonObjectProcessor.__init__(self) - self.vars = vs if element_filter is not None and not callable(element_filter): raise ValueError("element_filter must be a callable function but got {}.".format(type(element_filter))) + self.vars = copy.copy(vs) + + # make all os env vars available for config + env_vars = dict(os.environ) + if env_vars: + for k, v in env_vars.items(): + # when referencing os env var, must use a $ sign prefix! + var_name = "$" + k + if var_name not in self.vars: + # only use env var when it is not locally defined! + self.vars[var_name] = v + self.element_filter = element_filter + self.num_updated = 0 def process_element(self, node: Node): element = node.element @@ -58,14 +71,97 @@ def process_element(self, node: Node): parent_element[node.key] = element def substitute(self, element: str): - a = re.split("{|}", element) - if len(a) == 3 and a[0] == "" and a[2] == "": - element = self.vars.get(a[1], None) + original_value = element + + # Check for Simple Variable Ref (SVR) + # SVR is resolved to an object that is derived from the variable definition. + # If the variable def also contains refs, all such refs will also be resolved. + # If the variable def contains local vars, they are also resolved with the values from the ref. + # There are two kinds of SVR: + # - Simple ref that contains a single var name: {var_name} + # - Invoke a definition that contains local vars: {@var_name:n1=v1:n2=v2:...} + # The "@var_name" is a def that contains local vars n1, n2, ... + # When invoking such def, local var values could also be refs: {@var_name:n1={varp_name}} + is_svr = False + exp = element.strip() + if exp.startswith("{@") and exp.endswith("}"): + # this is a ref with local vars + is_svr = True + exp = exp[1 : len(exp) - 1] + else: + a = re.split("{|}", exp) + if len(a) == 3 and a[0] == "" and a[2] == "": + is_svr = True + exp = a[1] + + if is_svr: + parts = exp.split(":") + var_name = parts[0] + params = [] + for i, p in enumerate(parts): + if i > 0: + params.append(p) + + if params: + # the var_name must reference a dict + local_vars = parse_vars(params) + item = self.vars.get(var_name) + if item: + if isinstance(item, dict): + # scan the item to resolve var refs + new_item = copy.deepcopy(item) + scanner = JsonScanner(new_item) + new_vars = copy.copy(self.vars) + new_vars.update(local_vars) + resolve_var_refs(scanner, new_vars) + element = new_item + else: + raise ConfigError( + f"bad parameterized expression '{element}': {var_name} must be dict but got {type(item)}" + ) + else: + raise ConfigError(f"bad parameterized expression '{element}': {var_name} is not defined") + else: + # this is a single var without params + element = self.vars.get(var_name, None) else: element = element.format(**self.vars) + if element != original_value: + self.num_updated += 1 return element +def resolve_var_refs(scanner: JsonScanner, var_values: dict): + """Resolve var references in the config contained in the scanner + + Args: + scanner: the scanner that contains config data to be resolved + var_values: the dict that contains var values. + + Returns: None + + """ + updater = _EnvUpdater(var_values) + max_rounds = 20 + num_rounds = 0 + + # var_values may contain multi-level refs (value contains refs to other vars) + # we keep scanning and resolving refs until all refs are resolved, or we reached max number of rounds. + # The max rounds could be reached either because there are cyclic refs or the ref level is too deep. + while True: + scanner.scan(updater) + num_rounds += 1 + if updater.num_updated == 0: + # nothing was resolved - we have resolved everything. + break + else: + # prepare for the next round + if num_rounds > max_rounds: + # cyclic refs or nest level too deep. + raise ConfigError(f"item de-ref exceeds {max_rounds} rounds - cyclic refs or ref level too deep") + updater.num_updated = 0 + + class Configurator(JsonObjectProcessor): def __init__( self, diff --git a/nvflare/private/defs.py b/nvflare/private/defs.py index 4be5c80652..d842f1347e 100644 --- a/nvflare/private/defs.py +++ b/nvflare/private/defs.py @@ -88,6 +88,7 @@ class SysCommandTopic(object): SYS_INFO = "sys.info" SHELL = "sys.shell" REPORT_RESOURCES = "resource_manager.report_resources" + REPORT_ENV = "sys.report_env" class ControlCommandTopic(object): diff --git a/nvflare/private/fed/app/client/worker_process.py b/nvflare/private/fed/app/client/worker_process.py index 1823ed74e2..898fdce99e 100644 --- a/nvflare/private/fed/app/client/worker_process.py +++ b/nvflare/private/fed/app/client/worker_process.py @@ -149,6 +149,7 @@ def parse_arguments(): parser.add_argument("--client_name", "-c", type=str, help="client name", required=True) # parser.add_argument("--listen_port", "-p", type=str, help="listen port", required=True) parser.add_argument("--sp_target", "-g", type=str, help="Sp target", required=True) + parser.add_argument("--sp_scheme", "-scheme", type=str, help="Sp connection scheme", required=True) parser.add_argument("--parent_url", "-p", type=str, help="parent_url", required=True) parser.add_argument( "--fed_client", "-s", type=str, help="an aggregation server specification json file", required=True diff --git a/nvflare/private/fed/client/client_engine.py b/nvflare/private/fed/client/client_engine.py index 7d622b40f2..4e1978978a 100644 --- a/nvflare/private/fed/client/client_engine.py +++ b/nvflare/private/fed/client/client_engine.py @@ -21,7 +21,7 @@ from nvflare.apis.event_type import EventType from nvflare.apis.fl_component import FLComponent -from nvflare.apis.fl_constant import MachineStatus, SystemComponents, WorkspaceConstants +from nvflare.apis.fl_constant import FLContextKey, MachineStatus, SystemComponents, WorkspaceConstants from nvflare.apis.fl_context import FLContext, FLContextManager from nvflare.apis.workspace import Workspace from nvflare.fuel.utils.network_utils import get_open_ports @@ -74,6 +74,9 @@ def __init__(self, client: FederatedClient, args, rank, workers=5): private_stickers={ SystemComponents.DEFAULT_APP_DEPLOYER: AppDeployer(), SystemComponents.JOB_META_VALIDATOR: JobMetaValidator(), + SystemComponents.FED_CLIENT: client, + FLContextKey.SECURE_MODE: self.client.secure_train, + FLContextKey.WORKSPACE_ROOT: args.workspace, }, ) @@ -149,6 +152,7 @@ def start_app( open_port = get_open_ports(1)[0] + server_config = list(self.client.servers.values())[0] self.client_executor.start_app( self.client, job_id, @@ -158,7 +162,8 @@ def start_app( allocated_resource, token, resource_manager, - list(self.client.servers.values())[0]["target"], + target=server_config["target"], + scheme=server_config.get("scheme", "grpc"), ) return "Start the client app..." diff --git a/nvflare/private/fed/client/client_executor.py b/nvflare/private/fed/client/client_executor.py index 3727fb3875..9ced2ba818 100644 --- a/nvflare/private/fed/client/client_executor.py +++ b/nvflare/private/fed/client/client_executor.py @@ -46,6 +46,7 @@ def start_app( token, resource_manager, target: str, + scheme: str, ): """Starts the client app. @@ -59,6 +60,7 @@ def start_app( token: token from resource manager resource_manager: resource manager target: SP target location + scheme: SP target connection scheme """ pass @@ -147,6 +149,7 @@ def start_app( token, resource_manager: ResourceManagerSpec, target: str, + scheme: str, ): """Starts the app. @@ -160,6 +163,7 @@ def start_app( token: token from resource manager resource_manager: resource manager target: SP target location + scheme: SP connection scheme """ new_env = os.environ.copy() if app_custom_folder != "": @@ -185,6 +189,8 @@ def start_app( + str(client.cell.get_internal_listener_url()) + " -g " + target + + " -scheme " + + scheme + " -s fed_client.json " " --set" + command_options + " print_conf=True" ) diff --git a/nvflare/private/fed/client/client_json_config.py b/nvflare/private/fed/client/client_json_config.py index 785567b61d..22a14f80e2 100644 --- a/nvflare/private/fed/client/client_json_config.py +++ b/nvflare/private/fed/client/client_json_config.py @@ -16,6 +16,7 @@ from nvflare.apis.executor import Executor from nvflare.apis.fl_component import FLComponent +from nvflare.apis.fl_constant import SystemVarName from nvflare.fuel.utils.argument_utils import parse_vars from nvflare.fuel.utils.config_service import ConfigService from nvflare.fuel.utils.json_scanner import Node @@ -49,6 +50,25 @@ def __init__(self, config_file_name: str, args, app_root: str, kv_list=None, exc base_pkgs = FL_PACKAGES module_names = FL_MODULES + if kv_list: + assert isinstance(kv_list, list), "cmd_vars must be list, but got {}".format(type(kv_list)) + self.cmd_vars = parse_vars(kv_list) + else: + self.cmd_vars = {} + + # determine the values of variables that can be used in job config. + sp_scheme = args.sp_scheme + sp_target = args.sp_target + sp_url = f"{sp_scheme}://{sp_target}" + + sys_vars = { + SystemVarName.JOB_ID: args.job_id, + SystemVarName.SITE_NAME: args.client_name, + SystemVarName.WORKSPACE: args.workspace, + SystemVarName.ROOT_URL: sp_url, + SystemVarName.SECURE_MODE: self.cmd_vars.get("secure_train", True), + } + FedJsonConfigurator.__init__( self, config_file_name=config_file_name, @@ -56,13 +76,9 @@ def __init__(self, config_file_name: str, args, app_root: str, kv_list=None, exc module_names=module_names, exclude_libs=exclude_libs, is_server=False, + sys_vars=sys_vars, ) - if kv_list: - assert isinstance(kv_list, list), "cmd_vars must be list, but got {}".format(type(kv_list)) - self.cmd_vars = parse_vars(kv_list) - else: - self.cmd_vars = {} self.config_files = [config_file_name] self.runner_config = None diff --git a/nvflare/private/fed/client/client_req_processors.py b/nvflare/private/fed/client/client_req_processors.py index f81f94ae49..ee4acca897 100644 --- a/nvflare/private/fed/client/client_req_processors.py +++ b/nvflare/private/fed/client/client_req_processors.py @@ -15,7 +15,7 @@ from .info_coll_cmd import ClientInfoProcessor from .scheduler_cmds import CancelResourceProcessor, CheckResourceProcessor, ReportResourcesProcessor, StartJobProcessor from .shell_cmd import ShellCommandProcessor -from .sys_cmd import SysInfoProcessor +from .sys_cmd import ReportEnvProcessor, SysInfoProcessor from .training_cmds import ( # StartClientMGpuProcessor,; SetRunNumberProcessor, AbortAppProcessor, AbortTaskProcessor, @@ -49,6 +49,7 @@ class ClientRequestProcessors: CheckResourceProcessor(), CancelResourceProcessor(), ReportResourcesProcessor(), + ReportEnvProcessor(), ] @staticmethod diff --git a/nvflare/private/fed/client/sys_cmd.py b/nvflare/private/fed/client/sys_cmd.py index 83001f5839..1bf20fd0c6 100644 --- a/nvflare/private/fed/client/sys_cmd.py +++ b/nvflare/private/fed/client/sys_cmd.py @@ -22,6 +22,8 @@ except ImportError: pynvml = None +from nvflare.apis.fl_constant import FLContextKey, SystemComponents +from nvflare.apis.fl_context import FLContext from nvflare.private.admin_defs import Message from nvflare.private.defs import SysCommandTopic from nvflare.private.fed.client.admin import RequestProcessor @@ -52,3 +54,29 @@ def process(self, req: Message, app_ctx) -> Message: print("return sys_info") print(infos) return message + + +class ReportEnvProcessor(RequestProcessor): + def get_topics(self) -> [str]: + return [SysCommandTopic.REPORT_ENV] + + def process(self, req: Message, app_ctx) -> Message: + engine = app_ctx + fl_ctx = engine.new_context() + assert isinstance(fl_ctx, FLContext) + site_name = fl_ctx.get_identity_name() + workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_ROOT) + secure_mode = fl_ctx.get_prop(FLContextKey.SECURE_MODE) + fed_client = fl_ctx.get_prop(SystemComponents.FED_CLIENT) + root_url = "" + if fed_client: + cell = fed_client.cell + root_url = cell.get_root_url_for_child() + env = { + "site_name": site_name, + "workspace": workspace, + "secure_mode": secure_mode, + "root_url": root_url, + } + message = Message(topic="reply_" + req.topic, body=json.dumps(env)) + return message diff --git a/nvflare/private/fed/server/cmd_utils.py b/nvflare/private/fed/server/cmd_utils.py index 2a48aba052..d2d7bd6bfd 100644 --- a/nvflare/private/fed/server/cmd_utils.py +++ b/nvflare/private/fed/server/cmd_utils.py @@ -46,7 +46,7 @@ def authorize_client_operation(self, conn: Connection, args: List[str]) -> PreAu err = self.validate_command_targets(conn, auth_args[1:]) if err: - conn.append_error(err) + conn.append_error(err, meta=make_meta(MetaStatusValue.INVALID_TARGET, info=err)) return PreAuthzReturnCode.ERROR return PreAuthzReturnCode.REQUIRE_AUTHZ @@ -125,7 +125,7 @@ def must_be_project_admin(self, conn: Connection, args: List[str]): def authorize_server_operation(self, conn: Connection, args: List[str]): err = self.validate_command_targets(conn, args[1:]) if err: - conn.append_error(err) + conn.append_error(err, meta=make_meta(MetaStatusValue.INVALID_TARGET, info=err)) return PreAuthzReturnCode.ERROR target_type = conn.get_prop(self.TARGET_TYPE) diff --git a/nvflare/private/fed/server/server_json_config.py b/nvflare/private/fed/server/server_json_config.py index 14a8601530..296c2262a1 100644 --- a/nvflare/private/fed/server/server_json_config.py +++ b/nvflare/private/fed/server/server_json_config.py @@ -15,6 +15,7 @@ import re from nvflare.apis.fl_component import FLComponent +from nvflare.apis.fl_constant import SystemVarName from nvflare.apis.responder import Responder from nvflare.fuel.utils.argument_utils import parse_vars from nvflare.fuel.utils.config_service import ConfigService @@ -55,6 +56,20 @@ def __init__(self, config_file_name: str, args, app_root: str, kv_list=None, exc base_pkgs = FL_PACKAGES module_names = FL_MODULES + if kv_list: + assert isinstance(kv_list, list), "cmd_vars must be list, but got {}".format(type(kv_list)) + self.cmd_vars = parse_vars(kv_list) + else: + self.cmd_vars = {} + + sys_vars = { + SystemVarName.JOB_ID: args.job_id, + SystemVarName.SITE_NAME: "server", + SystemVarName.WORKSPACE: args.workspace, + SystemVarName.ROOT_URL: args.root_url, + SystemVarName.SECURE_MODE: self.cmd_vars.get("secure_train", True), + } + FedJsonConfigurator.__init__( self, config_file_name=config_file_name, @@ -62,6 +77,7 @@ def __init__(self, config_file_name: str, args, app_root: str, kv_list=None, exc module_names=module_names, exclude_libs=exclude_libs, is_server=True, + sys_vars=sys_vars, ) if kv_list: diff --git a/nvflare/private/fed/server/sys_cmd.py b/nvflare/private/fed/server/sys_cmd.py index d7a7cbda64..c684e08073 100644 --- a/nvflare/private/fed/server/sys_cmd.py +++ b/nvflare/private/fed/server/sys_cmd.py @@ -18,6 +18,7 @@ import psutil from nvflare.fuel.hci.conn import Connection +from nvflare.fuel.hci.proto import MetaKey from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec from nvflare.private.admin_defs import MsgHeader, ReturnCode from nvflare.private.defs import SysCommandTopic @@ -67,6 +68,14 @@ def get_spec(self): authz_func=self.authorize_server_operation, visible=True, ), + CommandSpec( + name="report_env", + description="get env info of a client", + usage="report_env ", + handler_func=self.report_env, + authz_func=self.authorize_client_operation, + visible=True, + ), ], ) @@ -154,3 +163,15 @@ def report_resources(self, conn: Connection, args: List[str]): table = conn.append_table(["Sites", "Resources"]) for k, v in site_resources.items(): table.add_row([str(k), str(v)]) + + def report_env(self, conn: Connection, args: List[str]): + message = new_message(conn, topic=SysCommandTopic.REPORT_ENV, body="", require_authz=True) + replies = self.send_request_to_clients(conn, message) + if not replies: + conn.append_error("no responses from clients") + return + site_resources = _parse_replies(conn, replies) + + table = conn.append_table(["Sites", "Env"], name=MetaKey.CLIENTS) + for k, v in site_resources.items(): + table.add_row([str(k), str(v)], meta=v) diff --git a/nvflare/private/fed_json_config.py b/nvflare/private/fed_json_config.py index 2c79bea183..11b6518762 100644 --- a/nvflare/private/fed_json_config.py +++ b/nvflare/private/fed_json_config.py @@ -34,7 +34,15 @@ def validate_direction(cls, direction): class FedJsonConfigurator(JsonConfigurator): - def __init__(self, config_file_name: str, base_pkgs: [str], module_names: [str], exclude_libs=True, is_server=True): + def __init__( + self, + config_file_name: str, + base_pkgs: [str], + module_names: [str], + exclude_libs=True, + is_server=True, + sys_vars=None, + ): """To init the FedJsonConfigurator. Args: @@ -49,6 +57,7 @@ def __init__(self, config_file_name: str, base_pkgs: [str], module_names: [str], base_pkgs=base_pkgs, module_names=module_names, exclude_libs=exclude_libs, + sys_vars=sys_vars, ) self.format_version = None diff --git a/nvflare/private/json_configer.py b/nvflare/private/json_configer.py index ecbd630fe7..3a987fde69 100644 --- a/nvflare/private/json_configer.py +++ b/nvflare/private/json_configer.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import copy from typing import List, Union from nvflare.fuel.common.excepts import ComponentNotAuthorized, ConfigError @@ -19,9 +19,9 @@ from nvflare.fuel.utils.component_builder import ComponentBuilder from nvflare.fuel.utils.config_factory import ConfigFactory from nvflare.fuel.utils.config_service import ConfigService -from nvflare.fuel.utils.dict_utils import augment, extract_first_level_primitive +from nvflare.fuel.utils.dict_utils import augment from nvflare.fuel.utils.json_scanner import JsonObjectProcessor, JsonScanner, Node -from nvflare.fuel.utils.wfconf import _EnvUpdater +from nvflare.fuel.utils.wfconf import resolve_var_refs from nvflare.security.logging import secure_format_exception @@ -40,6 +40,7 @@ def __init__( module_names: List[str], exclude_libs=True, num_passes=1, + sys_vars=None, ): """To init the JsonConfigurator. @@ -49,6 +50,7 @@ def __init__( module_names: module names need to be scanned exclude_libs: True/False to exclude the libs folder num_passes: number of passes to parsing the config + sys_vars: system vars """ JsonObjectProcessor.__init__(self) @@ -70,6 +72,7 @@ def __init__( self.config_file_names = config_files self.num_passes = num_passes + self.sys_vars = sys_vars self.module_scanner = ModuleScanner(base_pkgs, module_names, exclude_libs) self.config_ctx = None @@ -107,8 +110,24 @@ def _do_configure(self): config_ctx.config_json = self.config_data self.config_ctx = config_ctx - all_vars = extract_first_level_primitive(self.config_data) - self.json_scanner.scan(_EnvUpdater(all_vars)) + # every item could be used as reference + all_vars = copy.deepcopy(self.config_data) + + # Remove parameterized items from config data since such items could only be used as refs and cannot be + # part of config. After they are removed from config data, they will not be resolved until they are invoked. + parameterized_items = [] + for k in self.config_data.keys(): + if isinstance(k, str) and k.startswith("@"): + parameterized_items.append(k) + for k in parameterized_items: + self.config_data.pop(k) + + # Add env_vars to all_vars. If there are conflicts, env_vars take precedence. + if self.sys_vars: + all_vars.update(self.sys_vars) + + # resolve var references + resolve_var_refs(self.json_scanner, all_vars) self.start_config(self.config_ctx) diff --git a/nvflare/security/security.py b/nvflare/security/security.py index 05cd39dfd8..903a053f40 100644 --- a/nvflare/security/security.py +++ b/nvflare/security/security.py @@ -42,6 +42,7 @@ class CommandCategory(object): AC.GET_JOB_META: CommandCategory.VIEW, AC.SYS_INFO: CommandCategory.OPERATE, AC.REPORT_RESOURCES: CommandCategory.OPERATE, + AC.REPORT_ENV: CommandCategory.OPERATE, AC.RESTART: CommandCategory.OPERATE, AC.SHUTDOWN: CommandCategory.OPERATE, AC.REMOVE_CLIENT: CommandCategory.OPERATE,