Skip to content

Commit

Permalink
trim down ewms_sidecar.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ric-evans committed Dec 19, 2023
1 parent 44335c2 commit 88df7f0
Showing 1 changed file with 115 additions and 183 deletions.
298 changes: 115 additions & 183 deletions ewms_sidecar/ewms_sidecar.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""The central module."""
"""The EWMS Sidecar."""


import argparse
Expand All @@ -7,14 +7,14 @@

from wipac_dev_tools import argparse_tools, logging_tools

from . import condor, k8s
from . import condor
from .config import ENV, LOGGER


def main() -> None:
"""Main."""
parser = argparse.ArgumentParser(
description="Manage Skymap Scanner client workers",
description="Handle EWMS requests beside a Skymap Scanner central server",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

Expand All @@ -24,32 +24,8 @@ def main() -> None:
help="the uuid for the cluster",
)

# orchestrator
orch_subparsers = parser.add_subparsers(
required=True,
dest="orchestrator",
help="the resource orchestration tool to use for worker scheduling",
)
OrchestratorArgs.condor(
orch_condor_parser := orch_subparsers.add_parser(
"condor", help="orchestrate with HTCondor"
)
)
OrchestratorArgs.k8s(
orch_k8s_parser := orch_subparsers.add_parser(
"k8s", help="orchestrate with Kubernetes"
)
)

# action -- add sub-parser to each sub-parser (can't add multiple sub-parsers)
for p in [orch_condor_parser, orch_k8s_parser]:
act_subparsers = p.add_subparsers(
required=True,
dest="action",
help="the action to perform on the worker cluster",
)
ActionArgs.starter(act_subparsers.add_parser("start", help="start workers"))
ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers"))
add_condor_args(parser)
add_starter_args(parser)

# parse args & set up logging
args = parser.parse_args()
Expand All @@ -63,157 +39,113 @@ def main() -> None:
logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING")

# Go!
match args.orchestrator:
case "condor":
condor.act(args)
case "k8s":
k8s.act(args)
case other:
raise RuntimeError(f"Orchestrator not supported: {other}")


class OrchestratorArgs:
@staticmethod
def condor(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
"--collector",
default="",
help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu",
)
sub_parser.add_argument(
"--schedd",
default="",
help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu",
)

@staticmethod
def k8s(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
"--host",
required=True,
help="the host server address to connect to for running workers",
)
sub_parser.add_argument(
"--namespace",
required=True,
help="the k8s namespace to use for running workers",
)
sub_parser.add_argument(
"--cpu-arch",
default="x64",
help="which CPU architecture to use for running workers",
)
sub_parser.add_argument(
"--job-config-stub",
type=Path,
default=Path("resources/worker_k8s_job_stub.json"),
help="worker k8s job config file to dynamically complete, then run (json)",
)


class ActionArgs:
@staticmethod
def starter(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""

def wait_for_file(waitee: Path, wait_time: int) -> Path:
"""Wait for `waitee` to exist, then return fullly-resolved path."""
elapsed_time = 0
sleep = 5
while not waitee.exists():
LOGGER.info(f"waiting for {waitee} ({sleep}s intervals)...")
time.sleep(sleep)
elapsed_time += sleep
if elapsed_time >= wait_time:
raise argparse.ArgumentTypeError(
f"FileNotFoundError: waited {wait_time}s [{waitee}]"
)
return waitee.resolve()

# helper args
sub_parser.add_argument(
"--dryrun",
default=False,
action="store_true",
help="does everything except submitting the worker(s)",
)
sub_parser.add_argument(
"--spool",
default=False,
action="store_true",
help="whether to spool (persist) logs -- if not given, logs are not kept",
)

# worker args
sub_parser.add_argument(
"--worker-memory-bytes",
required=True,
type=int,
help="amount of worker memory (bytes)",
)
sub_parser.add_argument(
"--worker-disk-bytes",
required=True,
type=int,
help="amount of worker disk (bytes)",
)
sub_parser.add_argument(
"--n-cores",
default=1,
type=int,
help="number of cores per worker",
)
sub_parser.add_argument(
"--n-workers",
required=True,
type=int,
help="number of worker to start",
)
sub_parser.add_argument(
"--max-worker-runtime",
required=True,
type=int,
help="how long each worker is allowed to run -- condor only", # TODO - set for k8s?
)
sub_parser.add_argument(
"--priority",
required=True,
help="relative priority of this job/jobs -- condor only", # TODO - set for k8s?
)

# client args
sub_parser.add_argument(
"--client-args",
required=False,
nargs="*",
type=lambda x: argparse_tools.validate_arg(
x.split(":", maxsplit=1),
len(x.split(":", maxsplit=1)) == 2,
ValueError('must " "-delimited series of "clientarg:value"-tuples'),
),
help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client",
)
sub_parser.add_argument(
"--client-startup-json",
help="The 'startup.json' file to startup each client",
type=lambda x: wait_for_file(
Path(x),
ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON,
),
)
sub_parser.add_argument(
"--image",
required=True,
help="a path or url to the workers' image",
)

@staticmethod
def stopper(sub_parser: argparse.ArgumentParser) -> None:
"""Add args to subparser."""
sub_parser.add_argument(
"--cluster-id",
required=True,
help="the cluster id of the workers to be stopped/removed",
)
condor.act(args)


def add_condor_args(parser: argparse.ArgumentParser) -> None:
"""Add args."""
parser.add_argument(
"--collector",
default="",
help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu",
)
parser.add_argument(
"--schedd",
default="",
help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu",
)


def add_starter_args(parser: argparse.ArgumentParser) -> None:
"""Add args."""

def wait_for_file(waitee: Path, wait_time: int) -> Path:
"""Wait for `waitee` to exist, then return fully-resolved path."""
elapsed_time = 0
sleep = 5
while not waitee.exists():
LOGGER.info(f"waiting for {waitee} ({sleep}s intervals)...")
time.sleep(sleep)
elapsed_time += sleep
if elapsed_time >= wait_time:
raise argparse.ArgumentTypeError(
f"FileNotFoundError: waited {wait_time}s [{waitee}]"
)
return waitee.resolve()

# helper args
parser.add_argument(
"--dryrun",
default=False,
action="store_true",
help="does everything except submitting the worker(s)",
)
parser.add_argument(
"--spool",
default=False,
action="store_true",
help="whether to spool (persist) logs -- if not given, logs are not kept",
)

# worker args
parser.add_argument(
"--worker-memory-bytes",
required=True,
type=int,
help="amount of worker memory (bytes)",
)
parser.add_argument(
"--worker-disk-bytes",
required=True,
type=int,
help="amount of worker disk (bytes)",
)
parser.add_argument(
"--n-cores",
default=1,
type=int,
help="number of cores per worker",
)
parser.add_argument(
"--n-workers",
required=True,
type=int,
help="number of worker to start",
)
parser.add_argument(
"--max-worker-runtime",
required=True,
type=int,
help="how long each worker is allowed to run",
)
parser.add_argument(
"--priority",
required=True,
help="relative priority of this job/jobs",
)

# client args
parser.add_argument(
"--client-args",
required=False,
nargs="*",
type=lambda x: argparse_tools.validate_arg(
x.split(":", maxsplit=1),
len(x.split(":", maxsplit=1)) == 2,
ValueError('must " "-delimited series of "clientarg:value"-tuples'),
),
help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client",
)
parser.add_argument(
"--client-startup-json",
help="The 'startup.json' file to startup each client",
type=lambda x: wait_for_file(
Path(x),
ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON,
),
)
parser.add_argument(
"--image",
required=True,
help="a path or url to the workers' image",
)

0 comments on commit 88df7f0

Please sign in to comment.