From 4fc8be65cca6afd422f23e45fc38232601dae16b Mon Sep 17 00:00:00 2001
From: Akhilesh Halageri
Date: Tue, 3 Sep 2024 22:13:00 +0000
Subject: [PATCH] feat(gc): slack integration

---
Review notes (text between the `---` line and the first file diff is not
part of the commit message):
 * Line breaks, indentation and blank context lines were rebuilt from the
   hunk headers and the diffstat. The +/- line counts match every header;
   the exact position of blank context lines and the leading whitespace of
   context lines are inferred - TODO confirm against the tree before applying.
 * _delete_k8s_resources: the boolean returned by _delete_k8s_resource() was
   discarded, so the function reported success even when a deletion failed.
   The result is now folded into `success` (same line count, hunk headers
   unchanged). The blob id on the gc.py `index` line predates this change.
 * NOTE(review): `slack_client` is built at import time from
   os.environ["SLACK_BOT_TOKEN"]; importing gc.py without that variable
   raises KeyError. Left as submitted - confirm this is intended.

 pyproject.toml                                |  1 +
 .../resource_allocation/k8s/cronjob.py        |  1 +
 zetta_utils/run/gc.py                         | 94 +++++++++++--------
 3 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index c7be86968..d9e0e8fee 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,6 +27,7 @@ dependencies = [
     "packaging >= 23.2",
     "pdbp >= 1.5.3",
     "boto3 == 1.28.4",
+    "slack_sdk >= 3.31.0",
 ]
 description = "Zetta AI Connectomics Toolkit"
 keywords = ["neuroscience connectomics EM"]
diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/cronjob.py b/zetta_utils/cloud_management/resource_allocation/k8s/cronjob.py
index 9261b3c1a..44edc1b7e 100644
--- a/zetta_utils/cloud_management/resource_allocation/k8s/cronjob.py
+++ b/zetta_utils/cloud_management/resource_allocation/k8s/cronjob.py
@@ -122,6 +122,7 @@ def configure_cronjob(
         resource_requests=resource_requests,
     )
     if patch:
+        name = f"run-{name}"
         batch_v1_api.patch_namespaced_cron_job(name=name, namespace=namespace, body=cronjob)
     else:
         batch_v1_api.create_namespaced_cron_job(namespace=namespace, body=cronjob)
diff --git a/zetta_utils/run/gc.py b/zetta_utils/run/gc.py
index c18f5181a..a56f20b5b 100644
--- a/zetta_utils/run/gc.py
+++ b/zetta_utils/run/gc.py
@@ -12,7 +12,8 @@
 import taskqueue
 from boto3.exceptions import Boto3Error
 from google.api_core.exceptions import GoogleAPICallError
-from kubernetes.client.exceptions import ApiException as K8sApiException
+from slack_sdk import WebClient
+from slack_sdk.errors import SlackApiError
 
 from kubernetes import client as k8s_client  # type: ignore
 from zetta_utils.cloud_management.resource_allocation.k8s import (
@@ -36,6 +37,14 @@
 )
 
 logger = get_logger("zetta_utils")
+slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
+
+
+def post_message(msg: str):
+    try:
+        slack_client.chat_postMessage(channel=os.environ["SLACK_CHANNEL"], text=msg)
+    except SlackApiError as err:
+        logger.warning(err.response["error"])
 
 
 def _get_current_resources_and_stale_run_ids() -> (
@@ -66,6 +75,44 @@ def _read_clusters(run_id_key: str) -> list[ClusterInfo]:  # pragma: no cover
     return [ClusterInfo(**cluster) for cluster in clusters]
 
 
+def _delete_k8s_resource(resource_id: str, resource: Resource):
+    success = True
+    k8s_apps_v1_api = k8s_client.AppsV1Api()
+    k8s_core_v1_api = k8s_client.CoreV1Api()
+    k8s_batch_v1_api = k8s_client.BatchV1Api()
+    try:
+        if resource.type == ResourceTypes.K8S_DEPLOYMENT.value:
+            logger.info(f"Deleting k8s deployment `{resource.name}`")
+            k8s_apps_v1_api.delete_namespaced_deployment(name=resource.name, namespace="default")
+        elif resource.type == ResourceTypes.K8S_SECRET.value:
+            logger.info(f"Deleting k8s secret `{resource.name}`")
+            k8s_core_v1_api.delete_namespaced_secret(name=resource.name, namespace="default")
+        elif resource.type == ResourceTypes.K8S_CONFIGMAP.value:
+            logger.info(f"Deleting k8s configmap `{resource.name}`")
+            k8s_core_v1_api.delete_namespaced_config_map(
+                name=resource.name,
+                namespace="default",
+            )
+        elif resource.type == ResourceTypes.K8S_JOB.value:
+            logger.info(f"Deleting k8s job `{resource.name}`")
+            k8s_batch_v1_api.delete_namespaced_job(
+                name=resource.name,
+                namespace="default",
+                propagation_policy="Foreground",
+            )
+    except k8s_client.ApiException as exc:
+        if exc.status == 404:
+            success = True
+            logger.info(f"Resource does not exist: `{resource.name}`: {exc}")
+            deregister_resource(resource_id)
+        else:
+            success = False
+            msg = f"Failed to delete k8s resource `{resource.name}`: {exc}"
+            logger.warning(msg)
+            post_message(msg)
+    return success
+
+
 def _delete_k8s_resources(run_id: str, resources: dict[str, Resource]) -> bool:  # pragma: no cover
     success = True
     logger.info(f"Deleting k8s resources from run {run_id}")
@@ -79,47 +126,14 @@ def _delete_k8s_resources(run_id: str, resources: dict[str, Resource]) -> bool:
             if exc.code == 404:
                 for resource_id in resources.keys():
                     deregister_resource(resource_id)
+            else:
+                msg = f"Error connecting to cluster {run_id}:{cluster}:{exc.code}:{exc.message}."
+                post_message(msg)
             continue
 
         k8s_client.Configuration.set_default(configuration)
-
-        k8s_apps_v1_api = k8s_client.AppsV1Api()
-        k8s_core_v1_api = k8s_client.CoreV1Api()
-        k8s_batch_v1_api = k8s_client.BatchV1Api()
         for resource_id, resource in resources.items():
-            try:
-                if resource.type == ResourceTypes.K8S_DEPLOYMENT.value:
-                    logger.info(f"Deleting k8s deployment `{resource.name}`")
-                    k8s_apps_v1_api.delete_namespaced_deployment(
-                        name=resource.name, namespace="default"
-                    )
-                elif resource.type == ResourceTypes.K8S_SECRET.value:
-                    logger.info(f"Deleting k8s secret `{resource.name}`")
-                    k8s_core_v1_api.delete_namespaced_secret(
-                        name=resource.name, namespace="default"
-                    )
-                elif resource.type == ResourceTypes.K8S_CONFIGMAP.value:
-                    logger.info(f"Deleting k8s configmap `{resource.name}`")
-                    k8s_core_v1_api.delete_namespaced_config_map(
-                        name=resource.name,
-                        namespace="default",
-                    )
-                elif resource.type == ResourceTypes.K8S_JOB.value:
-                    logger.info(f"Deleting k8s job `{resource.name}`")
-                    k8s_batch_v1_api.delete_namespaced_job(
-                        name=resource.name,
-                        namespace="default",
-                        propagation_policy="Foreground",
-                    )
-            except K8sApiException as exc:
-                if exc.status == 404:
-                    success = True
-                    logger.info(f"Resource does not exist: `{resource.name}`: {exc}")
-                    deregister_resource(resource_id)
-                else:
-                    success = False
-                    logger.warning(f"Failed to delete k8s resource `{resource.name}`: {exc}")
-                    raise K8sApiException() from exc
+            success = _delete_k8s_resource(resource_id, resource) and success
     return success
 
 
@@ -163,6 +177,10 @@ def cleanup_run(run_id: str, resources_raw: dict):
 if __name__ == "__main__":  # pragma: no cover
     logger.setLevel(logging.INFO)
     _resources, stale_run_ids = _get_current_resources_and_stale_run_ids()
+    if len(stale_run_ids) > 0:
+        post_message(f"Cleaning up {len(stale_run_ids)} runs.")
+    else:
+        post_message("Nothing to do.")
     for _id in stale_run_ids:
         logger.info(f"Cleaning up run `{_id}`")
         cleanup_run(_id, _resources[_id])