Skip to content

Commit

Permalink
Add warning messages for default connector params
Browse files Browse the repository at this point in the history
This commit adds two warning messages:
- When a `QueueManagerConnector` is instantiated with the default value
  of 1 for the `maxConcurrentJobs` parameter;
- When an `SSHConnector` or a `QueueManagerConnector` defines at least
  one service template, but none is selected to execute a step.

This commit also substitutes the deprecated `logging.WARN` with the
preferred `logging.WARNING` constant.
  • Loading branch information
GlassOfWhiskey committed Nov 26, 2023
1 parent 2762b17 commit b5ff6ad
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 18 deletions.
2 changes: 1 addition & 1 deletion streamflow/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, name: str, config: MutableMapping[str, Any]) -> None:
if not self.deplyoments:
self.deplyoments = config.get("models", {})
if self.deplyoments:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `models` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployments` instead."
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def main(
# Configure log level
if args.quiet:
# noinspection PyProtectedMember
cwltool.loghandler._logger.setLevel(logging.WARN)
cwltool.loghandler._logger.setLevel(logging.WARNING)
# Load CWL workflow definition
cwl_definition, loading_context = load_cwl_workflow(cwl_args[0])
if len(cwl_args) == 2:
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def main(args) -> int:
print(f"StreamFlow version {VERSION}")
return 0
if args.quiet:
logger.setLevel(logging.WARN)
logger.setLevel(logging.WARNING)
elif args.debug:
logger.setLevel(logging.DEBUG)
asyncio.run(_async_main(args))
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ def _translate_command_line_tool(
)
# Otherwise, throw a warning and skip the DockerRequirement conversion
else:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
f"Skipping DockerRequirement conversion for step `{name_prefix}` "
f"when executing on `{target.deployment.name}` deployment."
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/connector/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(
if cacheSize is None:
cacheSize = resourcesCacheSize
if cacheSize is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheSize` instead."
Expand All @@ -133,7 +133,7 @@ def __init__(
if cacheTTL is None:
cacheTTL = resourcesCacheTTL
if cacheTTL is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheTTL` instead."
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/connector/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def __init__(
if cacheSize is None:
cacheSize = resourcesCacheSize
if cacheSize is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheSize` instead."
Expand All @@ -199,7 +199,7 @@ def __init__(
if cacheTTL is None:
cacheTTL = resourcesCacheTTL
if cacheTTL is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheTTL` instead."
Expand Down
17 changes: 15 additions & 2 deletions streamflow/deployment/connector/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def __init__(
) -> None:
self._inner_ssh_connector: bool = False
if hostname is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"Inline SSH options are deprecated and will be removed in StreamFlow 0.3.0. "
f"Define a standalone `SSHConnector` and link the `{self.__class__.__name__}` "
Expand Down Expand Up @@ -374,7 +374,7 @@ def __init__(
with open(os.path.join(self.config_dir, service.file)) as f:
files_map[name] = f.read()
if file is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `services` instead."
Expand All @@ -389,6 +389,12 @@ def __init__(
template_map=files_map,
)
self.maxConcurrentJobs: int = maxConcurrentJobs
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `maxConcurrentJobs` parameter is set to the default value 1, which prevents "
"multiple jobs to be concurrently submitted to the queue manager. Consider raising "
"this value to improve performance."
)
self.pollingInterval: int = pollingInterval
self.scheduledJobs: MutableSequence[str] = []
self.jobsCache: cachetools.Cache = cachetools.TTLCache(
Expand Down Expand Up @@ -500,6 +506,13 @@ async def run(
)
)
command = utils.encode_command(command)
if logger.isEnabledFor(logging.WARNING):
if not self.template_map.is_empty() and location.service is None:
logger.warning(
f"Deployment {self.deployment_name} contains some service definitions, "
f"but none of them has been specified to execute job {job_name}. Execution "
f"will fall back to the default template."
)
command = self.template_map.get_command(
command=command,
template=location.service,
Expand Down
9 changes: 8 additions & 1 deletion streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def __init__(
with open(os.path.join(self.config_dir, service)) as f:
services_map[name] = f.read()
if file is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `services` instead."
Expand Down Expand Up @@ -717,6 +717,13 @@ async def run(
job_name=job_name,
)
if job_name is not None:
if logger.isEnabledFor(logging.WARNING):
if not self.template_map.is_empty() and location.service is None:
logger.warning(
f"Deployment {self.deployment_name} contains some service definitions, "
f"but none of them has been specified to execute job {job_name}. Execution "
f"will fall back to the default template."
)
command = self.template_map.get_command(
command=command,
template=location.service,
Expand Down
2 changes: 1 addition & 1 deletion streamflow/deployment/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async def _inner_deploy(
# If it is not a ConnectorWrapper, do nothing
else:
if deployment_config.wraps is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
f"The `wraps` directive has no effect on deployment {deployment_config.name}, "
f"as the `{deployment_config.type}` connector does not inherit from the ConnectorWrapper class."
Expand Down
3 changes: 3 additions & 0 deletions streamflow/deployment/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ def get_command(
streamflow_workdir=workdir,
**kwargs,
)

def is_empty(self) -> bool:
return len(self.templates) == 1
4 changes: 2 additions & 2 deletions streamflow/deployment/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_binding_config(
target_deployment = workflow_config.deplyoments[target["deployment"]]
else:
target_deployment = workflow_config.deplyoments[target["model"]]
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployment` instead."
Expand All @@ -39,7 +39,7 @@ def get_binding_config(
if locations is None:
locations = target.get("resources")
if locations is not None:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locations` instead."
Expand Down
2 changes: 1 addition & 1 deletion streamflow/ext/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _register(self, name: str, cls: type, extension_point: str):
}
)
if name in extension_points[extension_point]:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(
"{} is already installed and will be overridden by {}".format(
name, self.__class__.__module__ + "." + self.__class__.__name__
Expand Down
2 changes: 1 addition & 1 deletion streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def main(args):
asyncio.run(_async_report(args))
elif args.context == "run":
if args.quiet:
logger.setLevel(logging.WARN)
logger.setLevel(logging.WARNING)
elif args.debug:
logger.setLevel(logging.DEBUG)
if args.color and hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
Expand Down
4 changes: 2 additions & 2 deletions streamflow/provenance/run_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ async def add_file(self, file: MutableMapping[str, str]) -> None:
dst = file.get("dst", posixpath.sep)
dst_parent = posixpath.dirname(posixpath.normpath(dst))
if src in self.files_map:
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
logger.warning(f"File {src} is already present in the archive.")
else:
if os.path.isfile(os.path.realpath(src)):
Expand Down Expand Up @@ -679,7 +679,7 @@ async def add_property(self, key: str, value: str):
f"Token `{k}` of key `{key}` does not exist in archive manifest."
)
current_obj = current_obj[k]
if logger.isEnabledFor(logging.WARN):
if logger.isEnabledFor(logging.WARNING):
if keys[-1] in current_obj:
logger.warning(
f"Key {key} already exists in archive manifest and will be overridden."
Expand Down

0 comments on commit b5ff6ad

Please sign in to comment.