From 3004b14a7d60eb39dfcbdc51e15242b27edd70c3 Mon Sep 17 00:00:00 2001 From: Marco Donadoni Date: Thu, 16 Nov 2023 13:56:58 +0100 Subject: [PATCH] consumer: fix fetching of workflow engine logs when scheduling fails Closes #543 --- CHANGES.rst | 5 +++ reana_workflow_controller/consumer.py | 59 ++++++++++++--------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index a27869be..1b4ae448 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ Changes ======= +Version 0.9.2 (UNRELEASED) +-------------------------- + +- Fixes job status consumer exception while attempting to fetch workflow engine logs for workflows could not have been successfully scheduled. + Version 0.9.1 (2023-09-27) -------------------------- diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 798442c0..4f4e4d48 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -153,20 +153,20 @@ def _update_workflow_status(workflow, status, logs): try: workflow_engine_logs = _get_workflow_engine_pod_logs(workflow) workflow.logs += workflow_engine_logs + "\n" - except REANAWorkflowControllerError as exception: - logging.error( - f"Could not fetch workflow engine pod logs for workflow {workflow.id_}." - f" Error: {exception}" + except ApiException as e: + logging.exception( + f"Could not fetch workflow engine pod logs for workflow {workflow.id_}. " + f"Error: {e}" ) workflow.logs += "Workflow engine logs could not be retrieved.\n" if RunStatus.should_cleanup_job(status): try: _delete_workflow_job(workflow) - except REANAWorkflowControllerError as exception: + except ApiException as e: logging.error( - f"Could not clean up workflow job for workflow {workflow.id_}." - f" Error: {exception}" + f"Could not clean up workflow job for workflow {workflow.id_}. " + f"Error: {e}" ) @@ -284,32 +284,25 @@ def _update_job_cache(msg): def _delete_workflow_job(workflow: Workflow) -> None: job_name = build_unique_component_name("run-batch", workflow.id_) - try: - current_k8s_batchv1_api_client.delete_namespaced_job( - name=job_name, - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - propagation_policy="Background", - ) - except ApiException as e: - raise REANAWorkflowControllerError( - f"Workflow engine pod could not be deleted. Error: {e}" - ) + current_k8s_batchv1_api_client.delete_namespaced_job( + name=job_name, + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, + propagation_policy="Background", + ) def _get_workflow_engine_pod_logs(workflow: Workflow) -> str: - try: - pods = current_k8s_corev1_api_client.list_namespaced_pod( - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - label_selector=f"reana-run-batch-workflow-uuid={str(workflow.id_)}", - ) - for pod in pods.items: - if str(workflow.id_) in pod.metadata.name: - return current_k8s_corev1_api_client.read_namespaced_pod_log( - namespace=pod.metadata.namespace, - name=pod.metadata.name, - container="workflow-engine", - ) - except ApiException as e: - raise REANAWorkflowControllerError( - f"Workflow engine pod logs could not be fetched. Error: {e}" - ) + pods = current_k8s_corev1_api_client.list_namespaced_pod( + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, + label_selector=f"reana-run-batch-workflow-uuid={str(workflow.id_)}", + ) + for pod in pods.items: + if str(workflow.id_) in pod.metadata.name: + return current_k8s_corev1_api_client.read_namespaced_pod_log( + namespace=pod.metadata.namespace, + name=pod.metadata.name, + container="workflow-engine", + ) + # There might not be any pod returned by `list_namespaced_pod`, for example + # when a workflow fails to be scheduled + return ""