From b5ff6adf1f1caad72065842acf56413b1f42244c Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Sun, 26 Nov 2023 22:48:32 +0100 Subject: [PATCH] Add warning messages for default connector params This commit adds two warning messages: - When a `QueueManagerConnector` is instantiated with the default value of 1 for the `maxConcurrentJobs` parameter; - When an `SSHConnector` or a `QueueManagerConnector` defines at least one service template, but none is selected to execute a step. This commit also substitutes the deprecated `logging.WARN` with the preferred `logging.WARNING` constant. --- streamflow/config/config.py | 2 +- streamflow/cwl/main.py | 2 +- streamflow/cwl/runner.py | 2 +- streamflow/cwl/translator.py | 2 +- streamflow/deployment/connector/container.py | 4 ++-- streamflow/deployment/connector/kubernetes.py | 4 ++-- .../deployment/connector/queue_manager.py | 17 +++++++++++++++-- streamflow/deployment/connector/ssh.py | 9 ++++++++- streamflow/deployment/manager.py | 2 +- streamflow/deployment/template.py | 3 +++ streamflow/deployment/utils.py | 4 ++-- streamflow/ext/plugin.py | 2 +- streamflow/main.py | 2 +- streamflow/provenance/run_crate.py | 4 ++-- 14 files changed, 41 insertions(+), 18 deletions(-) diff --git a/streamflow/config/config.py b/streamflow/config/config.py index 88a44343a..58ff52b5d 100644 --- a/streamflow/config/config.py +++ b/streamflow/config/config.py @@ -41,7 +41,7 @@ def __init__(self, name: str, config: MutableMapping[str, Any]) -> None: if not self.deplyoments: self.deplyoments = config.get("models", {}) if self.deplyoments: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `models` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `deployments` instead." diff --git a/streamflow/cwl/main.py b/streamflow/cwl/main.py index c6fa1f303..57d1d3b4b 100644 --- a/streamflow/cwl/main.py +++ b/streamflow/cwl/main.py @@ -63,7 +63,7 @@ async def main( # Configure log level if args.quiet: # noinspection PyProtectedMember - cwltool.loghandler._logger.setLevel(logging.WARN) + cwltool.loghandler._logger.setLevel(logging.WARNING) # Load CWL workflow definition cwl_definition, loading_context = load_cwl_workflow(cwl_args[0]) if len(cwl_args) == 2: diff --git a/streamflow/cwl/runner.py b/streamflow/cwl/runner.py index ab931c044..b6c964c2d 100644 --- a/streamflow/cwl/runner.py +++ b/streamflow/cwl/runner.py @@ -105,7 +105,7 @@ def main(args) -> int: print(f"StreamFlow version {VERSION}") return 0 if args.quiet: - logger.setLevel(logging.WARN) + logger.setLevel(logging.WARNING) elif args.debug: logger.setLevel(logging.DEBUG) asyncio.run(_async_main(args)) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index c787e93a5..4e1edaea8 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1532,7 +1532,7 @@ def _translate_command_line_tool( ) # Otherwise, throw a warning and skip the DockerRequirement conversion else: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( f"Skipping DockerRequirement conversion for step `{name_prefix}` " f"when executing on `{target.deployment.name}` deployment." diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py index 34b1b4cfa..a5a6fec99 100644 --- a/streamflow/deployment/connector/container.py +++ b/streamflow/deployment/connector/container.py @@ -122,7 +122,7 @@ def __init__( if cacheSize is None: cacheSize = resourcesCacheSize if cacheSize is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `locationsCacheSize` instead." @@ -133,7 +133,7 @@ def __init__( if cacheTTL is None: cacheTTL = resourcesCacheTTL if cacheTTL is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `locationsCacheTTL` instead." diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py index 114b0309a..f14ddbf86 100644 --- a/streamflow/deployment/connector/kubernetes.py +++ b/streamflow/deployment/connector/kubernetes.py @@ -188,7 +188,7 @@ def __init__( if cacheSize is None: cacheSize = resourcesCacheSize if cacheSize is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `locationsCacheSize` instead." @@ -199,7 +199,7 @@ def __init__( if cacheTTL is None: cacheTTL = resourcesCacheTTL if cacheTTL is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `locationsCacheTTL` instead." diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py index dee3728e6..ab5729be5 100644 --- a/streamflow/deployment/connector/queue_manager.py +++ b/streamflow/deployment/connector/queue_manager.py @@ -340,7 +340,7 @@ def __init__( ) -> None: self._inner_ssh_connector: bool = False if hostname is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "Inline SSH options are deprecated and will be removed in StreamFlow 0.3.0. " f"Define a standalone `SSHConnector` and link the `{self.__class__.__name__}` " @@ -374,7 +374,7 @@ def __init__( with open(os.path.join(self.config_dir, service.file)) as f: files_map[name] = f.read() if file is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `services` instead." @@ -389,6 +389,12 @@ def __init__( template_map=files_map, ) self.maxConcurrentJobs: int = maxConcurrentJobs + if logger.isEnabledFor(logging.WARNING): + logger.warning( + "The `maxConcurrentJobs` parameter is set to the default value 1, which prevents " + "multiple jobs to be concurrently submitted to the queue manager. Consider raising " + "this value to improve performance." + ) self.pollingInterval: int = pollingInterval self.scheduledJobs: MutableSequence[str] = [] self.jobsCache: cachetools.Cache = cachetools.TTLCache( @@ -500,6 +506,13 @@ async def run( ) ) command = utils.encode_command(command) + if logger.isEnabledFor(logging.WARNING): + if not self.template_map.is_empty() and location.service is None: + logger.warning( + f"Deployment {self.deployment_name} contains some service definitions, " + f"but none of them has been specified to execute job {job_name}. Execution " + f"will fall back to the default template." + ) command = self.template_map.get_command( command=command, template=location.service, diff --git a/streamflow/deployment/connector/ssh.py b/streamflow/deployment/connector/ssh.py index e8092260e..ffebaa622 100644 --- a/streamflow/deployment/connector/ssh.py +++ b/streamflow/deployment/connector/ssh.py @@ -283,7 +283,7 @@ def __init__( with open(os.path.join(self.config_dir, service)) as f: services_map[name] = f.read() if file is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `services` instead." @@ -717,6 +717,13 @@ async def run( job_name=job_name, ) if job_name is not None: + if logger.isEnabledFor(logging.WARNING): + if not self.template_map.is_empty() and location.service is None: + logger.warning( + f"Deployment {self.deployment_name} contains some service definitions, " + f"but none of them has been specified to execute job {job_name}. Execution " + f"will fall back to the default template." + ) command = self.template_map.get_command( command=command, template=location.service, diff --git a/streamflow/deployment/manager.py b/streamflow/deployment/manager.py index e56634c44..5cac0c5f8 100644 --- a/streamflow/deployment/manager.py +++ b/streamflow/deployment/manager.py @@ -146,7 +146,7 @@ async def _inner_deploy( # If it is not a ConnectorWrapper, do nothing else: if deployment_config.wraps is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( f"The `wraps` directive has no effect on deployment {deployment_config.name}, " f"as the `{deployment_config.type}` connector does not inherit from the ConnectorWrapper class." diff --git a/streamflow/deployment/template.py b/streamflow/deployment/template.py index 81d1c6358..870982017 100644 --- a/streamflow/deployment/template.py +++ b/streamflow/deployment/template.py @@ -32,3 +32,6 @@ def get_command( streamflow_workdir=workdir, **kwargs, ) + + def is_empty(self) -> bool: + return len(self.templates) == 1 diff --git a/streamflow/deployment/utils.py b/streamflow/deployment/utils.py index 180990d05..75548630f 100644 --- a/streamflow/deployment/utils.py +++ b/streamflow/deployment/utils.py @@ -30,7 +30,7 @@ def get_binding_config( target_deployment = workflow_config.deplyoments[target["deployment"]] else: target_deployment = workflow_config.deplyoments[target["model"]] - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `deployment` instead." @@ -39,7 +39,7 @@ def get_binding_config( if locations is None: locations = target.get("resources") if locations is not None: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. " "Use `locations` instead." diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index f32a84fa8..cd33e85e7 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -48,7 +48,7 @@ def _register(self, name: str, cls: type, extension_point: str): } ) if name in extension_points[extension_point]: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning( "{} is already installed and will be overridden by {}".format( name, self.__class__.__module__ + "." + self.__class__.__name__ diff --git a/streamflow/main.py b/streamflow/main.py index 17899c6e7..6d6bcda44 100644 --- a/streamflow/main.py +++ b/streamflow/main.py @@ -260,7 +260,7 @@ def main(args): asyncio.run(_async_report(args)) elif args.context == "run": if args.quiet: - logger.setLevel(logging.WARN) + logger.setLevel(logging.WARNING) elif args.debug: logger.setLevel(logging.DEBUG) if args.color and hasattr(sys.stdout, "isatty") and sys.stdout.isatty(): diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index 373a0d775..bc95eb872 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -570,7 +570,7 @@ async def add_file(self, file: MutableMapping[str, str]) -> None: dst = file.get("dst", posixpath.sep) dst_parent = posixpath.dirname(posixpath.normpath(dst)) if src in self.files_map: - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): logger.warning(f"File {src} is already present in the archive.") else: if os.path.isfile(os.path.realpath(src)): @@ -679,7 +679,7 @@ async def add_property(self, key: str, value: str): f"Token `{k}` of key `{key}` does not exist in archive manifest." ) current_obj = current_obj[k] - if logger.isEnabledFor(logging.WARN): + if logger.isEnabledFor(logging.WARNING): if keys[-1] in current_obj: logger.warning( f"Key {key} already exists in archive manifest and will be overridden."