Skip to content

Commit

Permalink
fix(job): better completion status message
Browse files: browse the repository at this point in the history
  • Loading branch information
akhileshh authored and supersergiy committed Dec 7, 2023
1 parent 9ac6286 commit b001a0c
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions zetta_utils/cloud_management/resource_allocation/k8s/job.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -116,9 +116,12 @@ def follow_job_logs(
job: k8s_client.V1Job,
cluster_info: ClusterInfo,
namespace: str = "default",
tail_lines: Optional[int] = None,
wait_until_start: Optional[bool] = True,
):
batch_v1_api = _reset_batch_api(cluster_info)
_wait_for_job_start(job, namespace, batch_v1_api)
if wait_until_start:
_wait_for_job_start(job, namespace, batch_v1_api)

core_api = k8s_client.CoreV1Api()
podlist = core_api.list_namespaced_pod(
Expand All @@ -129,6 +132,7 @@ def follow_job_logs(
core_api.read_namespaced_pod_log,
name=job_name,
namespace=namespace,
tail_lines=tail_lines,
)
for output in log_stream:
logger.info(output)
Expand All @@ -138,9 +142,11 @@ def get_job_pod(
job: k8s_client.V1Job,
cluster_info: ClusterInfo,
namespace: str = "default",
wait_until_start: Optional[bool] = True,
) -> k8s_client.V1Pod:
batch_v1_api = _reset_batch_api(cluster_info)
_wait_for_job_start(job, namespace, batch_v1_api)
if wait_until_start:
_wait_for_job_start(job, namespace, batch_v1_api)

core_api = k8s_client.CoreV1Api()
podlist = core_api.list_namespaced_pod(
Expand All @@ -162,9 +168,10 @@ def wait_for_job_completion(
namespace=namespace,
)
not_done = job.status.succeeded == 0 or job.status.succeeded is None
while not_done:
not_failed = True
while not_done and not_failed:
logger.info(f"Waiting for `{job.metadata.name}` to complete.")
time.sleep(5)
time.sleep(15)
try:
job = batch_v1_api.read_namespaced_job_status(
name=job.metadata.name,
Expand All @@ -176,9 +183,13 @@ def wait_for_job_completion(
name=job.metadata.name,
namespace=namespace,
)

not_done = job.status.succeeded == 0 or job.status.succeeded is None
not_failed = job.status.failed == 0 or job.status.failed is None

logger.info(f"`{job.metadata.name}` job completed.")
pod = get_job_pod(job, cluster_info, wait_until_start=False)
logger.info(f"job pod phase: {pod.status.phase}")
follow_job_logs(job, cluster_info, tail_lines=64, wait_until_start=False)


@contextmanager
Expand Down

0 comments on commit b001a0c

Please sign in to comment.