Skip to content

Commit

Permalink
feat(gc): slack integration
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh authored and supersergiy committed Sep 5, 2024
1 parent 0b316a9 commit 4fc8be6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 38 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"packaging >= 23.2",
"pdbp >= 1.5.3",
"boto3 == 1.28.4",
"slack_sdk >= 3.31.0",
]
description = "Zetta AI Connectomics Toolkit"
keywords = ["neuroscience connectomics EM"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def configure_cronjob(
resource_requests=resource_requests,
)
if patch:
name = f"run-{name}"
batch_v1_api.patch_namespaced_cron_job(name=name, namespace=namespace, body=cronjob)
else:
batch_v1_api.create_namespaced_cron_job(namespace=namespace, body=cronjob)
94 changes: 56 additions & 38 deletions zetta_utils/run/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import taskqueue
from boto3.exceptions import Boto3Error
from google.api_core.exceptions import GoogleAPICallError
from kubernetes.client.exceptions import ApiException as K8sApiException
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

from kubernetes import client as k8s_client # type: ignore
from zetta_utils.cloud_management.resource_allocation.k8s import (
Expand All @@ -36,6 +37,14 @@
)

# Module-wide logger shared by the garbage-collection helpers below.
logger = get_logger("zetta_utils")
# Slack client is built at import time; `os.environ[...]` raises KeyError
# if SLACK_BOT_TOKEN is not set, so importing this module requires it.
slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])


def post_message(msg: str):
    """Post ``msg`` to the Slack channel named by the ``SLACK_CHANNEL`` env var.

    A ``SlackApiError`` raised by the Slack call is not propagated; the
    ``error`` field of its response is logged as a warning instead. Any other
    exception (e.g. ``KeyError`` when ``SLACK_CHANNEL`` is unset) is not
    handled here.
    """
    channel = os.environ["SLACK_CHANNEL"]
    try:
        slack_client.chat_postMessage(channel=channel, text=msg)
    except SlackApiError as api_error:
        logger.warning(api_error.response["error"])


def _get_current_resources_and_stale_run_ids() -> (
Expand Down Expand Up @@ -66,6 +75,44 @@ def _read_clusters(run_id_key: str) -> list[ClusterInfo]: # pragma: no cover
return [ClusterInfo(**cluster) for cluster in clusters]


def _delete_k8s_resource(resource_id: str, resource: Resource):
    """Delete a single k8s resource from the ``default`` namespace.

    The delete call is selected by ``resource.type`` (deployment, secret,
    configmap or job); any other type results in no API call.

    :param resource_id: ID handed to ``deregister_resource`` when the API
        answers 404 for the resource.
    :param resource: Resource whose ``type`` picks the delete call and whose
        ``name`` identifies the object to delete.
    :return: ``False`` when the API raised a non-404 ``ApiException`` (the
        failure is logged as a warning and sent via ``post_message``);
        ``True`` in every other case.
    """
    apps_api = k8s_client.AppsV1Api()
    core_api = k8s_client.CoreV1Api()
    batch_api = k8s_client.BatchV1Api()
    try:
        if resource.type == ResourceTypes.K8S_DEPLOYMENT.value:
            logger.info(f"Deleting k8s deployment `{resource.name}`")
            apps_api.delete_namespaced_deployment(name=resource.name, namespace="default")
        elif resource.type == ResourceTypes.K8S_SECRET.value:
            logger.info(f"Deleting k8s secret `{resource.name}`")
            core_api.delete_namespaced_secret(name=resource.name, namespace="default")
        elif resource.type == ResourceTypes.K8S_CONFIGMAP.value:
            logger.info(f"Deleting k8s configmap `{resource.name}`")
            core_api.delete_namespaced_config_map(name=resource.name, namespace="default")
        elif resource.type == ResourceTypes.K8S_JOB.value:
            logger.info(f"Deleting k8s job `{resource.name}`")
            batch_api.delete_namespaced_job(
                name=resource.name,
                namespace="default",
                propagation_policy="Foreground",
            )
    except k8s_client.ApiException as exc:
        if exc.status != 404:
            # Anything other than "not found" is a real failure: report it.
            msg = f"Failed to delete k8s resource `{resource.name}`: {exc}"
            logger.warning(msg)
            post_message(msg)
            return False
        # 404: the resource is already gone, so only drop its registration.
        logger.info(f"Resource does not exist: `{resource.name}`: {exc}")
        deregister_resource(resource_id)
    return True


def _delete_k8s_resources(run_id: str, resources: dict[str, Resource]) -> bool: # pragma: no cover
success = True
logger.info(f"Deleting k8s resources from run {run_id}")
Expand All @@ -79,47 +126,14 @@ def _delete_k8s_resources(run_id: str, resources: dict[str, Resource]) -> bool:
if exc.code == 404:
for resource_id in resources.keys():
deregister_resource(resource_id)
else:
msg = f"Error connecting to cluster {run_id}:{cluster}:{exc.code}:{exc.message}."
post_message(msg)
continue

k8s_client.Configuration.set_default(configuration)

k8s_apps_v1_api = k8s_client.AppsV1Api()
k8s_core_v1_api = k8s_client.CoreV1Api()
k8s_batch_v1_api = k8s_client.BatchV1Api()
for resource_id, resource in resources.items():
try:
if resource.type == ResourceTypes.K8S_DEPLOYMENT.value:
logger.info(f"Deleting k8s deployment `{resource.name}`")
k8s_apps_v1_api.delete_namespaced_deployment(
name=resource.name, namespace="default"
)
elif resource.type == ResourceTypes.K8S_SECRET.value:
logger.info(f"Deleting k8s secret `{resource.name}`")
k8s_core_v1_api.delete_namespaced_secret(
name=resource.name, namespace="default"
)
elif resource.type == ResourceTypes.K8S_CONFIGMAP.value:
logger.info(f"Deleting k8s configmap `{resource.name}`")
k8s_core_v1_api.delete_namespaced_config_map(
name=resource.name,
namespace="default",
)
elif resource.type == ResourceTypes.K8S_JOB.value:
logger.info(f"Deleting k8s job `{resource.name}`")
k8s_batch_v1_api.delete_namespaced_job(
name=resource.name,
namespace="default",
propagation_policy="Foreground",
)
except K8sApiException as exc:
if exc.status == 404:
success = True
logger.info(f"Resource does not exist: `{resource.name}`: {exc}")
deregister_resource(resource_id)
else:
success = False
logger.warning(f"Failed to delete k8s resource `{resource.name}`: {exc}")
raise K8sApiException() from exc
_delete_k8s_resource(resource_id, resource)
return success


Expand Down Expand Up @@ -163,6 +177,10 @@ def cleanup_run(run_id: str, resources_raw: dict):
if __name__ == "__main__":  # pragma: no cover
    logger.setLevel(logging.INFO)
    _resources, stale_run_ids = _get_current_resources_and_stale_run_ids()

    # Announce on Slack how much work this pass has before starting it.
    post_message(
        f"Cleaning up {len(stale_run_ids)} runs."
        if len(stale_run_ids) > 0
        else "Nothing to do."
    )

    # Clean up each stale run using the resources recorded for it.
    for _id in stale_run_ids:
        logger.info(f"Cleaning up run `{_id}`")
        cleanup_run(_id, _resources[_id])

0 comments on commit 4fc8be6

Please sign in to comment.