From 604173e56ba176f66b287ca339e4d017051d66a6 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 11:01:55 -0600 Subject: [PATCH] trim down `entrypoint.py` --- ewms_sidecar/clientmanager.py | 219 ---------------------------------- ewms_sidecar/entrypoint.py | 151 +++++++++++++++++++++++ 2 files changed, 151 insertions(+), 219 deletions(-) delete mode 100644 ewms_sidecar/clientmanager.py create mode 100644 ewms_sidecar/entrypoint.py diff --git a/ewms_sidecar/clientmanager.py b/ewms_sidecar/clientmanager.py deleted file mode 100644 index 37997b51b..000000000 --- a/ewms_sidecar/clientmanager.py +++ /dev/null @@ -1,219 +0,0 @@ -"""The central module.""" - - -import argparse -import time -from pathlib import Path - -from wipac_dev_tools import argparse_tools, logging_tools - -from . import condor, k8s -from .config import ENV, LOGGER - - -def main() -> None: - """Main.""" - parser = argparse.ArgumentParser( - description="Manage Skymap Scanner client workers", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - - parser.add_argument( - "--uuid", - required=True, - help="the uuid for the cluster", - ) - - # orchestrator - orch_subparsers = parser.add_subparsers( - required=True, - dest="orchestrator", - help="the resource orchestration tool to use for worker scheduling", - ) - OrchestratorArgs.condor( - orch_condor_parser := orch_subparsers.add_parser( - "condor", help="orchestrate with HTCondor" - ) - ) - OrchestratorArgs.k8s( - orch_k8s_parser := orch_subparsers.add_parser( - "k8s", help="orchestrate with Kubernetes" - ) - ) - - # action -- add sub-parser to each sub-parser (can't add multiple sub-parsers) - for p in [orch_condor_parser, orch_k8s_parser]: - act_subparsers = p.add_subparsers( - required=True, - dest="action", - help="the action to perform on the worker cluster", - ) - ActionArgs.starter(act_subparsers.add_parser("start", help="start workers")) - ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers")) - - # parse args & set up logging - args = parser.parse_args() - logging_tools.set_level( - "DEBUG", # os.getenv("SKYSCAN_LOG", "INFO"), # type: ignore[arg-type] - first_party_loggers=LOGGER, - third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] - use_coloredlogs=True, # for formatting - future_third_parties=["boto3", "botocore"], - ) - logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") - - # Go! - match args.orchestrator: - case "condor": - condor.act(args) - case "k8s": - k8s.act(args) - case other: - raise RuntimeError(f"Orchestrator not supported: {other}") - - -class OrchestratorArgs: - @staticmethod - def condor(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--collector", - default="", - help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu", - ) - sub_parser.add_argument( - "--schedd", - default="", - help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu", - ) - - @staticmethod - def k8s(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--host", - required=True, - help="the host server address to connect to for running workers", - ) - sub_parser.add_argument( - "--namespace", - required=True, - help="the k8s namespace to use for running workers", - ) - sub_parser.add_argument( - "--cpu-arch", - default="x64", - help="which CPU architecture to use for running workers", - ) - sub_parser.add_argument( - "--job-config-stub", - type=Path, - default=Path("resources/worker_k8s_job_stub.json"), - help="worker k8s job config file to dynamically complete, then run (json)", - ) - - -class ActionArgs: - @staticmethod - def starter(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - - def wait_for_file(waitee: Path, wait_time: int) -> Path: - """Wait for `waitee` to exist, then return fullly-resolved path.""" - elapsed_time = 0 - sleep = 5 - while not waitee.exists(): - LOGGER.info(f"waiting for {waitee} ({sleep}s intervals)...") - time.sleep(sleep) - elapsed_time += sleep - if elapsed_time >= wait_time: - raise argparse.ArgumentTypeError( - f"FileNotFoundError: waited {wait_time}s [{waitee}]" - ) - return waitee.resolve() - - # helper args - sub_parser.add_argument( - "--dryrun", - default=False, - action="store_true", - help="does everything except submitting the worker(s)", - ) - sub_parser.add_argument( - "--spool", - default=False, - action="store_true", - help="whether to spool (persist) logs -- if not given, logs are not kept", - ) - - # worker args - sub_parser.add_argument( - "--worker-memory-bytes", - required=True, - type=int, - help="amount of worker memory (bytes)", - ) - sub_parser.add_argument( - "--worker-disk-bytes", - required=True, - type=int, - help="amount of worker disk (bytes)", - ) - sub_parser.add_argument( - "--n-cores", - default=1, - type=int, - help="number of cores per worker", - ) - sub_parser.add_argument( - "--n-workers", - required=True, - type=int, - help="number of worker to start", - ) - sub_parser.add_argument( - "--max-worker-runtime", - required=True, - type=int, - help="how long each worker is allowed to run -- condor only", # TODO - set for k8s? - ) - sub_parser.add_argument( - "--priority", - required=True, - help="relative priority of this job/jobs -- condor only", # TODO - set for k8s? - ) - - # client args - sub_parser.add_argument( - "--client-args", - required=False, - nargs="*", - type=lambda x: argparse_tools.validate_arg( - x.split(":", maxsplit=1), - len(x.split(":", maxsplit=1)) == 2, - ValueError('must " "-delimited series of "clientarg:value"-tuples'), - ), - help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client", - ) - sub_parser.add_argument( - "--client-startup-json", - help="The 'startup.json' file to startup each client", - type=lambda x: wait_for_file( - Path(x), - ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON, - ), - ) - sub_parser.add_argument( - "--image", - required=True, - help="a path or url to the workers' image", - ) - - @staticmethod - def stopper(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--cluster-id", - required=True, - help="the cluster id of the workers to be stopped/removed", - ) diff --git a/ewms_sidecar/entrypoint.py b/ewms_sidecar/entrypoint.py new file mode 100644 index 000000000..6f25fc7a5 --- /dev/null +++ b/ewms_sidecar/entrypoint.py @@ -0,0 +1,151 @@ +"""The EWMS Sidecar.""" + + +import argparse +import time +from pathlib import Path + +from wipac_dev_tools import argparse_tools, logging_tools + +from . import condor +from .config import ENV, LOGGER + + +def main() -> None: + """Main.""" + parser = argparse.ArgumentParser( + description="Handle EWMS requests beside a Skymap Scanner central server", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--uuid", + required=True, + help="the uuid for the cluster", + ) + + add_condor_args(parser) + add_starter_args(parser) + + # parse args & set up logging + args = parser.parse_args() + logging_tools.set_level( + "DEBUG", # os.getenv("SKYSCAN_LOG", "INFO"), # type: ignore[arg-type] + first_party_loggers=LOGGER, + third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY, # type: ignore[arg-type] + use_coloredlogs=True, # for formatting + future_third_parties=["boto3", "botocore"], + ) + logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") + + # Go! + condor.act(args) + + +def add_condor_args(parser: argparse.ArgumentParser) -> None: + """Add args.""" + parser.add_argument( + "--collector", + default="", + help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu", + ) + parser.add_argument( + "--schedd", + default="", + help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu", + ) + + +def add_starter_args(parser: argparse.ArgumentParser) -> None: + """Add args.""" + + def wait_for_file(waitee: Path, wait_time: int) -> Path: + """Wait for `waitee` to exist, then return fully-resolved path.""" + elapsed_time = 0 + sleep = 5 + while not waitee.exists(): + LOGGER.info(f"waiting for {waitee} ({sleep}s intervals)...") + time.sleep(sleep) + elapsed_time += sleep + if elapsed_time >= wait_time: + raise argparse.ArgumentTypeError( + f"FileNotFoundError: waited {wait_time}s [{waitee}]" + ) + return waitee.resolve() + + # helper args + parser.add_argument( + "--dryrun", + default=False, + action="store_true", + help="does everything except submitting the worker(s)", + ) + parser.add_argument( + "--spool", + default=False, + action="store_true", + help="whether to spool (persist) logs -- if not given, logs are not kept", + ) + + # worker args + parser.add_argument( + "--worker-memory-bytes", + required=True, + type=int, + help="amount of worker memory (bytes)", + ) + parser.add_argument( + "--worker-disk-bytes", + required=True, + type=int, + help="amount of worker disk (bytes)", + ) + parser.add_argument( + "--n-cores", + default=1, + type=int, + help="number of cores per worker", + ) + parser.add_argument( + "--n-workers", + required=True, + type=int, + help="number of worker to start", + ) + parser.add_argument( + "--max-worker-runtime", + required=True, + type=int, + help="how long each worker is allowed to run", + ) + parser.add_argument( + "--priority", + required=True, + help="relative priority of this job/jobs", + ) + + # client args + parser.add_argument( + "--client-args", + required=False, + nargs="*", + type=lambda x: argparse_tools.validate_arg( + x.split(":", maxsplit=1), + len(x.split(":", maxsplit=1)) == 2, + ValueError('must " "-delimited series of "clientarg:value"-tuples'), + ), + help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client", + ) + parser.add_argument( + "--client-startup-json", + help="The 'startup.json' file to startup each client", + type=lambda x: wait_for_file( + Path(x), + ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON, + ), + ) + parser.add_argument( + "--image", + required=True, + help="a path or url to the workers' image", + )