Skip to content

Commit

Permalink
Pod status check for k8s runner jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
almahmoud committed Jul 16, 2024
1 parent 156e05d commit 28d28dc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
18 changes: 17 additions & 1 deletion lib/galaxy/jobs/runners/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
HTTPError,
Ingress,
ingress_object_dict,
is_pod_running,
is_pod_unschedulable,
Job,
job_object_dict,
Expand Down Expand Up @@ -802,10 +803,12 @@ def check_watched_item(self, job_state):
pass
else:
pass
else:
elif self.__check_job_pod_running(job_state):
log.debug("Job set to running...")
job_state.running = True
job_state.job_wrapper.change_state(model.Job.states.RUNNING)
else:
pass
return job_state
elif job_persisted_state == model.Job.states.DELETED:
# Job has been deleted via stop_job and job has not been deleted,
Expand Down Expand Up @@ -956,6 +959,19 @@ def __job_failed_due_to_low_memory(self, job_state):

return False


def __check_job_pod_running(self, job_state):
"""
checks the state of the pod to see if it is unschedulable.
"""
pods = find_pod_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"])
if not pods.response["items"]:
return False

pod = Pod(self._pykube_api, pods.response["items"][0])
return is_pod_running(self._pykube_api, pod, self.runner_params["k8s_namespace"])


def __job_pending_due_to_unschedulable_pod(self, job_state):
"""
checks the state of the pod to see if it is unschedulable.
Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/jobs/runners/util/pykube_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def find_pod_object_by_name(pykube_api, job_name, namespace=None):
return Pod.objects(pykube_api).filter(selector=f"job-name={job_name}", namespace=namespace)


def is_pod_running(pykube_api, pod, namespace=None):
is_running = not any(c.get("state", {}).get("running", {}).get("startedAt", False) == False for c in pod.obj["status"].get("containerStatuses", []))
if pod.obj["status"].get("phase") == "Running" and is_running:
return True

return False


def is_pod_unschedulable(pykube_api, pod, namespace=None):
is_unschedulable = any(c.get("reason") == "Unschedulable" for c in pod.obj["status"].get("conditions", []))
if pod.obj["status"].get("phase") == "Pending" and is_unschedulable:
Expand Down Expand Up @@ -311,6 +319,7 @@ def galaxy_instance_id(params):
"find_pod_object_by_name",
"galaxy_instance_id",
"HTTPError",
"is_pod_running"
"is_pod_unschedulable",
"Job",
"Service",
Expand Down

0 comments on commit 28d28dc

Please sign in to comment.