diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index efb59ab8..2dc2b65b 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -7,8 +7,8 @@ # pip freeze ######################################################################## backoff==2.2.1 -boto3==1.34.1 -botocore==1.34.1 +boto3==1.34.3 +botocore==1.34.3 cachetools==5.3.2 certifi==2023.11.17 cffi==1.16.0 @@ -29,15 +29,15 @@ jmespath==1.0.1 kubernetes==28.1.0 motor==3.3.2 oauthlib==3.2.2 -opentelemetry-api==1.21.0 +opentelemetry-api==1.22.0 opentelemetry-exporter-jaeger==1.21.0 opentelemetry-exporter-jaeger-proto-grpc==1.21.0 opentelemetry-exporter-jaeger-thrift==1.21.0 -opentelemetry-exporter-otlp-proto-common==1.21.0 -opentelemetry-exporter-otlp-proto-http==1.21.0 -opentelemetry-proto==1.21.0 -opentelemetry-sdk==1.21.0 -opentelemetry-semantic-conventions==0.42b0 +opentelemetry-exporter-otlp-proto-common==1.22.0 +opentelemetry-exporter-otlp-proto-http==1.22.0 +opentelemetry-proto==1.22.0 +opentelemetry-sdk==1.22.0 +opentelemetry-semantic-conventions==0.43b0 protobuf==4.25.1 pyasn1==0.5.1 pyasn1-modules==0.3.0 @@ -74,16 +74,16 @@ cryptography==41.0.7 pip==23.2.1 pipdeptree==2.13.1 setuptools==65.5.1 -skydriver-clientmanager -├── boto3 [required: Any, installed: 1.34.1] -│ ├── botocore [required: >=1.34.1,<1.35.0, installed: 1.34.1] +skydriver-clientmanager-ewms-sidecar +├── boto3 [required: Any, installed: 1.34.3] +│ ├── botocore [required: >=1.34.3,<1.35.0, installed: 1.34.3] │ │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ │ └── six [required: >=1.5, installed: 1.16.0] │ │ └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ └── s3transfer [required: >=0.9.0,<0.10.0, installed: 0.9.0] -│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.1] +│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.3] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ └── six [required: >=1.5, installed: 1.16.0] @@ -169,7 +169,7 @@ wheel==0.42.0 wipac-telemetry==0.3.0 ├── coloredlogs [required: Any, installed: 15.0.1] │ └── humanfriendly [required: >=9.1, installed: 10.0] -├── opentelemetry-api [required: Any, installed: 1.21.0] +├── opentelemetry-api [required: Any, installed: 1.22.0] │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] @@ -179,72 +179,72 @@ wipac-telemetry==0.3.0 │ │ ├── googleapis-common-protos [required: ~=1.52,<1.60.0, installed: 1.59.1] │ │ │ └── protobuf [required: >=3.19.5,<5.0.0.dev0,!=4.21.5,!=4.21.4,!=4.21.3,!=4.21.2,!=4.21.1,!=3.20.1,!=3.20.0, installed: 4.25.1] │ │ ├── grpcio [required: >=1.0.0,<2.0.0, installed: 1.60.0] -│ │ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0] +│ │ ├── opentelemetry-api [required: ~=1.3, installed: 1.22.0] │ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ │ └── opentelemetry-sdk [required: ~=1.11, installed: 1.21.0] -│ │ ├── opentelemetry-api [required: ==1.21.0, installed: 1.21.0] +│ │ └── opentelemetry-sdk [required: ~=1.11, installed: 1.22.0] +│ │ ├── opentelemetry-api [required: ==1.22.0, installed: 1.22.0] │ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] +│ │ ├── opentelemetry-semantic-conventions [required: ==0.43b0, installed: 0.43b0] │ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── opentelemetry-exporter-jaeger-thrift [required: ==1.21.0, installed: 1.21.0] -│ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0] +│ ├── opentelemetry-api [required: ~=1.3, installed: 1.22.0] │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ ├── opentelemetry-sdk [required: ~=1.11, installed: 1.21.0] -│ │ ├── opentelemetry-api [required: ==1.21.0, installed: 1.21.0] +│ ├── opentelemetry-sdk [required: ~=1.11, installed: 1.22.0] +│ │ ├── opentelemetry-api [required: ==1.22.0, installed: 1.22.0] │ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] +│ │ ├── opentelemetry-semantic-conventions [required: ==0.43b0, installed: 0.43b0] │ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── thrift [required: >=0.10.0, installed: 0.16.0] │ └── six [required: >=1.7.2, installed: 1.16.0] -├── opentelemetry-exporter-otlp-proto-http [required: Any, installed: 1.21.0] +├── opentelemetry-exporter-otlp-proto-http [required: Any, installed: 1.22.0] │ ├── backoff [required: >=1.10.0,<3.0.0, installed: 2.2.1] │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ ├── googleapis-common-protos [required: ~=1.52, installed: 1.59.1] │ │ └── protobuf [required: >=3.19.5,<5.0.0.dev0,!=4.21.5,!=4.21.4,!=4.21.3,!=4.21.2,!=4.21.1,!=3.20.1,!=3.20.0, installed: 4.25.1] -│ ├── opentelemetry-api [required: ~=1.15, installed: 1.21.0] +│ ├── opentelemetry-api [required: ~=1.15, installed: 1.22.0] │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ ├── opentelemetry-exporter-otlp-proto-common [required: ==1.21.0, installed: 1.21.0] +│ ├── opentelemetry-exporter-otlp-proto-common [required: ==1.22.0, installed: 1.22.0] │ │ ├── backoff [required: >=1.10.0,<3.0.0, installed: 2.2.1] -│ │ └── opentelemetry-proto [required: ==1.21.0, installed: 1.21.0] +│ │ └── opentelemetry-proto [required: ==1.22.0, installed: 1.22.0] │ │ └── protobuf [required: >=3.19,<5.0, installed: 4.25.1] -│ ├── opentelemetry-proto [required: ==1.21.0, installed: 1.21.0] +│ ├── opentelemetry-proto [required: ==1.22.0, installed: 1.22.0] │ │ └── protobuf [required: >=3.19,<5.0, installed: 4.25.1] -│ ├── opentelemetry-sdk [required: ~=1.21.0, installed: 1.21.0] -│ │ ├── opentelemetry-api [required: ==1.21.0, installed: 1.21.0] +│ ├── opentelemetry-sdk [required: ~=1.22.0, installed: 1.22.0] +│ │ ├── opentelemetry-api [required: ==1.22.0, installed: 1.22.0] │ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] +│ │ ├── opentelemetry-semantic-conventions [required: ==0.43b0, installed: 0.43b0] │ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── requests [required: ~=2.7, installed: 2.31.0] │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17] │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] │ ├── idna [required: >=2.5,<4, installed: 3.6] │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18] -├── opentelemetry-sdk [required: Any, installed: 1.21.0] -│ ├── opentelemetry-api [required: ==1.21.0, installed: 1.21.0] +├── opentelemetry-sdk [required: Any, installed: 1.22.0] +│ ├── opentelemetry-api [required: ==1.22.0, installed: 1.22.0] │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ └── zipp [required: >=0.5, installed: 3.17.0] -│ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] +│ ├── opentelemetry-semantic-conventions [required: ==0.43b0, installed: 0.43b0] │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] ├── protobuf [required: Any, installed: 4.25.1] ├── typing-extensions [required: Any, installed: 4.9.0] diff --git a/ewms_sidecar/condor/stopper.py b/ewms_sidecar/condor/stopper.py deleted file mode 100644 index 79d68f44..00000000 --- a/ewms_sidecar/condor/stopper.py +++ /dev/null @@ -1,30 +0,0 @@ -"""For stopping Skymap Scanner clients on an HTCondor cluster.""" - - -import htcondor # type: ignore[import] - -from ..config import LOGGER - - -def stop( - collector: str, - schedd: str, - cluster_id: str, - schedd_obj: htcondor.Schedd, -) -> None: - """Main logic.""" - LOGGER.info( - f"Stopping Skymap Scanner client workers on {cluster_id} / {collector} / {schedd}" - ) - - # Remove workers -- may not be instantaneous - LOGGER.info("Requesting removal...") - act_obj = schedd_obj.act( - htcondor.JobAction.Remove, - f"ClusterId == {cluster_id}", - reason="Requested by SkyDriver", - ) - LOGGER.debug(act_obj) - LOGGER.info(f"Removed {act_obj['TotalSuccess']} workers") - - # TODO: get/forward worker logs diff --git a/ewms_sidecar/config.py b/ewms_sidecar/config.py index c880ccc3..6d81010d 100644 --- a/ewms_sidecar/config.py +++ b/ewms_sidecar/config.py @@ -7,7 +7,7 @@ from wipac_dev_tools import from_environment_as_dataclass -LOGGER = logging.getLogger("clientmanager") +LOGGER = logging.getLogger("ewms-sidecar") LOCAL_K8S_HOST = "local" @@ -34,13 +34,6 @@ class EnvConfig: CLIENT_STARTER_WAIT_FOR_STARTUP_JSON: int = 60 CONDOR_TOKEN: str = "" # - WORKER_K8S_TOKEN: str = "" - WORKER_K8S_CACERT: str = "" - WORKER_K8S_CONFIG_FILE_BASE64: str = "" - # local k8s - WORKER_K8S_LOCAL_APPLICATION_NAME: str = "" - WORKER_K8S_LOCAL_WORKERS_MAX: int = 3 # don't want too many *local* workers - # EWMS_PILOT_QUARANTINE_TIME: int = 0 # EWMS_TMS_S3_ACCESS_KEY_ID: str = "" diff --git a/ewms_sidecar/clientmanager.py b/ewms_sidecar/ewms_sidecar.py similarity index 100% rename from ewms_sidecar/clientmanager.py rename to ewms_sidecar/ewms_sidecar.py diff --git a/ewms_sidecar/k8s/__init__.py b/ewms_sidecar/k8s/__init__.py deleted file mode 100644 index a526f44f..00000000 --- a/ewms_sidecar/k8s/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Init.""" - -from .act import act # noqa: F401 diff --git a/ewms_sidecar/k8s/act.py b/ewms_sidecar/k8s/act.py deleted file mode 100644 index c97a51ad..00000000 --- a/ewms_sidecar/k8s/act.py +++ /dev/null @@ -1,137 +0,0 @@ -"""The post-argparse entry point for k8s actions.""" - - -import argparse -import base64 -import time -from tempfile import NamedTemporaryFile - -import kubernetes # type: ignore[import-untyped] - -from .. import utils -from ..config import ENV, LOCAL_K8S_HOST, LOGGER -from . import starter, stopper - - -def act(args: argparse.Namespace) -> None: - """Do the action.""" - k8s_client_config = kubernetes.client.Configuration() - - # Creating K8S cluster client - # Local - if args.host == LOCAL_K8S_HOST: - LOGGER.info("connecting to local k8s...") - # use *this* pod's service account - kubernetes.config.load_incluster_config(k8s_client_config) - # Using config file + token - elif ENV.WORKER_K8S_CONFIG_FILE_BASE64 and ENV.WORKER_K8S_TOKEN: - LOGGER.info("connecting to remote k8s via config file + token...") - # connect to remote host - with NamedTemporaryFile(delete=False) as tempf: - tempf.write(base64.b64decode(ENV.WORKER_K8S_CONFIG_FILE_BASE64)) - LOGGER.info("loading k8s configuration...") - kubernetes.config.load_kube_config( - config_file=tempf.name, - client_configuration=k8s_client_config, - ) - k8s_client_config.host = args.host - k8s_client_config.api_key["authorization"] = ENV.WORKER_K8S_TOKEN - # Using CA cert + token - elif ENV.WORKER_K8S_CACERT and ENV.WORKER_K8S_TOKEN: - # https://medium.com/@jankrynauw/run-a-job-on-google-kubernetes-engine-using-the-python-client-library-and-not-kubectl-4ee8bdd55b1b - LOGGER.info("connecting to remote k8s via ca cert + token...") - with NamedTemporaryFile(delete=False) as tempf: - tempf.write(base64.b64decode(ENV.WORKER_K8S_CACERT)) - k8s_client_config.ssl_ca_cert = tempf.name - k8s_client_config.host = args.host - k8s_client_config.verify_ssl = True - k8s_client_config.debug = True # remove? - k8s_client_config.api_key = {"authorization": "Bearer " + ENV.WORKER_K8S_TOKEN} - k8s_client_config.assert_hostname = False - kubernetes.client.Configuration.set_default(k8s_client_config) - else: - raise RuntimeError( - f"Did not provide sufficient configuration to connect to {args.host}" - ) - - # connect & go - with kubernetes.client.ApiClient(k8s_client_config) as k8s_api: - try: - LOGGER.debug("testing k8s credentials") - resp = kubernetes.client.BatchV1Api(k8s_api).get_api_resources() - LOGGER.debug(resp) - except kubernetes.client.rest.ApiException as e: - LOGGER.exception(e) - raise - _act(args, k8s_api) - - -def _act(args: argparse.Namespace, k8s_api: kubernetes.client.ApiClient) -> None: - match args.action: - case "start": - cluster_id = f"skyscan-worker-{ENV.SKYSCAN_SKYDRIVER_SCAN_ID}-{int(time.time())}" # TODO: make more unique - LOGGER.info( - f"Starting {args.n_workers} Skymap Scanner client workers on " - f"{args.host}/{args.namespace}/{cluster_id}" - ) - # make connections -- do now so we don't have any surprises downstream - skydriver_rc = utils.connect_to_skydriver() - # start - k8s_job_dict = starter.prep( - cluster_id=cluster_id, - # k8s CL args - cpu_arch=args.cpu_arch, - host=args.host, - job_config_stub=args.job_config_stub, - namespace=args.namespace, - # starter CL args -- worker - worker_memory_bytes=args.worker_memory_bytes, - worker_disk_bytes=args.worker_disk_bytes, - n_cores=args.n_cores, - n_workers=args.n_workers, - # starter CL args -- client - client_args=args.client_args if args.client_args else [], - client_startup_json_s3=utils.s3ify(args.client_startup_json), - container_image=args.image, - ) - # final checks - if args.dryrun: - LOGGER.critical("Script Aborted: dryrun enabled") - return - if utils.skydriver_aborted_scan(skydriver_rc): - LOGGER.critical("Script Aborted: SkyDriver aborted scan") - return - # start - k8s_job_dict = starter.start( - k8s_api, - k8s_job_dict, - cluster_id, - args.host, - args.namespace, - ) - # report to SkyDriver - utils.update_skydriver( - skydriver_rc, - "k8s", - location={ - "host": args.host, - "namespace": args.namespace, - }, - uuid=args.uuid, - cluster_id=cluster_id, - n_workers=args.n_workers, - starter_info=k8s_job_dict, - ) - LOGGER.info("Sent cluster info to SkyDriver") - case "stop": - LOGGER.info( - f"Stopping Skymap Scanner client workers on " - f"{args.host}/{args.namespace}/{args.cluster_id}" - ) - stopper.stop( - args.namespace, - args.cluster_id, - k8s_api, - ) - case _: - raise RuntimeError(f"Unknown action: {args.action}") diff --git a/ewms_sidecar/k8s/k8s_tools.py b/ewms_sidecar/k8s/k8s_tools.py deleted file mode 100644 index 17a919be..00000000 --- a/ewms_sidecar/k8s/k8s_tools.py +++ /dev/null @@ -1,62 +0,0 @@ -"""An interface to the Kubernetes cluster.""" - - -import kubernetes.client # type: ignore[import] - -from ..config import ENV, LOCAL_K8S_HOST, LOGGER - - -def get_worker_k8s_secret_name(cluster_id: str) -> str: - return f"{cluster_id}-secret" - - -def patch_or_create_namespaced_secret( - k8s_core_api: kubernetes.client.CoreV1Api, - host: str, - namespace: str, - secret_name: str, - secret_type: str, - encoded_secret_data: dict[str, str], -) -> None: - """Patch secret and if not exist create.""" - - if host == LOCAL_K8S_HOST: - metadata = kubernetes.client.V1ObjectMeta( - name=secret_name, - labels={ - # https://argo-cd.readthedocs.io/en/stable/user-guide/resource_tracking/ - "app.kubernetes.io/instance": ENV.WORKER_K8S_LOCAL_APPLICATION_NAME, - }, - annotations={ - "argocd.argoproj.io/sync-options": "Prune=false" # don't want argocd to prune this job - }, - ) - else: - metadata = kubernetes.client.V1ObjectMeta(name=secret_name) - - # Instantiate the Secret object - body = kubernetes.client.V1Secret( - data=encoded_secret_data, - type=secret_type, - metadata=metadata, - ) - - # try to patch first - try: - k8s_core_api.patch_namespaced_secret(secret_name, namespace, body) - LOGGER.info(f"Secret {secret_name} in namespace {namespace} has been patched") - except kubernetes.client.rest.ApiException as e: - # a (None or 404) means we can create secret instead, see below - if e.status and e.status != 404: - LOGGER.exception(e) - raise - - # create if patch failed - try: - k8s_core_api.create_namespaced_secret(namespace=namespace, body=body) - LOGGER.info( - f"Created secret {secret_name} of type {secret_type} in namespace {namespace}" - ) - except kubernetes.client.rest.ApiException as e: - LOGGER.exception(e) - raise diff --git a/ewms_sidecar/k8s/starter.py b/ewms_sidecar/k8s/starter.py deleted file mode 100644 index 54b1a368..00000000 --- a/ewms_sidecar/k8s/starter.py +++ /dev/null @@ -1,246 +0,0 @@ -"""For starting Skymap Scanner clients on an K8s cluster.""" - - -import base64 -import json -import os -import pprint -from pathlib import Path -from typing import Any - -import kubernetes # type: ignore[import-untyped] - -from ..config import ( - ENV, - FORWARDED_ENV_VARS, - LOCAL_K8S_HOST, - LOGGER, - SECRET_FORWARDED_ENV_VARS, -) -from ..utils import S3File -from . import k8s_tools - - -def make_k8s_job_desc( - job_config_stub: Path, - # k8s args - host: str, - namespace: str, - cluster_id: str, - worker_memory_bytes: int, - worker_disk_bytes: int, - n_workers: int, - n_cores: int, - # skymap scanner args - container_image: str, - client_startup_json_s3: S3File, - add_client_args: list[tuple[str, str]], - # special args for the cloud - cpu_arch: str, - # env vars for secrets - secret_env_vars: list[str], -) -> dict[str, Any]: - """Make the k8s job description (submit object).""" - with open(job_config_stub, "r") as f: - k8s_job_dict = json.load(f) - - # multiple different variations add to these... - for meta_field in ["labels", "annotations"]: - if meta_field not in k8s_job_dict["metadata"]: - k8s_job_dict["metadata"][meta_field] = {} - - # ARM-specific fields - # TODO: cleanup these ifs - if cpu_arch == "arm": - cpu_arch = "arm64" - else: - # elif cpu_arch == "x86": - cpu_arch = "amd64" - # labels - k8s_job_dict["metadata"]["labels"].update({"kubernetes.io/arch": cpu_arch}) - # affinity - k8s_job_dict["spec"]["template"]["spec"]["affinity"] = { - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "kubernetes.io/arch", - "operator": "In", - "values": [cpu_arch], - } - ] - } - ] - } - } - } - - # Setting metadata - k8s_job_dict["metadata"]["namespace"] = namespace - k8s_job_dict["metadata"]["name"] = cluster_id - if host == LOCAL_K8S_HOST: - k8s_job_dict["metadata"]["labels"].update( - { - # https://argo-cd.readthedocs.io/en/stable/user-guide/resource_tracking/ - "app.kubernetes.io/instance": ENV.WORKER_K8S_LOCAL_APPLICATION_NAME, - } - ) - k8s_job_dict["metadata"]["annotations"].update( - { - "argocd.argoproj.io/sync-options": "Prune=false" # don't want argocd to prune this job - } - ) - - # Setting parallelism - k8s_job_dict["spec"]["completions"] = n_workers - k8s_job_dict["spec"]["parallelism"] = n_workers - - # set memory & # cores - k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["resources"] = { - "limits": { - "cpu": str(n_cores), - # TODO: give a bit more just in case? - "memory": str(worker_memory_bytes), - "ephemeral-storage": str(worker_disk_bytes), - }, - "requests": { - "cpu": str(n_cores), - "memory": str(worker_memory_bytes), - "ephemeral-storage": str(worker_disk_bytes), - }, - } - - # Setting JSON input file url - k8s_job_dict["spec"]["template"]["spec"]["initContainers"][0]["env"][0][ - "value" - ] = client_startup_json_s3.url - - def add_override_env(new_env_dicts: list[dict[str, Any]]) -> None: - k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["env"] = [ - x - for x in k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["env"] - if x["name"] not in new_env_dicts - ] + new_env_dicts - - # Forward all env vars: ex. SKYSCAN_* & EWMS_* - add_override_env( - [{"name": var, "value": os.environ[var]} for var in FORWARDED_ENV_VARS] - ) - # now add/override any env vars that need to be in a secret - add_override_env( - [ - { - "name": v, # "SKYDRIVER_TOKEN" - "valueFrom": { - "secretKeyRef": { - "name": k8s_tools.get_worker_k8s_secret_name(cluster_id), - "key": v.lower(), # "skydriver_token" - } - }, - } - for v in secret_env_vars - ] - ) - - # Container image - k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["image"] = container_image - - # Adding more args to client - client_args = k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["args"] - for carg, value in add_client_args: - client_args.append(f"--{carg}") - client_args.append(f"{value}") - k8s_job_dict["spec"]["template"]["spec"]["containers"][0]["args"] = client_args - - return k8s_job_dict # type: ignore[no-any-return] - - -def prep( - cluster_id: str, - # k8s CL args - job_config_stub: Path, - host: str, - namespace: str, - cpu_arch: str, - # starter CL args -- worker - worker_memory_bytes: int, - worker_disk_bytes: int, - n_workers: int, - n_cores: int, - # starter CL args -- client - client_args: list[tuple[str, str]], - client_startup_json_s3: S3File, - container_image: str, -) -> dict[str, Any]: - """Create objects needed for starting cluster.""" - if host == LOCAL_K8S_HOST and n_workers > ENV.WORKER_K8S_LOCAL_WORKERS_MAX: - LOGGER.warning( - f"Requested more workers ({n_workers}) than the max allowed {ENV.WORKER_K8S_LOCAL_WORKERS_MAX}. Using the maximum instead." - ) - n_workers = ENV.WORKER_K8S_LOCAL_WORKERS_MAX - - # make k8s job description - k8s_job_dict = make_k8s_job_desc( - job_config_stub, - host, - namespace, - cluster_id, - # condor args - worker_memory_bytes, - worker_disk_bytes, - n_workers, - n_cores, - # skymap scanner args - container_image, - client_startup_json_s3, - client_args, - cpu_arch, - # env vars for secrets - SECRET_FORWARDED_ENV_VARS, - ) - try: - # must be natively json-encodable - LOGGER.info(json.dumps(k8s_job_dict, indent=4)) - except json.decoder.JSONDecodeError: - LOGGER.info(pprint.pformat(k8s_job_dict, indent=4)) - raise - - return k8s_job_dict - - -def start( - k8s_api: kubernetes.client.ApiClient, - k8s_job_dict: dict[str, Any], - cluster_id: str, - # k8s CL args - host: str, - namespace: str, -) -> dict[str, Any]: - """Start cluster.""" - - # create namespace - # kubernetes.client.CoreV1Api(k8s_api).create_namespace( - # kubernetes.client.V1Namespace( - # metadata=kubernetes.client.V1ObjectMeta(name=namespace) - # ) - # ) - - # create secret - k8s_tools.patch_or_create_namespaced_secret( - kubernetes.client.CoreV1Api(k8s_api), - host, - namespace, - k8s_tools.get_worker_k8s_secret_name(cluster_id), - "opaque", - { - v.lower(): base64.b64encode(os.environ[v].encode("ascii")).decode("utf-8") - for v in SECRET_FORWARDED_ENV_VARS - }, - ) - - # submit jobs - kubernetes.utils.create_from_dict(k8s_api, k8s_job_dict, namespace=namespace) - - return k8s_job_dict diff --git a/ewms_sidecar/k8s/stopper.py b/ewms_sidecar/k8s/stopper.py deleted file mode 100644 index 72e4ad17..00000000 --- a/ewms_sidecar/k8s/stopper.py +++ /dev/null @@ -1,42 +0,0 @@ -"""For stopping Skymap Scanner clients on a K8s cluster.""" - - -import kubernetes # type: ignore[import] - -from ..config import LOGGER -from . import k8s_tools - - -def stop( - namespace: str, - cluster_id: str, - k8s_api: kubernetes.client.ApiClient, -) -> None: - """Main logic.""" - - # Remove workers -- may not be instantaneous - LOGGER.info("Requesting removal...") - resp = kubernetes.client.BatchV1Api(k8s_api).delete_namespaced_job( - name=cluster_id, - namespace=namespace, - body=kubernetes.client.V1DeleteOptions( - propagation_policy="Foreground", grace_period_seconds=5 - ), - ) - LOGGER.info( - f"Removed workers: {cluster_id} in namespace {namespace} with response {resp.status} " - ) - - # Remove secret -- may not be instantaneous - resp = kubernetes.client.CoreV1Api(k8s_api).delete_namespaced_secret( - name=k8s_tools.get_worker_k8s_secret_name(cluster_id), - namespace=namespace, - body=kubernetes.client.V1DeleteOptions( - propagation_policy="Foreground", grace_period_seconds=5 - ), - ) - LOGGER.info( - f"Removed secret: {k8s_tools.get_worker_k8s_secret_name(cluster_id)} in namespace {namespace} with response {resp.status} " - ) - - # TODO: get/forward job logs diff --git a/setup.cfg b/setup.cfg index 7bc58675..15370e2c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,16 +4,17 @@ python_max = 3.11 package_dirs = skydriver clientmanager + ewms_sidecar [metadata] # generated by wipac:cicd_setup_builder: name, version, keywords version = attr: skydriver.__version__ keywords = WIPAC IceCube -name = skydriver-clientmanager +name = skydriver-clientmanager-ewms-sidecar [semantic_release] # fully-generated by wipac:cicd_setup_builder -version_variable = skydriver/__init__.py:__version__,clientmanager/__init__.py:__version__ +version_variable = skydriver/__init__.py:__version__,clientmanager/__init__.py:__version__,ewms_sidecar/__init__.py:__version__ upload_to_pypi = False patch_without_tag = True commit_parser = semantic_release.history.emoji_parser @@ -56,8 +57,10 @@ mypy = include = skydriver clientmanager + ewms_sidecar skydriver.* clientmanager.* + ewms_sidecar.* exclude = test tests