Skip to content

Commit

Permalink
[duplicating clientmanager/: merge over ewms_sidecar]
Browse files Browse the repository at this point in the history
  • Loading branch information
ric-evans committed Dec 18, 2023
2 parents 4355f70 + 2ade2c9 commit bdde649
Show file tree
Hide file tree
Showing 17 changed files with 1,566 additions and 0 deletions.
16 changes: 16 additions & 0 deletions ewms_sidecar/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Public init."""

# version is a human-readable version number.

# version_info is a four-tuple for programmatic comparison. The first
# three numbers are the components of the version number. The fourth
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
__version__ = "1.0.1"
version_info = (
int(__version__.split(".")[0]),
int(__version__.split(".")[1]),
int(__version__.split(".")[2]),
0,
)
7 changes: 7 additions & 0 deletions ewms_sidecar/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Entry-point to start up clientmanager service."""

from . import clientmanager, config

if __name__ == "__main__":
clientmanager.main()
config.LOGGER.info("Done.")
219 changes: 219 additions & 0 deletions ewms_sidecar/clientmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
"""The central module."""


import argparse
import time
from pathlib import Path

from wipac_dev_tools import argparse_tools, logging_tools

from . import condor, k8s
from .config import ENV, LOGGER


def main() -> None:
    """Parse the command line, configure logging, then hand off to the
    selected orchestrator (HTCondor or Kubernetes)."""
    parser = argparse.ArgumentParser(
        description="Manage Skymap Scanner client workers",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        "--uuid",
        required=True,
        help="the uuid for the cluster",
    )

    # orchestrator sub-commands
    orch_subparsers = parser.add_subparsers(
        required=True,
        dest="orchestrator",
        help="the resource orchestration tool to use for worker scheduling",
    )
    orch_condor_parser = orch_subparsers.add_parser(
        "condor", help="orchestrate with HTCondor"
    )
    OrchestratorArgs.condor(orch_condor_parser)
    orch_k8s_parser = orch_subparsers.add_parser(
        "k8s", help="orchestrate with Kubernetes"
    )
    OrchestratorArgs.k8s(orch_k8s_parser)

    # action -- add sub-parser to each sub-parser (can't add multiple sub-parsers)
    for orch_parser in (orch_condor_parser, orch_k8s_parser):
        act_subparsers = orch_parser.add_subparsers(
            required=True,
            dest="action",
            help="the action to perform on the worker cluster",
        )
        ActionArgs.starter(act_subparsers.add_parser("start", help="start workers"))
        ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers"))

    # parse args & set up logging
    args = parser.parse_args()
    logging_tools.set_level(
        "DEBUG",  # os.getenv("SKYSCAN_LOG", "INFO"), # type: ignore[arg-type]
        first_party_loggers=LOGGER,
        third_party_level=ENV.SKYSCAN_LOG_THIRD_PARTY,  # type: ignore[arg-type]
        use_coloredlogs=True,  # for formatting
        future_third_parties=["boto3", "botocore"],
    )
    logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING")

    # Go!
    if args.orchestrator == "condor":
        condor.act(args)
    elif args.orchestrator == "k8s":
        k8s.act(args)
    else:
        raise RuntimeError(f"Orchestrator not supported: {args.orchestrator}")


class OrchestratorArgs:
    """Attach orchestrator-specific CL arguments to a sub-parser."""

    @staticmethod
    def condor(sub_parser: argparse.ArgumentParser) -> None:
        """Add args to subparser."""
        # both condor endpoints are optional strings defaulting to ""
        for flag, help_msg in [
            (
                "--collector",
                "the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu",
            ),
            (
                "--schedd",
                "the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu",
            ),
        ]:
            sub_parser.add_argument(flag, default="", help=help_msg)

    @staticmethod
    def k8s(sub_parser: argparse.ArgumentParser) -> None:
        """Add args to subparser."""
        # flag -> add_argument() kwargs (insertion order preserved)
        arg_specs = {
            "--host": dict(
                required=True,
                help="the host server address to connect to for running workers",
            ),
            "--namespace": dict(
                required=True,
                help="the k8s namespace to use for running workers",
            ),
            "--cpu-arch": dict(
                default="x64",
                help="which CPU architecture to use for running workers",
            ),
            "--job-config-stub": dict(
                type=Path,
                default=Path("resources/worker_k8s_job_stub.json"),
                help="worker k8s job config file to dynamically complete, then run (json)",
            ),
        }
        for flag, kwargs in arg_specs.items():
            sub_parser.add_argument(flag, **kwargs)


class ActionArgs:
    """Attach action-specific ("start"/"stop") CL arguments to a sub-parser."""

    @staticmethod
    def starter(sub_parser: argparse.ArgumentParser) -> None:
        """Add args to subparser."""

        def wait_for_file(waitee: Path, wait_time: int) -> Path:
            """Wait for `waitee` to exist, then return fully-resolved path.

            Polls every 5s; raises `argparse.ArgumentTypeError` if the file
            has not appeared within `wait_time` seconds.
            """
            elapsed_time = 0
            sleep = 5
            while not waitee.exists():
                LOGGER.info(f"waiting for {waitee} ({sleep}s intervals)...")
                time.sleep(sleep)
                elapsed_time += sleep
                if elapsed_time >= wait_time:
                    raise argparse.ArgumentTypeError(
                        f"FileNotFoundError: waited {wait_time}s [{waitee}]"
                    )
            return waitee.resolve()

        # helper args
        sub_parser.add_argument(
            "--dryrun",
            default=False,
            action="store_true",
            help="does everything except submitting the worker(s)",
        )
        sub_parser.add_argument(
            "--spool",
            default=False,
            action="store_true",
            help="whether to spool (persist) logs -- if not given, logs are not kept",
        )

        # worker args
        sub_parser.add_argument(
            "--worker-memory-bytes",
            required=True,
            type=int,
            help="amount of worker memory (bytes)",
        )
        sub_parser.add_argument(
            "--worker-disk-bytes",
            required=True,
            type=int,
            help="amount of worker disk (bytes)",
        )
        sub_parser.add_argument(
            "--n-cores",
            default=1,
            type=int,
            help="number of cores per worker",
        )
        sub_parser.add_argument(
            "--n-workers",
            required=True,
            type=int,
            help="number of workers to start",  # fixed: "worker" -> "workers"
        )
        sub_parser.add_argument(
            "--max-worker-runtime",
            required=True,
            type=int,
            help="how long each worker is allowed to run -- condor only",  # TODO - set for k8s?
        )
        sub_parser.add_argument(
            "--priority",
            required=True,
            help="relative priority of this job/jobs -- condor only",  # TODO - set for k8s?
        )

        # client args
        sub_parser.add_argument(
            "--client-args",
            required=False,
            nargs="*",
            # each token must split into exactly two parts on the first ":"
            type=lambda x: argparse_tools.validate_arg(
                x.split(":", maxsplit=1),
                len(x.split(":", maxsplit=1)) == 2,
                # fixed: error message was garbled ('must " "-delimited ...')
                ValueError(
                    'must be a " "-delimited series of "clientarg:value"-tuples'
                ),
            ),
            help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client",
        )
        sub_parser.add_argument(
            "--client-startup-json",
            help="The 'startup.json' file to startup each client",
            # blocks during parsing until the file exists (or times out)
            type=lambda x: wait_for_file(
                Path(x),
                ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON,
            ),
        )
        sub_parser.add_argument(
            "--image",
            required=True,
            help="a path or url to the workers' image",
        )

    @staticmethod
    def stopper(sub_parser: argparse.ArgumentParser) -> None:
        """Add args to subparser."""
        sub_parser.add_argument(
            "--cluster-id",
            required=True,
            help="the cluster id of the workers to be stopped/removed",
        )
3 changes: 3 additions & 0 deletions ewms_sidecar/condor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Init."""

from .act import act # noqa: F401
95 changes: 95 additions & 0 deletions ewms_sidecar/condor/act.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""The post-argparse entry point for condor actions."""


import argparse

import htcondor # type: ignore[import-untyped]

from .. import utils
from ..config import ENV, LOGGER
from . import condor_tools, starter, stopper, watcher


def act(args: argparse.Namespace) -> None:
    """Do the action.

    Configures htcondor tool-level debug output, authenticates using the
    token from `ENV.CONDOR_TOKEN`, connects to the schedd named by
    `args.collector`/`args.schedd`, then dispatches to `_act()`.
    """
    htcondor.set_subsystem("TOOL")
    htcondor.param["TOOL_DEBUG"] = "D_FULLDEBUG"
    # htcondor.param["TOOL_LOG"] = "log.txt"
    # htcondor.enable_log()
    htcondor.enable_debug()

    # condor auth & go
    with htcondor.SecMan() as secman:
        secman.setToken(htcondor.Token(ENV.CONDOR_TOKEN))
        schedd_obj = condor_tools.get_schedd_obj(args.collector, args.schedd)
        _act(args, schedd_obj)


def _act(args: argparse.Namespace, schedd_obj: htcondor.Schedd) -> None:
    """Dispatch on `args.action`: "start" submits worker jobs (and reports
    the cluster to SkyDriver), "stop" removes them."""
    match args.action:
        case "start":
            LOGGER.info(
                f"Starting {args.n_workers} Skymap Scanner client workers on {args.collector} / {args.schedd}"
            )
            # make connections -- do now so we don't have any surprises downstream
            skydriver_rc = utils.connect_to_skydriver()
            # build the submit description (nothing is submitted yet)
            submit_dict = starter.prep(
                spool=args.spool,
                # starter CL args -- worker
                worker_memory_bytes=args.worker_memory_bytes,
                worker_disk_bytes=args.worker_disk_bytes,
                n_cores=args.n_cores,
                max_worker_runtime=args.max_worker_runtime,
                priority=args.priority,
                # starter CL args -- client
                client_args=args.client_args,
                client_startup_json_s3=utils.s3ify(args.client_startup_json),
                image=args.image,
            )
            # final checks -- bail out before submission if needed
            if args.dryrun:
                LOGGER.critical("Script Aborted: dryrun enabled")
                return
            if utils.skydriver_aborted_scan(skydriver_rc):
                LOGGER.critical("Script Aborted: SkyDriver aborted scan")
                return
            # start
            submit_result_obj = starter.start(
                schedd_obj=schedd_obj,
                n_workers=args.n_workers,
                submit_dict=submit_dict,
                spool=args.spool,
            )
            # report to SkyDriver
            skydriver_cluster_obj = dict(
                orchestrator="condor",
                location={
                    "collector": args.collector,
                    "schedd": args.schedd,
                },
                uuid=args.uuid,
                cluster_id=submit_result_obj.cluster(),
                n_workers=submit_result_obj.num_procs(),
                starter_info=submit_dict,
            )
            utils.update_skydriver(skydriver_rc, **skydriver_cluster_obj)
            LOGGER.info("Sent cluster info to SkyDriver")
            # hand off to the watcher -- presumably follows the submitted
            # cluster until it's done (see condor.watcher) -- TODO confirm
            watcher.watch(
                args.collector,
                args.schedd,
                submit_result_obj.cluster(),
                schedd_obj,
                submit_result_obj.num_procs(),
                skydriver_rc,
                skydriver_cluster_obj,
            )
        case "stop":
            # remove the workers previously started under `args.cluster_id`
            stopper.stop(
                args.collector,
                args.schedd,
                args.cluster_id,
                schedd_obj,
            )
        case _:
            raise RuntimeError(f"Unknown action: {args.action}")
45 changes: 45 additions & 0 deletions ewms_sidecar/condor/condor_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Util functions wrapping common htcondor actions."""


import htcondor # type: ignore[import-untyped]

from ..config import LOGGER


def get_schedd_obj(collector: str, schedd: str) -> htcondor.Schedd:
    """Return a client object for talking with the HTCondor schedd.

    Examples:
        `collector = "foo-bar.icecube.wisc.edu"`
        `schedd = "baz.icecube.wisc.edu"`
    """
    # locate() raises if the daemon cannot be found via the collector
    daemon_ad = htcondor.Collector(collector).locate(
        htcondor.DaemonTypes.Schedd, schedd
    )
    schedd_client = htcondor.Schedd(daemon_ad)
    LOGGER.info(f"Connected to Schedd {collector=} {schedd=}")
    return schedd_client


# HTCondor JobStatus integer codes
IDLE = 1
RUNNING = 2
REMOVED = 3
COMPLETED = 4
HELD = 5
TRANSFERRING_OUTPUT = 6
SUSPENDED = 7

# status code -> human-readable name
_STATUS_MAPPING = dict(
    zip(
        (IDLE, RUNNING, REMOVED, COMPLETED, HELD, TRANSFERRING_OUTPUT, SUSPENDED),
        ("Idle", "Running", "Removed", "Completed", "Held", "Transferring Output", "Suspended"),
    )
)


def job_status_to_str(status_code: int) -> str:
    """Get the human-readable string for the job status int."""
    try:
        return _STATUS_MAPPING[status_code]
    except KeyError:
        return f"Invalid status code: {status_code}"
Loading

0 comments on commit bdde649

Please sign in to comment.