From 9ada9686414f5c3cc27cbf393abdd7994f5636b1 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Fri, 12 Jan 2024 19:23:07 +0100
Subject: [PATCH 1/9] add sql for oracle update

---
 main/etc/sql/oracle_update.sql | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql
index 0a0eeda3..e3dc6cf0 100644
--- a/main/etc/sql/oracle_update.sql
+++ b/main/etc/sql/oracle_update.sql
@@ -451,3 +451,20 @@ alter table contents drop constraint CONTENT_ID_UQ;
 alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id) USING INDEX LOCAL;
 drop index CONTENTS_ID_NAME_IDX;
 CREATE INDEX CONTENTS_ID_NAME_IDX ON CONTENTS (coll_id, scope, standard_hash(name, 'MD5'), status);
+
+
+--- 20240111
+CREATE SEQUENCE METAINFO_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL;
+CREATE TABLE meta_info
+(
+    meta_id NUMBER(12) DEFAULT ON NULL METAINFO_ID_SEQ.NEXTVAL constraint METAINFO_ID_NN NOT NULL,
+    name VARCHAR2(50),
+    status NUMBER(2),
+    created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
+    updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
+    description VARCHAR2(1000),
+    metadata CLOB,
+    CONSTRAINT METAINFO_PK PRIMARY KEY (meta_id), -- USING INDEX LOCAL,
+    CONSTRAINT METAINFO_NAME_UQ UNIQUE (name)
+);

From 3330cdcd131d2bc4712295b0e89d555f3a18e5b6 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Fri, 12 Jan 2024 19:26:56 +0100
Subject: [PATCH 2/9] reorganize doma tree functions

---
 doma/lib/idds/doma/workflowv2/domatree.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/doma/lib/idds/doma/workflowv2/domatree.py b/doma/lib/idds/doma/workflowv2/domatree.py
index 7cd028b4..a2af1599 100644
--- a/doma/lib/idds/doma/workflowv2/domatree.py
+++ b/doma/lib/idds/doma/workflowv2/domatree.py
@@ -134,7 +134,7 @@ def order_job_tree(self, label_level_dict):
             job_nodes = label_node.jobs
             self.order_job_nodes(job_nodes)
 
-    def save_order_id_map(self, label_level_dict, order_id_map_file):
+    def generate_order_id_map(self, label_level_dict):
         order_id_map = {}
         for level in label_level_dict:
             for node in label_level_dict[level]:
@@ -146,6 +146,9 @@ def save_order_id_map(self, label_level_dict, order_id_map_file):
                     gwjob = job_node.gwjob
                     order_id = gwjob.attrs.get("order_id", 0)
                     order_id_map[label_name][str(order_id)] = gwjob.name
+        return order_id_map
+
+    def save_order_id_map(self, order_id_map, order_id_map_file):
         with open(order_id_map_file, 'w') as f:
             json.dump(order_id_map, f)
 
@@ -163,4 +166,6 @@ def order_jobs_from_generic_workflow(self, generic_workflow, order_id_map_file):
         label_level_dict = self.get_ordered_nodes_by_level(label_tree_roots)
 
         self.order_job_tree(label_level_dict)
-        self.save_order_id_map(label_level_dict, order_id_map_file)
+        # self.save_order_id_map(label_level_dict, order_id_map_file)
+        order_id_map = self.generate_order_id_map(label_level_dict)
+        return order_id_map
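
With patch 2, ordering and persistence are decoupled: generate_order_id_map builds a
{label: {order_id: job_name}} dictionary (order_id_map[label_name][str(order_id)] = gwjob.name),
and save_order_id_map simply serializes whatever map it is given. A runnable sketch of the
new save step, with a hand-built map standing in for a real generic workflow:

    import json

    # Shape of the map returned by generate_order_id_map: label -> {order_id: job_name}.
    # Hand-built here so the example runs without a real GenericWorkflow.
    order_id_map = {'label1': {'0': 'job_a', '1': 'job_b'},
                    'label2': {'0': 'job_c'}}

    # save_order_id_map now just persists the map it receives:
    with open('order_id_map.json', 'w') as f:
        json.dump(order_id_map, f)

    print(json.load(open('order_id_map.json')))
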
From 28f2ebda17ba5cdb73a44dfd228f0e0eab5a96ff Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Sun, 14 Jan 2024 18:38:51 +0100
Subject: [PATCH 3/9] optim min_request_id setting

---
 main/lib/idds/agents/clerk/clerk.py | 25 ++++++++++++++++++++-----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py
index 83578eda..a77dc908 100644
--- a/main/lib/idds/agents/clerk/clerk.py
+++ b/main/lib/idds/agents/clerk/clerk.py
@@ -6,7 +6,7 @@
 # http://www.apache.org/licenses/LICENSE-2.0OA
 #
 # Authors:
-# - Wen Guan, , 2019 - 2023
+# - Wen Guan, , 2019 - 2024
 
 import datetime
 import random
@@ -157,13 +157,19 @@ def get_new_requests(self):
             if time.time() < self.start_at + 3600:
                 if BaseAgent.poll_new_min_request_id_times % 30 == 0:
                     # get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
-                    min_request_id = BaseAgent.min_request_id - 1000
+                    if BaseAgent.min_request_id:
+                        min_request_id = BaseAgent.min_request_id - 1000
+                    else:
+                        min_request_id = None
                 else:
                     min_request_id = BaseAgent.min_request_id
             else:
                 if BaseAgent.poll_new_min_request_id_times % 180 == 0:
                     # get_new_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes.
-                    min_request_id = BaseAgent.min_request_id - 1000
+                    if BaseAgent.min_request_id:
+                        min_request_id = BaseAgent.min_request_id - 1000
+                    else:
+                        min_request_id = None
                 else:
                     min_request_id = BaseAgent.min_request_id
 
@@ -184,6 +190,7 @@ def get_new_requests(self):
                 BaseAgent.min_request_id_cache[req_id] = time.time()
                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
                     BaseAgent.min_request_id = req_id
+                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                     core_requests.set_min_request_id(BaseAgent.min_request_id)
 
                 event = NewRequestEvent(publisher_id=self.id, request_id=req_id)
@@ -213,13 +220,19 @@ def get_running_requests(self):
             if time.time() < self.start_at + 3600:
                 if BaseAgent.poll_running_min_request_id_times % 30 == 0:
                     # get_running_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
-                    min_request_id = BaseAgent.min_request_id - 1000
+                    if BaseAgent.min_request_id:
+                        min_request_id = BaseAgent.min_request_id - 1000
+                    else:
+                        min_request_id = None
                 else:
                     min_request_id = BaseAgent.min_request_id
             else:
                 if BaseAgent.poll_running_min_request_id_times % 180 == 0:
                     # get_running_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes.
-                    min_request_id = BaseAgent.min_request_id - 1000
+                    if BaseAgent.min_request_id:
+                        min_request_id = BaseAgent.min_request_id - 1000
+                    else:
+                        min_request_id = None
                 else:
                     min_request_id = BaseAgent.min_request_id
 
@@ -245,6 +258,7 @@ def get_running_requests(self):
                 BaseAgent.min_request_id_cache[req_id] = time.time()
                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
                     BaseAgent.min_request_id = req_id
+                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                     core_requests.set_min_request_id(BaseAgent.min_request_id)
 
                 event = UpdateRequestEvent(publisher_id=self.id, request_id=req_id)
@@ -295,6 +309,7 @@ def get_operation_requests(self):
 
                 if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
                     BaseAgent.min_request_id = request_id
+                    self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
                     BaseAgent.min_request_id_cache[request_id] = time.time()
                     core_requests.set_min_request_id(BaseAgent.min_request_id)
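
The guard added in patch 3 matters because BaseAgent.min_request_id starts out as None
before any request has been polled, and None - 1000 raises a TypeError in Python 3.
The fixed window computation, distilled into a standalone sketch (function name is
illustrative, not from the patch):

    def compute_poll_floor(min_request_id, poll_times, rebase_every):
        """Return the request-id floor for the next poll.

        Every `rebase_every` polls, look 1000 ids behind the cached minimum to
        catch requests committed out of order; otherwise poll from the cached
        minimum. None means "no floor yet".
        """
        if poll_times % rebase_every == 0:
            return min_request_id - 1000 if min_request_id else None
        return min_request_id

    assert compute_poll_floor(None, 30, 30) is None      # no requests seen yet
    assert compute_poll_floor(5000, 30, 30) == 4000      # periodic rebase
    assert compute_poll_floor(5000, 31, 30) == 5000      # normal poll
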
From 91703ac42992eb3be8e069a5d599fc6a08850948 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 15 Jan 2024 20:29:57 +0100
Subject: [PATCH 4/9] fix ordering events

---
 doma/lib/idds/doma/workflowv2/domatree.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/doma/lib/idds/doma/workflowv2/domatree.py b/doma/lib/idds/doma/workflowv2/domatree.py
index a2af1599..49954e12 100644
--- a/doma/lib/idds/doma/workflowv2/domatree.py
+++ b/doma/lib/idds/doma/workflowv2/domatree.py
@@ -113,11 +113,14 @@ def order_job_nodes(self, job_nodes):
         for job_node in job_nodes:
             potential_order_id = job_node.get_potential_order_id()
             if potential_order_id not in job_nodes_order_id:
-                job_nodes_order_id[potential_order_id] = job_node
+                job_nodes_order_id[potential_order_id] = []
+            job_nodes_order_id[potential_order_id].append(job_node)
         potential_order_ids = sorted(list(job_nodes_order_id.keys()))
         ordered_job_nodes = []
         for potential_order_id in potential_order_ids:
-            ordered_job_nodes.append(job_nodes_order_id[potential_order_id])
+            p_job_nodes = job_nodes_order_id[potential_order_id]
+            for p_job_node in p_job_nodes:
+                ordered_job_nodes.append(p_job_node)
         order_id = 0
         for job_node in ordered_job_nodes:
             job_node.order_id = order_id
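
Patch 4 fixes a silent job loss: the old code stored a single node per potential order id,
so when two nodes collided only the first survived, and the sort then emitted lists rather
than nodes. The corrected grouping, distilled into a runnable sketch (a plain dataclass
stands in for the DOMA job node):

    from dataclasses import dataclass

    @dataclass
    class FakeJobNode:          # stand-in for the DOMA job node, illustration only
        name: str
        potential: int
        order_id: int = -1

    def assign_order_ids(nodes):
        by_potential = {}
        for node in nodes:                       # group: duplicates are kept, not dropped
            by_potential.setdefault(node.potential, []).append(node)
        ordered = [n for key in sorted(by_potential) for n in by_potential[key]]
        for i, node in enumerate(ordered):       # then assign dense, stable order ids
            node.order_id = i
        return ordered

    nodes = [FakeJobNode("a", 2), FakeJobNode("b", 1), FakeJobNode("c", 2)]
    assert [n.name for n in assign_order_ids(nodes)] == ["b", "a", "c"]
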
From d6db9332f8ef28f4b0e3a5d714042b66a6702277 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 15 Jan 2024 20:30:57 +0100
Subject: [PATCH 5/9] fix polling event status

---
 .../lib/idds/doma/workflowv2/domapandawork.py | 130 +++++++++++++++---
 1 file changed, 108 insertions(+), 22 deletions(-)

diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py
index b337ca4b..70746b95 100644
--- a/doma/lib/idds/doma/workflowv2/domapandawork.py
+++ b/doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -983,7 +983,13 @@ def get_content_status_from_panda_status(self, job_info):
             else:
                 return ContentStatus.SubAvailable
         else:
-            return ContentStatus.SubAvailable
+            attempt_nr = int(job_info.attemptNr) if job_info.attemptNr else 0
+            max_attempt = int(job_info.maxAttempt) if job_info.maxAttempt else 0
+            self_maxAttempt = int(self.maxAttempt) if self.maxAttempt else 0
+            if (attempt_nr >= max_attempt) and (attempt_nr >= self_maxAttempt):
+                return ContentStatus.FinalSubAvailable
+            else:
+                return ContentStatus.SubAvailable
     elif jobstatus in ['failed', 'closed', 'cancelled', 'lost', 'broken', 'missing']:
         attempt_nr = int(job_info.attemptNr) if job_info.attemptNr else 0
         max_attempt = int(job_info.maxAttempt) if job_info.maxAttempt else 0
@@ -1131,7 +1137,7 @@ def get_panda_event_status(self, jobids, log_prefix=''):
         self.logger.debug(log_prefix + "get_panda_event_status, jobids[:3]: %s" % str(jobids[:3]))
         try:
             from pandaclient import Client
-            ret = Client.get_event_status(jobids, verbose=True)
+            ret = Client.get_events_status(jobids, verbose=True)
             if ret[0] == 0:
                 job_events_status = ret[1]
                 return job_events_status
@@ -1192,6 +1198,7 @@ def poll_panda_jobs(self, job_ids, log_prefix=''):
                     job_status_info[filename]['panda_id'] = panda_ids
         else:
             es_job_ids = []
+            self.logger.debug("job_status_info: %s" % (job_status_info))
            for filename in job_status_info:
                 job_set_id = job_status_info[filename]['job_set_id']
                 jobs = job_status_info[filename]['jobs']
@@ -1200,34 +1207,69 @@ def poll_panda_jobs(self, job_ids, log_prefix=''):
                 job_status_info[filename]['status'] = status
                 job_status_info[filename]['job_info'] = job_info
                 job_status_info[filename]['panda_id'] = panda_ids
-                if status in [ContentStatus.FinalSubAvailable]:
+                if status in [ContentStatus.FinalSubAvailable, ContentStatus.FinalFailed]:
                     task_id = job_info.jediTaskID
+                    es_job_id = {'task_id': task_id, 'panda_id': job_set_id}
+                    es_job_ids.append(es_job_id)
                     for panda_id in panda_ids:
                         es_job_id = {'task_id': task_id, 'panda_id': panda_id}
                         es_job_ids.append(es_job_id)
 
             job_events_status = self.poll_panda_events(es_job_ids)
+            self.logger.debug("poll_panda_events, es_job_ids: %s, job_events_status: %s" % (str(es_job_ids), job_events_status))
             for filename in job_status_info:
                 jobs = job_status_info[filename]['jobs']
                 for job in jobs:
                     panda_id = job['panda_id']
-                    events = job_events_status.get(panda_id, {})
+                    events = job_events_status.get(str(panda_id), {})
                     job['events'] = events
+                job_set_id = job_status_info[filename]['job_set_id']
+                job_status_info[filename]['job_set_events'] = job_events_status.get(str(job_set_id), {})
+        self.logger.debug("job_status_info: %s" % (job_status_info))
         return job_status_info
 
-    def get_event_job(self, sub_map_id, panda_jobs):
-        ret_event, ret_job = None, None
+    def get_event_job(self, sub_map_id, panda_jobs, job_set_events):
+        ret_event, ret_job = {}, None
+        sub_map_id_jobs = {}
         for panda_job in panda_jobs:
             events = panda_job.get('events', {})
             for event_id in events:
                 event_index = int(event_id.split('-')[3]) - 1
                 if event_index == sub_map_id:
                     event_status = events[event_id]
-                    ret_event['status'] = event_status
+                    if event_status not in sub_map_id_jobs:
+                        sub_map_id_jobs[event_status] = []
                     # todo: get the event error code and error diag
-                    ret_event['error_code'] = None
-                    ret_event['error_diag'] = None
-                    ret_job = panda_job
-                    break
+                    item = {'status': event_status, 'error_code': None, 'error_diag': None, 'job': panda_job}
+                    sub_map_id_jobs[event_status].append(item)
+
+        if not ret_event:
+            for event_id in job_set_events:
+                event_index = int(event_id.split('-')[3]) - 1
+                if event_index == sub_map_id:
+                    event_status = job_set_events[event_id]
+                    if event_status not in sub_map_id_jobs:
+                        sub_map_id_jobs[event_status] = []
+                    # todo: get the event error code and error diag
+                    item = {'status': event_status, 'error_code': None, 'error_diag': None}
+                    sub_map_id_jobs[event_status].append(item)
+
+        final_event_status = None
+        for event_status in sub_map_id_jobs:
+            if event_status in ['finished', 'done', 'merged']:
+                final_event_status = event_status
+                break
+            elif event_status in ['failed', 'fatal', 'cancelled', 'discarded', 'corrupted']:
+                final_event_status = event_status
+            else:
+                if final_event_status is None:
+                    final_event_status = event_status
+        if final_event_status:
+            item = sub_map_id_jobs[final_event_status][0]
+            ret_event['status'] = item['status']
+            # todo: get the event error code and error diag
+            ret_event['error_code'] = item['error_code']
+            ret_event['error_diag'] = item['error_diag']
+            ret_job = item.get('job', None)
         return ret_event, ret_job
 
@@ -1277,11 +1319,12 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
             output_contents = map_id_output['outputs']
             for content in output_contents:
+                content['status'] = panda_status
                 content['substatus'] = panda_status
                 update_contents_full.append(content)
                 update_content = {'content_id': content['content_id'],
                                   'request_id': content['request_id'],
-                                  # 'status': panda_status,
+                                  'status': panda_status,
                                   'substatus': panda_status}
 
                 if 'panda_id' in content['content_metadata'] and content['content_metadata']['panda_id']:
@@ -1344,6 +1387,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
         for input_file in unterminated_jobs_status:
             # job_set_id = unterminated_jobs_status[input_file]['job_set_id']
             panda_jobs = unterminated_jobs_status[input_file]['jobs']
+            job_set_events = unterminated_jobs_status[input_file]['job_set_events']
 
             if 'status' not in unterminated_jobs_status[input_file]:
                 continue
@@ -1363,11 +1407,12 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
             output_contents = map_id_output['outputs']
             for content in output_contents:
+                content['status'] = panda_status
                 content['substatus'] = panda_status
                 update_contents_full.append(content)
                 update_content = {'content_id': content['content_id'],
                                   'request_id': content['request_id'],
-                                  # 'status': panda_status,
+                                  'status': panda_status,
                                   'substatus': panda_status}
 
                 if 'panda_id' in content['content_metadata'] and content['content_metadata']['panda_id']:
@@ -1433,20 +1478,58 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
 
                 for content in output_contents:
                     sub_map_id = content['sub_map_id']
                     # min_id = content['min_id']
                     # min_id should be the same as sub_map_id here
-                    event, event_panda_job = self.get_event_job(sub_map_id, panda_jobs)
-                    event_status = event['status']
-                    event_error_code = event['error_code']
-                    event_error_diag = event['error_diag']
+                    event, event_panda_job = self.get_event_job(sub_map_id, panda_jobs, job_set_events)
+                    self.logger.debug("sub_map_id: %s, panda_jobs: %s, job_set_events: %s, event: %s, event_panda_job: %s" % (sub_map_id, panda_jobs, job_set_events, event, event_panda_job))
+                    if event:
+                        event_status = event['status']
+                        # 'ready', 'sent', 'running', 'finished', 'cancelled', 'discarded', 'done', 'failed',
+                        # 'fatal', 'merged', 'corrupted', 'reserved_fail', 'reserved_get'
+                        if event_status in ['finished', 'done', 'merged']:
+                            event_status = ContentStatus.Available
+                            event_error_code = event['error_code']
+                            event_error_diag = event['error_diag']
+                            if event_panda_job:
+                                panda_id = event_panda_job['panda_id']
+                                job_info = event_panda_job['job_info']
+                            else:
+                                panda_ids = unterminated_jobs_status[input_file]['panda_id']
+                                panda_id = ",".join([str(i) for i in panda_ids])
+                                job_info = unterminated_jobs_status[input_file]['job_info']
+                        elif event_status in ['failed', 'fatal', 'cancelled', 'discarded', 'corrupted']:
+                            event_status = ContentStatus.FinalFailed
+                            event_error_code = event['error_code']
+                            event_error_diag = event['error_diag']
+                            if event_panda_job:
+                                panda_id = event_panda_job['panda_id']
+                                job_info = event_panda_job['job_info']
+                            else:
+                                panda_ids = unterminated_jobs_status[input_file]['panda_id']
+                                panda_id = ",".join([str(i) for i in panda_ids])
+                                job_info = unterminated_jobs_status[input_file]['job_info']
+                        else:
+                            event_status = panda_status
+                            event_error_code = None
+                            event_error_diag = None
+                            panda_ids = unterminated_jobs_status[input_file]['panda_id']
+                            panda_id = ",".join([str(i) for i in panda_ids])
+                            job_info = unterminated_jobs_status[input_file]['job_info']
+                    else:
+                        event_status = panda_status
+                        event_error_code = None
+                        event_error_diag = None
-                    panda_id = event_panda_job['panda_id']
-                    job_info = event_panda_job['job_info']
+                        panda_ids = unterminated_jobs_status[input_file]['panda_id']
+                        panda_id = ",".join([str(i) for i in panda_ids])
+                        job_info = unterminated_jobs_status[input_file]['job_info']
+
+                    content['status'] = event_status
                     content['substatus'] = event_status
                     update_contents_full.append(content)
                     update_content = {'content_id': content['content_id'],
                                       'request_id': content['request_id'],
-                                      # 'status': panda_status,
+                                      'status': panda_status,
                                       'substatus': event_status}
 
                     if 'panda_id' in content['content_metadata'] and content['content_metadata']['panda_id']:
@@ -1482,7 +1565,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
                                            'workload_id': content['workload_id'],
                                            'coll_id': content['coll_id'],
                                            'map_id': content['map_id'],
-                                           'status': panda_status}
+                                           'status': event_status}
                         for job_info_item in job_info_maps:
                             new_content_ext[job_info_item] = getattr(job_info, job_info_maps[job_info_item])
                             if new_content_ext[job_info_item] == 'NULL':
@@ -1499,7 +1582,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
                     else:
                         update_content_ext = {'content_id': content['content_id'],
                                               'request_id': content['request_id'],
-                                              'status': panda_status}
+                                              'status': event_status}
                         for job_info_item in job_info_maps:
                             update_content_ext[job_info_item] = getattr(job_info, job_info_maps[job_info_item])
                             if update_content_ext[job_info_item] == 'NULL':
@@ -1522,6 +1605,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
                 if content['content_id'] not in update_contents_dict:
                     update_content = {'content_id': content['content_id'],
                                       'request_id': content['request_id'],
+                                      'status': ContentStatus.Missing,
                                       'substatus': ContentStatus.Missing}
                     update_contents.append(update_content)
                     if content['content_id'] not in contents_ext_dict and content['content_id'] not in new_contents_ext_dict:
@@ -1572,6 +1656,8 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
             self.logger.debug(log_prefix + "poll_panda_task, task_id: %s, all jobs: %s, unterminated_jobs: %s" % (str(task_id), len(all_jobs_ids), len(unterminated_jobs)))
 
             unterminated_jobs_status = self.poll_panda_jobs(unterminated_jobs, log_prefix=log_prefix)
+            self.logger.debug("unterminated_jobs_status: %s" % str(unterminated_jobs_status))
+
             abort_status = False
             if processing_status in [ProcessingStatus.Cancelled]:
                 abort_status = True
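
Patch 5 resolves conflicting event reports with a fixed precedence: a success status wins
outright, a failure status beats any transient one, and a transient status is kept only if
nothing better is seen. Distilled into a standalone sketch (plain strings stand in for
PanDA's event states; the function name is illustrative):

    SUCCESS = {'finished', 'done', 'merged'}
    FAILURE = {'failed', 'fatal', 'cancelled', 'discarded', 'corrupted'}

    def pick_final_event_status(statuses):
        """Pick one status for an event reported by several PanDA jobs."""
        final = None
        for status in statuses:
            if status in SUCCESS:
                return status                 # success wins immediately
            if status in FAILURE:
                final = status                # failure beats "running" etc.
            elif final is None:
                final = status                # otherwise keep the first seen
        return final

    assert pick_final_event_status(['failed', 'running', 'finished']) == 'finished'
    assert pick_final_event_status(['running', 'failed']) == 'failed'
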
From d95be191e10b9fd9db18de16a5487b0bdf792e69 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 15 Jan 2024 20:31:54 +0100
Subject: [PATCH 6/9] reduce db operations

---
 main/lib/idds/agents/carrier/utils.py    | 3 +++
 main/lib/idds/agents/common/baseagent.py | 7 ++++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index c89db59b..8c56e461 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -731,6 +731,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
                 content['substatus'] = input_content_update_status
                 updated_contents_full_input.append(content)
                 u_content_substatus = {'content_id': content['content_id'],
+                                       'status': content['substatus'],
                                        'substatus': content['substatus'],
                                        'request_id': content['request_id'],
                                        'transform_id': content['transform_id'],
@@ -756,6 +757,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
                 content['substatus'] = output_content_update_status
                 updated_contents_full_output.append(content)
                 u_content_substatus = {'content_id': content['content_id'],
+                                       'status': content['substatus'],
                                        'substatus': content['substatus'],
                                        'request_id': content['request_id'],
                                        'transform_id': content['transform_id'],
@@ -1311,6 +1313,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
             if not work.es or con['substatus'] in [ContentStatus.Available]:
                 con_dict = {'content_id': con['content_id'],
                             'request_id': con['request_id'],
+                            'status': con['substatus'],
                             'substatus': con['substatus']}
                 if 'content_metadata' in con and con['content_metadata']:
                     con_dict['content_metadata'] = con['content_metadata']

diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py
index 7b017514..54275972 100644
--- a/main/lib/idds/agents/common/baseagent.py
+++ b/main/lib/idds/agents/common/baseagent.py
@@ -189,9 +189,10 @@ def execute_event_schedule(self):
                 num_free_workers = self.executors.get_num_free_workers()
                 if num_free_workers > 0:
                     events = self.event_bus.get(event_type, num_free_workers)
-                    for event in events:
-                        future = self.executors.submit(exec_func, event)
-                        self.event_futures[event._id] = (event, future, time.time())
+                    if events:
+                        for event in events:
+                            future = self.executors.submit(exec_func, event)
+                            self.event_futures[event._id] = (event, future, time.time())
                 event_funcs[event_type]["to_exec_at"] = time.time() + self.event_interval_delay
 
     def execute_schedules(self):
From 94bcf379b5d8e4a5a5892db2d0061b4135658358 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 15 Jan 2024 20:33:32 +0100
Subject: [PATCH 7/9] add meta_info table and alembic upgrade func

---
 main/lib/idds/core/meta.py                    |  58 ++++++++++
 main/lib/idds/core/requests.py                |   4 +-
 .../354f8e5a5879_add_meta_info_table.py       |  57 ++++++++++
 main/lib/idds/orm/base/models.py              |   2 +-
 main/lib/idds/orm/meta.py                     | 102 ++++++++++++++++++
 5 files changed, 220 insertions(+), 3 deletions(-)
 create mode 100644 main/lib/idds/core/meta.py
 create mode 100644 main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py
 create mode 100644 main/lib/idds/orm/meta.py

diff --git a/main/lib/idds/core/meta.py b/main/lib/idds/core/meta.py
new file mode 100644
index 00000000..c7c9415e
--- /dev/null
+++ b/main/lib/idds/core/meta.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0OA
+#
+# Authors:
+# - Wen Guan, , 2024
+
+
+"""
+operations related to Meta info.
+"""
+
+from idds.common.constants import MetaStatus
+from idds.orm import meta as orm_meta
+from idds.orm.base.session import read_session, transactional_session
+
+
+@transactional_session
+def add_meta_item(name, status=MetaStatus.Active, description=None, meta_info=None, session=None):
+    """
+    Add a meta item.
+
+    :param name: The meta name.
+    :param status: The meta status.
+    :param description: The meta description.
+    :param meta_info: The metadata.
+    :param session: The database session.
+    """
+    return orm_meta.add_meta_item(name=name, status=status, description=description,
+                                  meta_info=meta_info, session=session)
+
+
+@read_session
+def get_meta_item(name, session=None):
+    """
+    Retrieve meta item.
+
+    :param name: The meta name.
+    :param session: The database session.
+
+    :returns metainfo: dictionary of meta info
+    """
+    return orm_meta.get_meta_item(name=name, session=session)
+
+
+@read_session
+def get_meta_items(session=None):
+    """
+    Retrieve meta items.
+
+    :param session: The database session.
+
+    :returns metainfo: List of dictionaries
+    """
+    return orm_meta.get_meta_items(session=session)

diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py
index fb6d3084..0c2f4077 100644
--- a/main/lib/idds/core/requests.py
+++ b/main/lib/idds/core/requests.py
@@ -497,7 +497,7 @@ def set_min_request_id(min_request_id, session=None):
     :param min_request_id: Int of min_request_id.
     """
     orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id",
-                           metadata={"min_request_id": min_request_id}, session=None)
+                           meta_info={"min_request_id": min_request_id}, session=None)
 
 
 @read_session
@@ -511,4 +511,4 @@ def get_min_request_id(session=None):
     if not meta:
         return None
     else:
-        return meta.get("min_request_id", None)
+        return meta['meta_info'].get("min_request_id", None)
diff --git a/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py b/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py
new file mode 100644
index 00000000..2689f8f7
--- /dev/null
+++ b/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0OA
+#
+# Authors:
+# - Wen Guan, , 2023
+
+"""add meta info table
+
+Revision ID: 354f8e5a5879
+Revises: 1bc6e82e8514
+Create Date: 2024-01-09 10:26:54.783489+00:00
+
+"""
+
+import datetime
+
+from alembic import op
+from alembic import context
+import sqlalchemy as sa
+
+from idds.common.constants import MetaStatus
+from idds.orm.base.types import EnumWithValue
+from idds.orm.base.types import JSON
+
+# revision identifiers, used by Alembic.
+""" + +from idds.common.constants import MetaStatus +from idds.orm import meta as orm_meta +from idds.orm.base.session import read_session, transactional_session + + +@transactional_session +def add_meta_item(name, status=MetaStatus.Active, description=None, meta_info=None, session=None): + """ + Add a meta item. + + :param name: The meta name. + :param status: The meta status. + :param description: The meta description. + :param meta_info: The metadata. + :param session: The database session. + """ + return orm_meta.add_meta_item(name=name, status=status, description=description, + meta_info=meta_info, session=session) + + +@read_session +def get_meta_item(name, session=None): + """ + Retrieve meta item. + + :param name: The meta name. + :param session: The database session. + + :returns metainfo: dictionary of meta info + """ + orm_meta.get_meta_item(name=name, session=session) + + +@read_session +def get_meta_items(session=None): + """ + Retrieve meta items. + + :param session: The database session. + + :returns metainfo: List of dictionaries + """ + orm_meta.get_meta_items(session=session) diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index fb6d3084..0c2f4077 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -497,7 +497,7 @@ def set_min_request_id(min_request_id, session=None): :param min_request_id: Int of min_request_id. """ orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id", - metadata={"min_request_id": min_request_id}, session=None) + meta_info={"min_request_id": min_request_id}, session=None) @read_session @@ -511,4 +511,4 @@ def get_min_request_id(session=None): if not meta: return None else: - return meta.get("min_request_id", None) + return meta['meta_info'].get("min_request_id", None) diff --git a/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py b/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py new file mode 100644 index 00000000..2689f8f7 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/354f8e5a5879_add_meta_info_table.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + +"""add meta info table + +Revision ID: 354f8e5a5879 +Revises: 1bc6e82e8514 +Create Date: 2024-01-09 10:26:54.783489+00:00 + +""" + +import datetime + +from alembic import op +from alembic import context +import sqlalchemy as sa + +from idds.common.constants import MetaStatus +from idds.orm.base.types import EnumWithValue +from idds.orm.base.types import JSON + +# revision identifiers, used by Alembic. 
From 08cbcedf663b05ae1ffde2771ad79a8baffdeadc Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Mon, 15 Jan 2024 20:34:43 +0100
Subject: [PATCH 8/9] fix to check whether a message has been processed

---
 main/lib/idds/agents/conductor/conductor.py | 51 ++++++++++++---------
 1 file changed, 29 insertions(+), 22 deletions(-)

diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py
index 4764fc5b..226306a0 100644
--- a/main/lib/idds/agents/conductor/conductor.py
+++ b/main/lib/idds/agents/conductor/conductor.py
@@ -208,32 +208,39 @@ def is_message_processed(self, message):
             if 'relation_type' not in msg_content or msg_content['relation_type'] != 'input':
                 return True
 
+            workload_id = msg_content['workload_id']
+            processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
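+            find_processing = None
+            if processings:
+                for processing in processings:
+                    if processing['workload_id'] == workload_id:
+                        find_processing = processing
+            if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed,
+                                                                 ProcessingStatus.Lost, ProcessingStatus.SubFinished,
+                                                                 ProcessingStatus.Cancelled, ProcessingStatus.Expired,
+                                                                 ProcessingStatus.Suspended, ProcessingStatus.Broken]:
+                return True
+
             files = msg_content['files']
-            one_file = files[0]
-            # only check one file in a message
-            map_id = one_file['map_id']
+            files_map_id = [f['map_id'] for f in files]
             contents = core_catalog.get_contents_by_request_transform(request_id=request_id,
-                                                                      transform_id=transform_id,
-                                                                      map_id=map_id)
+                                                                      transform_id=transform_id)
+            proc_contents = {}
             for content in contents:
                 if content['content_relation_type'] == ContentRelationType.Output:
-                    if (content['status'] == ContentStatus.Missing):
-                        workload_id = msg_content['workload_id']
-                        processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
-                        find_processing = None
-                        if processings:
-                            for processing in processings:
-                                if processing['workload_id'] == workload_id:
-                                    find_processing = processing
-                        if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed,
-                                                                             ProcessingStatus.Lost, ProcessingStatus.SubFinished,
-                                                                             ProcessingStatus.Cancelled, ProcessingStatus.Expired,
-                                                                             ProcessingStatus.Suspended, ProcessingStatus.Broken]:
-                            return True
-                        else:
-                            return False
-                    if (content['status'] != ContentStatus.New):
-                        return True
+                    if content['map_id'] not in proc_contents:
+                        proc_contents[content['map_id']] = []
+                    if content['status'] not in proc_contents[content['map_id']]:
+                        proc_contents[content['map_id']].append(content['status'])
+            all_map_id_processed = True
+            for map_id in files_map_id:
+                content_statuses = proc_contents.get(map_id, [])
+                # an empty list (no outputs recorded yet) counts as processed
+                if len(content_statuses) == 1 and content_statuses == [ContentStatus.New]:
+                    all_map_id_processed = False
+                    return all_map_id_processed
+            return all_map_id_processed
         except Exception as ex:
             self.logger.error(ex)
             self.logger.error(traceback.format_exc())

The reworked is_message_processed now inspects every file referenced by the message rather
than only the first: a message counts as processed unless some referenced map_id has output
contents whose only recorded status is New. The rule in isolation, with plain strings
standing in for ContentStatus values (helper name is illustrative):

    def all_map_ids_processed(files_map_id, statuses_by_map_id):
        """True unless some map_id's outputs are still exclusively 'New'."""
        for map_id in files_map_id:
            statuses = statuses_by_map_id.get(map_id, [])
            if statuses == ['New']:          # only status seen is New -> not processed
                return False
        return True

    assert all_map_ids_processed([1, 2], {1: ['New', 'Available'], 2: ['Available']})
    assert not all_map_ids_processed([1], {1: ['New']})
    assert all_map_ids_processed([3], {})   # nothing recorded yet counts as processed
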
+ """ + + try: + to_update = {'updated_at': datetime.datetime.utcnow()} + if status: + to_update['status'] = status + if description: + to_update['description'] = description + if meta_info: + to_update['meta_info'] = meta_info + counts = session.query(models.MetaInfo)\ + .filter(models.MetaInfo.name == name)\ + .update(to_update) + if not counts: + new_item = models.MetaInfo(name=name, status=status, description=description, meta_info=meta_info) + new_item.save(session=session) + except IntegrityError as e: + if re.match('.*ORA-00001.*', e.args[0]) or re.match('.*unique constraint.*', e.args[0]): + print("unique constraintviolated: %s" % str(e)) + except DatabaseError as e: + raise exceptions.DatabaseException('Could not persist meta info: %s' % str(e)) + + +@read_session +def get_meta_item(name, session=None): + """ + Retrieve meta item. + + :param name: The meta name. + :param session: The database session. + + :returns metainfo: dictionary of meta info + """ + try: + query = session.query(models.MetaInfo) + query = query.filter(models.MetaInfo.name == name) + + ret = query.first() + if not ret: + return None + else: + return ret.to_dict() + except IntegrityError as e: + raise exceptions.DatabaseException(e.args) + + +@read_session +def get_meta_items(session=None): + """ + Retrieve meta items. + + :param session: The database session. + + :returns metainfo: List of dictionaries + """ + items = [] + try: + query = session.query(models.MetaInfo) + + tmp = query.all() + if tmp: + for t in tmp: + items.append(t.to_dict()) + return items + except IntegrityError as e: + raise exceptions.DatabaseException(e.args) From 08cbcedf663b05ae1ffde2771ad79a8baffdeadc Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Mon, 15 Jan 2024 20:34:43 +0100 Subject: [PATCH 8/9] fix to check whether a message has been processed --- main/lib/idds/agents/conductor/conductor.py | 51 ++++++++++++--------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 4764fc5b..226306a0 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -208,32 +208,39 @@ def is_message_processed(self, message): if 'relation_type' not in msg_content or msg_content['relation_type'] != 'input': return True + workload_id = msg_content['workload_id'] + processings = core_processings.get_processings_by_transform_id(transform_id=transform_id) + find_processing = None + if processings: + for processing in processings: + if processing['workload_id'] == workload_id: + find_processing = processing + if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed, + ProcessingStatus.Lost, ProcessingStatus.SubFinished, + ProcessingStatus.Cancelled, ProcessingStatus.Expired, + ProcessingStatus.Suspended, ProcessingStatus.Broken]: + return True + files = msg_content['files'] - one_file = files[0] - # only check one file in a message - map_id = one_file['map_id'] + files_map_id = [f['map_id'] for f in files] contents = core_catalog.get_contents_by_request_transform(request_id=request_id, - transform_id=transform_id, - map_id=map_id) + transform_id=transform_id) + proc_conents = {} for content in contents: if content['content_relation_type'] == ContentRelationType.Output: - if (content['status'] == ContentStatus.Missing): - workload_id = msg_content['workload_id'] - processings = core_processings.get_processings_by_transform_id(transform_id=transform_id) - 
diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py
index 37b093fc..8fd68a49 100644
--- a/main/lib/idds/tests/panda_test.py
+++ b/main/lib/idds/tests/panda_test.py
@@ -46,6 +46,8 @@
 task_ids = [166253, 166254]
 task_ids = [167759]
 task_ids = [i for i in range(167781, 167785)]
+task_ids = [i for i in range(166799, 167877)]
+task_ids = [i for i in range(167997, 168003)]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
@@ -90,17 +92,18 @@
     jobs_list = ret[1]
     # print(jobs_list)
     for job_info in jobs_list:
-        print(job_info)
-        print(job_info.eventService)
-        print(job_info.jobStatus)
-        print(job_info.jobSubStatus)
-        print(job_info.jobsetID)
-        print(job_info.taskID)
-        print(job_info.jediTaskID)
-        print(job_info.Files)
-        for job_file in job_info.Files:
-            print(job_file.type)
-            print(job_file.lfn)
+        if job_info:
+            print(job_info)
+            print(job_info.eventService)
+            print(job_info.jobStatus)
+            print(job_info.jobSubStatus)
+            print(job_info.jobsetID)
+            print(job_info.taskID)
+            print(job_info.jediTaskID)
+            print(job_info.Files)
+            for job_file in job_info.Files:
+                print(job_file.type)
+                print(job_file.lfn)
 
 # sys.exit(0)
 
 jediTaskID = 166303
@@ -112,6 +115,8 @@
 panda_ids = [{'task_id': 166303, 'panda_id': 66573292}]
 panda_ids = [{'task_id': 166643, 'panda_id': 66988434}]
 panda_ids = [{'task_id': 166943, 'panda_id': 67228019}]
+panda_ids = [{"task_id": 167852, "panda_id": 67486349}]
+panda_ids = [{"task_id": 167852, "panda_id": 67486348}]
 ret = Client.get_events_status(panda_ids, verbose=True)
 print(ret)
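
The probe above exercises the same client entry point the workflow code now uses:
Client.get_events_status takes a list of {'task_id': ..., 'panda_id': ...} dicts and
returns a (code, result) pair, with the result keyed by pandaID as a string (the patch's
job_events_status.get(str(panda_id), {}) relies on this). A sketch of inspecting the
result, assuming a reachable PanDA server and the usual pandaclient setup:

    from pandaclient import Client

    panda_ids = [{"task_id": 167852, "panda_id": 67486348}]
    code, events = Client.get_events_status(panda_ids, verbose=True)
    if code == 0:
        # keys are pandaIDs as strings, values map event ids to event statuses
        for panda_id, event_map in events.items():
            print(panda_id, event_map)
    else:
        print("get_events_status failed with code", code)
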