Skip to content

Commit

Permalink
fix(k8s): wait until backoff_limit is reached for failed jobs (#580)
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh authored Dec 9, 2023
1 parent 6cf3346 commit 3184081
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ def get_mazepa_worker_command(
def _get_provider_cluster_data(info: ClusterInfo) -> Tuple[ClusterAuth, str]:
if info.project is not None:
assert info.region is not None, "GKE cluster needs both `project` and `region`."
logger.info("Cluster provider: GKE/GCP.")
logger.debug("Cluster provider: GKE/GCP.")

cluster_data, cert, token = gke_cluster_data(info.name, info.region, info.project)
endpoint = cluster_data.endpoint
workload_pool = cluster_data.workload_identity_config.workload_pool
else:
logger.info("Cluster provider: EKS/AWS.")
logger.debug("Cluster provider: EKS/AWS.")

cluster_data, cert, token = eks_cluster_data(info.name)
endpoint = cluster_data["endpoint"]
Expand Down
24 changes: 17 additions & 7 deletions zetta_utils/cloud_management/resource_allocation/k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def _wait_for_job_start(
logger.info(f"Waiting for `{job.metadata.name}` to start.")
if job.status.ready == 1:
break
time.sleep(5)
time.sleep(15)
logger.info(f"`{job.metadata.name}` job started.")


Expand Down Expand Up @@ -168,14 +168,12 @@ def wait_for_job_completion(
):
batch_v1_api = _reset_batch_api(cluster_info)
_wait_for_job_start(job, namespace, batch_v1_api)

job = batch_v1_api.read_namespaced_job_status(
name=job.metadata.name,
namespace=namespace,
)
not_done = job.status.succeeded == 0 or job.status.succeeded is None
not_failed = True
while not_done and not_failed:

while True:
logger.info(f"Waiting for `{job.metadata.name}` to complete.")
time.sleep(15)
try:
Expand All @@ -189,8 +187,20 @@ def wait_for_job_completion(
name=job.metadata.name,
namespace=namespace,
)
not_done = job.status.succeeded == 0 or job.status.succeeded is None
not_failed = job.status.failed == 0 or job.status.failed is None

if job.status.succeeded == 1:
break

if job.status.failed:
if job.status.failed > job.spec.backoff_limit:
logger.warning(f"`{job.metadata.name}` backoff_limit reached.")
break
pod = get_job_pod(job, cluster_info, wait_until_start=False)
follow_job_logs(job, cluster_info, tail_lines=64, wait_until_start=False)
logger.warning(
f"Retrying job `{job.metadata.name}`: {job.status.failed}/{job.spec.backoff_limit}"
)
_wait_for_job_start(job, namespace, batch_v1_api)

logger.info(f"`{job.metadata.name}` job completed.")
pod = get_job_pod(job, cluster_info, wait_until_start=False)
Expand Down

0 comments on commit 3184081

Please sign in to comment.