Reformatted patch: restores the newline structure of the original unified diff
(the source had all hunks collapsed onto three physical lines).
Summary: extract the duplicated job-status mapping into Job.get_query_new_status(),
pin numpy/flask/jsonschema versions, and add a resubmission-after-callback test.

diff --git a/cdci_data_analysis/analysis/job_manager.py b/cdci_data_analysis/analysis/job_manager.py
index 65dc5223..b2cc12af 100644
--- a/cdci_data_analysis/analysis/job_manager.py
+++ b/cdci_data_analysis/analysis/job_manager.py
@@ -250,6 +250,22 @@ def get_call_back_url(self):
 
         return url
 
+    def get_query_new_status(self):
+        if self.status == 'done':
+            query_new_status = 'done'
+        elif self.status == 'failed':
+            query_new_status = 'failed'
+        elif self.status == 'progress':
+            query_new_status = 'progress'
+        else:
+            job_monitor = self.updated_dataserver_monitor()
+            if job_monitor['status'] == 'progress':
+                query_new_status = 'progress'
+            else:
+                query_new_status = 'submitted'
+                self.set_submitted()
+
+        return query_new_status
 
 class OsaJob(Job):
     def __init__(self,
diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py
index aed09c7e..e086245f 100644
--- a/cdci_data_analysis/flask_app/dispatcher_query.py
+++ b/cdci_data_analysis/flask_app/dispatcher_query.py
@@ -2172,15 +2172,7 @@ def run_query(self, off_line=False, disp_conf=None):
                                   debug_message=e.debug_message)
 
         if query_out.status_dictionary['status'] == 0:
-            if job.status == 'done':
-                query_new_status = 'done'
-            elif job.status == 'failed':
-                query_new_status = 'failed'
-            elif job.status == 'progress':
-                query_new_status = 'progress'
-            else:
-                query_new_status = 'submitted'
-                job.set_submitted()
+            query_new_status = job.get_query_new_status()
 
             if email_helper.is_email_to_send_run_query(self.logger,
                                                        query_new_status,
@@ -2270,15 +2262,7 @@ def run_query(self, off_line=False, disp_conf=None):
             self.logger.info('-----------------> job status after query: %s', job.status)
 
             if query_out.status_dictionary['status'] == 0:
-                if job.status == 'done':
-                    query_new_status = 'done'
-                elif job.status == 'failed':
-                    query_new_status = 'failed'
-                elif job.status == 'progress':
-                    query_new_status = 'progress'
-                else:
-                    query_new_status = 'submitted'
-                    job.set_submitted()
+                query_new_status = job.get_query_new_status()
 
                 products_url = self.generate_products_url(self.app.config.get('conf').products_url, self.par_dic)
                 email_api_code = DispatcherAPI.set_api_code(self.par_dic,
diff --git a/requirements.txt b/requirements.txt
index 18ab949d..e1a16194 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
 future
-numpy
+numpy<2.0.0
 pyyaml
 simplejson
-flask>=2.0.3
+flask==2.0.3
 astropy>=5.0.1
 pylogstash_context>=0.1.19
 gunicorn
diff --git a/setup.py b/setup.py
index d8cc4e21..de2adfba 100644
--- a/setup.py
+++ b/setup.py
@@ -21,10 +21,10 @@
 install_req = [
     'oda_api>=1.1.31',
     'pylogstash_context>=0.1.19',
-    "numpy",
+    "numpy<2.0.0",
     "pyyaml",
     "simplejson",
-    "flask",
+    "flask==2.0.3",
     "astropy>=2.0.3",
     "gunicorn",
     "decorator",
@@ -49,7 +49,7 @@
     "giturlparse",
     "sentry-sdk",
     "validators==0.28.3",
-    "jsonschema"
+    "jsonschema<=4.17.3"
 ]
 
 test_req = [
diff --git a/tests/test_job_management.py b/tests/test_job_management.py
index a086019e..4e1bf1d9 100644
--- a/tests/test_job_management.py
+++ b/tests/test_job_management.py
@@ -515,6 +515,74 @@ def test_resubmission_job_id(dispatcher_live_fixture_no_resubmit_timeout, status
         assert jdata['exit_status']['job_status'] == 'ready'
 
 
+@pytest.mark.not_safe_parallel
+def test_resubmission_after_callback(dispatcher_live_fixture_no_resubmit_timeout):
+    server = dispatcher_live_fixture_no_resubmit_timeout
+    DispatcherJobState.remove_scratch_folders()
+    DataServerQuery.set_status("submitted")
+    logger.info("constructed server: %s", server)
+
+    # let's generate a valid token
+    token_payload = {
+        **default_token_payload,
+    }
+    encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256')
+
+    # these parameters define request content
+    base_dict_param = dict(
+        instrument="empty-async",
+        product_type="dummy-log-submit",
+        query_type="Real",
+    )
+
+    dict_param = dict(
+        query_status="new",
+        token=encoded_token,
+        **base_dict_param
+    )
+
+    c = requests.get(os.path.join(server, "run_analysis"),
+                     dict_param
+                     )
+
+    assert c.status_code == 200
+    jdata = c.json()
+    print(json.dumps(jdata, sort_keys=True, indent=4))
+    dispatcher_job_state = DispatcherJobState.from_run_analysis_response(c.json())
+    time_request = jdata['time_request']
+    jdata = c.json()
+    assert jdata['exit_status']['job_status'] == "submitted"
+    assert DataServerQuery.get_status() == "submitted"
+
+    c = requests.get(os.path.join(server, "call_back"),
+                     params=dict(
+                         job_id=dispatcher_job_state.job_id,
+                         session_id=dispatcher_job_state.session_id,
+                         instrument_name="empty-async",
+                         action="progress",
+                         node_id='node_progress',
+                         message='progressing',
+                         token=encoded_token,
+                         time_original_request=time_request
+                     ))
+    assert c.status_code == 200
+    jdata = dispatcher_job_state.load_job_state_record('node_progress', 'progressing')
+    assert jdata['status'] == "progress"
+    assert jdata['full_report_dict']['action'] == "progress"
+
+    # resubmit the job after the timeout expired
+    time.sleep(10.5)
+    dict_param['job_id'] = dispatcher_job_state.job_id
+    dict_param['query_status'] = "progress"
+    c = requests.get(os.path.join(server, "run_analysis"),
+                     dict_param
+                     )
+
+    assert c.status_code == 200
+    jdata = c.json()
+    assert jdata['exit_status']['job_status'] == "progress"
+    assert jdata['query_status'] == "progress"
+
+
 @pytest.mark.not_safe_parallel
 def test_failed_resubmission(dispatcher_live_fixture_no_resubmit_timeout):
     server = dispatcher_live_fixture_no_resubmit_timeout