diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/common.py b/zetta_utils/cloud_management/resource_allocation/k8s/common.py index a82c92018..5d501d107 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/common.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/common.py @@ -94,13 +94,13 @@ def get_mazepa_worker_command( def _get_provider_cluster_data(info: ClusterInfo) -> Tuple[ClusterAuth, str]: if info.project is not None: assert info.region is not None, "GKE cluster needs both `project` and `region`." - logger.info("Cluster provider: GKE/GCP.") + logger.debug("Cluster provider: GKE/GCP.") cluster_data, cert, token = gke_cluster_data(info.name, info.region, info.project) endpoint = cluster_data.endpoint workload_pool = cluster_data.workload_identity_config.workload_pool else: - logger.info("Cluster provider: EKS/AWS.") + logger.debug("Cluster provider: EKS/AWS.") cluster_data, cert, token = eks_cluster_data(info.name) endpoint = cluster_data["endpoint"] diff --git a/zetta_utils/cloud_management/resource_allocation/k8s/job.py b/zetta_utils/cloud_management/resource_allocation/k8s/job.py index 689caccd1..f8ec2640b 100644 --- a/zetta_utils/cloud_management/resource_allocation/k8s/job.py +++ b/zetta_utils/cloud_management/resource_allocation/k8s/job.py @@ -108,7 +108,7 @@ def _wait_for_job_start( logger.info(f"Waiting for `{job.metadata.name}` to start.") if job.status.ready == 1: break - time.sleep(5) + time.sleep(15) logger.info(f"`{job.metadata.name}` job started.") @@ -168,14 +168,12 @@ def wait_for_job_completion( ): batch_v1_api = _reset_batch_api(cluster_info) _wait_for_job_start(job, namespace, batch_v1_api) - job = batch_v1_api.read_namespaced_job_status( name=job.metadata.name, namespace=namespace, ) - not_done = job.status.succeeded == 0 or job.status.succeeded is None - not_failed = True - while not_done and not_failed: + + while True: logger.info(f"Waiting for `{job.metadata.name}` to complete.") time.sleep(15) try: @@ -189,8 +187,20 @@ def wait_for_job_completion( name=job.metadata.name, namespace=namespace, ) - not_done = job.status.succeeded == 0 or job.status.succeeded is None - not_failed = job.status.failed == 0 or job.status.failed is None + + if job.status.succeeded == 1: + break + + if job.status.failed: + if job.status.failed > job.spec.backoff_limit: + logger.warning(f"`{job.metadata.name}` backoff_limit reached.") + break + pod = get_job_pod(job, cluster_info, wait_until_start=False) + follow_job_logs(job, cluster_info, tail_lines=64, wait_until_start=False) + logger.warning( + f"Retrying job `{job.metadata.name}`: {job.status.failed}/{job.spec.backoff_limit}" + ) + _wait_for_job_start(job, namespace, batch_v1_api) logger.info(f"`{job.metadata.name}` job completed.") pod = get_job_pod(job, cluster_info, wait_until_start=False)