Skip to content

Commit

Permalink
Merge pull request #267 from wguanicedew/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Jan 15, 2024
2 parents 7d40e1b + 52acf34 commit c19b1af
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 72 deletions.
130 changes: 108 additions & 22 deletions doma/lib/idds/doma/workflowv2/domapandawork.py

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions doma/lib/idds/doma/workflowv2/domatree.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,14 @@ def order_job_nodes(self, job_nodes):
for job_node in job_nodes:
potential_order_id = job_node.get_potential_order_id()
if potential_order_id not in job_nodes_order_id:
job_nodes_order_id[potential_order_id] = job_node
job_nodes_order_id[potential_order_id] = []
job_nodes_order_id[potential_order_id].append(job_node)
potential_order_ids = sorted(list(job_nodes_order_id.keys()))
ordered_job_nodes = []
for potential_order_id in potential_order_ids:
ordered_job_nodes.append(job_nodes_order_id[potential_order_id])
p_job_nodes = job_nodes_order_id[potential_order_id]
for p_job_node in p_job_nodes:
ordered_job_nodes.append(p_job_node)
order_id = 0
for job_node in ordered_job_nodes:
job_node.order_id = order_id
Expand All @@ -134,7 +137,7 @@ def order_job_tree(self, label_level_dict):
job_nodes = label_node.jobs
self.order_job_nodes(job_nodes)

def save_order_id_map(self, label_level_dict, order_id_map_file):
def generate_order_id_map(self, label_level_dict):
order_id_map = {}
for level in label_level_dict:
for node in label_level_dict[level]:
Expand All @@ -146,6 +149,9 @@ def save_order_id_map(self, label_level_dict, order_id_map_file):
gwjob = job_node.gwjob
order_id = gwjob.attrs.get("order_id", 0)
order_id_map[label_name][str(order_id)] = gwjob.name
return order_id_map

def save_order_id_map(self, order_id_map, order_id_map_file):
with open(order_id_map_file, 'w') as f:
json.dump(order_id_map, f)

Expand All @@ -163,4 +169,6 @@ def order_jobs_from_generic_workflow(self, generic_workflow, order_id_map_file):
label_level_dict = self.get_ordered_nodes_by_level(label_tree_roots)

self.order_job_tree(label_level_dict)
self.save_order_id_map(label_level_dict, order_id_map_file)
# self.save_order_id_map(label_level_dict, order_id_map_file)
order_id_map = self.generate_order_id_map(label_level_dict)
return order_id_map
17 changes: 17 additions & 0 deletions main/etc/sql/oracle_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,20 @@ alter table contents drop constraint CONTENT_ID_UQ;
alter table contents add constraint CONTENT_ID_UQ UNIQUE (transform_id, coll_id, map_id, sub_map_id, dep_sub_map_id, content_relation_type, name_md5, scope_name_md5, min_id, max_id) USING INDEX LOCAL;
drop index CONTENTS_ID_NAME_IDX;
CREATE INDEX CONTENTS_ID_NAME_IDX ON CONTENTS (coll_id, scope, standard_hash(name, 'MD5'), status);


--- 20240111
-- Sequence feeding meta_info.meta_id. NOCACHE/ORDER keeps ids strictly increasing;
-- GLOBAL is the default visibility (as opposed to Oracle 12c SESSION sequences).
CREATE SEQUENCE METAINFO_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL;
-- Generic named-metadata table (e.g. the service-wide 'min_request_id' checkpoint).
CREATE TABLE meta_info
(
meta_id NUMBER(12) DEFAULT ON NULL METAINFO_ID_SEQ.NEXTVAL constraint METAINFO_ID_NN NOT NULL, -- surrogate key, auto-filled from the sequence when NULL
name VARCHAR2(50), -- unique lookup key (see METAINFO_NAME_UQ)
status NUMBER(2), -- MetaStatus enum value
created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), -- UTC creation time
updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), -- UTC last-update time
description VARCHAR2(1000), -- optional free-text description
metadata CLOB, -- JSON payload
CONSTRAINT METAINFO_PK PRIMARY KEY (meta_id), -- USING INDEX LOCAL,
CONSTRAINT METAINFO_NAME_UQ UNIQUE (name)
);

3 changes: 3 additions & 0 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
content['substatus'] = input_content_update_status
updated_contents_full_input.append(content)
u_content_substatus = {'content_id': content['content_id'],
'status': content['substatus'],
'substatus': content['substatus'],
'request_id': content['request_id'],
'transform_id': content['transform_id'],
Expand All @@ -756,6 +757,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
content['substatus'] = output_content_update_status
updated_contents_full_output.append(content)
u_content_substatus = {'content_id': content['content_id'],
'status': content['substatus'],
'substatus': content['substatus'],
'request_id': content['request_id'],
'transform_id': content['transform_id'],
Expand Down Expand Up @@ -1311,6 +1313,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
if not work.es or con['substatus'] in [ContentStatus.Available]:
con_dict = {'content_id': con['content_id'],
'request_id': con['request_id'],
'status': con['substatus'],
'substatus': con['substatus']}
if 'content_metadata' in con and con['content_metadata']:
con_dict['content_metadata'] = con['content_metadata']
Expand Down
25 changes: 20 additions & 5 deletions main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2023
# - Wen Guan, <[email protected]>, 2019 - 2024

import datetime
import random
Expand Down Expand Up @@ -157,13 +157,19 @@ def get_new_requests(self):
if time.time() < self.start_at + 3600:
if BaseAgent.poll_new_min_request_id_times % 30 == 0:
# get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
min_request_id = BaseAgent.min_request_id - 1000
if BaseAgent.min_request_id:
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = None
else:
min_request_id = BaseAgent.min_request_id
else:
if BaseAgent.poll_new_min_request_id_times % 180 == 0:
# get_new_requests is called every 10 seconds. 180 * 10 = 300 seconds, which is 30 minutes.
min_request_id = BaseAgent.min_request_id - 1000
if BaseAgent.min_request_id:
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = None
else:
min_request_id = BaseAgent.min_request_id

Expand All @@ -184,6 +190,7 @@ def get_new_requests(self):
BaseAgent.min_request_id_cache[req_id] = time.time()
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = NewRequestEvent(publisher_id=self.id, request_id=req_id)
Expand Down Expand Up @@ -213,13 +220,19 @@ def get_running_requests(self):
if time.time() < self.start_at + 3600:
if BaseAgent.poll_running_min_request_id_times % 30 == 0:
# get_new_requests is called every 10 seconds. 30 * 10 = 300 seconds, which is 5 minutes.
min_request_id = BaseAgent.min_request_id - 1000
if BaseAgent.min_request_id:
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = None
else:
min_request_id = BaseAgent.min_request_id
else:
if BaseAgent.poll_running_min_request_id_times % 180 == 0:
# get_new_requests is called every 10 seconds. 180 * 10 = 1800 seconds, which is 30 minutes.
min_request_id = BaseAgent.min_request_id - 1000
if BaseAgent.min_request_id:
min_request_id = BaseAgent.min_request_id - 1000
else:
min_request_id = None
else:
min_request_id = BaseAgent.min_request_id

Expand All @@ -245,6 +258,7 @@ def get_running_requests(self):
BaseAgent.min_request_id_cache[req_id] = time.time()
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = UpdateRequestEvent(publisher_id=self.id, request_id=req_id)
Expand Down Expand Up @@ -295,6 +309,7 @@ def get_operation_requests(self):

if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
BaseAgent.min_request_id = request_id
self.logger.info("new min_request_id: %s" % BaseAgent.min_request_id)
BaseAgent.min_request_id_cache[request_id] = time.time()
core_requests.set_min_request_id(BaseAgent.min_request_id)

Expand Down
7 changes: 4 additions & 3 deletions main/lib/idds/agents/common/baseagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ def execute_event_schedule(self):
num_free_workers = self.executors.get_num_free_workers()
if num_free_workers > 0:
events = self.event_bus.get(event_type, num_free_workers)
for event in events:
future = self.executors.submit(exec_func, event)
self.event_futures[event._id] = (event, future, time.time())
if events:
for event in events:
future = self.executors.submit(exec_func, event)
self.event_futures[event._id] = (event, future, time.time())
event_funcs[event_type]["to_exec_at"] = time.time() + self.event_interval_delay

def execute_schedules(self):
Expand Down
51 changes: 29 additions & 22 deletions main/lib/idds/agents/conductor/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,32 +208,39 @@ def is_message_processed(self, message):
if 'relation_type' not in msg_content or msg_content['relation_type'] != 'input':
return True

workload_id = msg_content['workload_id']
processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
find_processing = None
if processings:
for processing in processings:
if processing['workload_id'] == workload_id:
find_processing = processing
if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed,
ProcessingStatus.Lost, ProcessingStatus.SubFinished,
ProcessingStatus.Cancelled, ProcessingStatus.Expired,
ProcessingStatus.Suspended, ProcessingStatus.Broken]:
return True

files = msg_content['files']
one_file = files[0]
# only check one file in a message
map_id = one_file['map_id']
files_map_id = [f['map_id'] for f in files]
contents = core_catalog.get_contents_by_request_transform(request_id=request_id,
transform_id=transform_id,
map_id=map_id)
transform_id=transform_id)
proc_conents = {}
for content in contents:
if content['content_relation_type'] == ContentRelationType.Output:
if (content['status'] == ContentStatus.Missing):
workload_id = msg_content['workload_id']
processings = core_processings.get_processings_by_transform_id(transform_id=transform_id)
find_processing = None
if processings:
for processing in processings:
if processing['workload_id'] == workload_id:
find_processing = processing
if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed,
ProcessingStatus.Lost, ProcessingStatus.SubFinished,
ProcessingStatus.Cancelled, ProcessingStatus.Expired,
ProcessingStatus.Suspended, ProcessingStatus.Broken]:
return True
else:
return False
if (content['status'] != ContentStatus.New):
return True
if content['map_id'] not in proc_conents:
proc_conents[content['map_id']] = []
if content['status'] not in proc_conents[content['map_id']]:
proc_conents[content['map_id']].append(content['status'])
all_map_id_processed = True
for map_id in files_map_id:
content_statuses = proc_conents.get(map_id, [])
if not content_statuses:
pass
if len(content_statuses) == 1 and content_statuses == [ContentStatus.New]:
all_map_id_processed = False
return all_map_id_processed
return all_map_id_processed
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
Expand Down
58 changes: 58 additions & 0 deletions main/lib/idds/core/meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2024


"""
operations related to Meta info.
"""

from idds.common.constants import MetaStatus
from idds.orm import meta as orm_meta
from idds.orm.base.session import read_session, transactional_session


@transactional_session
def add_meta_item(name, status=MetaStatus.Active, description=None, meta_info=None, session=None):
    """Create a meta item record via the ORM layer.

    :param name: The meta name.
    :param status: The meta status (defaults to MetaStatus.Active).
    :param description: Optional free-text description.
    :param meta_info: Optional metadata payload.
    :param session: Database session, injected by @transactional_session.
    :returns: whatever the ORM-layer insert returns.
    """
    item_kwargs = {'name': name,
                   'status': status,
                   'description': description,
                   'meta_info': meta_info,
                   'session': session}
    return orm_meta.add_meta_item(**item_kwargs)


@read_session
def get_meta_item(name, session=None):
    """
    Retrieve a meta item by name.

    :param name: The meta name.
    :param session: The database session.
    :returns metainfo: dictionary of meta info (as produced by the ORM layer).
    """
    # BUG FIX: the ORM result was previously discarded, so this function always
    # returned None; callers such as core.requests.get_min_request_id rely on
    # the returned item.
    return orm_meta.get_meta_item(name=name, session=session)


@read_session
def get_meta_items(session=None):
    """
    Retrieve all meta items.

    :param session: The database session.
    :returns metainfo: list of meta-info dictionaries (as produced by the ORM layer).
    """
    # BUG FIX: the ORM result was previously discarded, so this function always
    # returned None instead of the list of items.
    return orm_meta.get_meta_items(session=session)
4 changes: 2 additions & 2 deletions main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def set_min_request_id(min_request_id, session=None):
:param min_request_id: Int of min_request_id.
"""
orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id",
metadata={"min_request_id": min_request_id}, session=None)
meta_info={"min_request_id": min_request_id}, session=None)


@read_session
Expand All @@ -511,4 +511,4 @@ def get_min_request_id(session=None):
if not meta:
return None
else:
return meta.get("min_request_id", None)
return meta['meta_info'].get("min_request_id", None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2023

"""add meta info table
Revision ID: 354f8e5a5879
Revises: 1bc6e82e8514
Create Date: 2024-01-09 10:26:54.783489+00:00
"""

import datetime

from alembic import op
from alembic import context
import sqlalchemy as sa

from idds.common.constants import MetaStatus
from idds.orm.base.types import EnumWithValue
from idds.orm.base.types import JSON

# revision identifiers, used by Alembic.
revision = '354f8e5a5879'
down_revision = '1bc6e82e8514'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply migration 354f8e5a5879: create the meta_info table with PK and unique-name constraints."""
    # Only emit DDL for the supported server dialects; other dialects (e.g. sqlite
    # in tests/offline mode) are skipped entirely.
    if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
        # Reuse Alembic's version-table schema so the new table lands next to it;
        # empty string means the connection's default schema.
        schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
        op.create_table('meta_info',
                        # meta_id is sequence-backed (METAINFO_ID_SEQ); the PK is added separately below.
                        sa.Column('meta_id', sa.BigInteger(), sa.Sequence('METAINFO_ID_SEQ', schema=schema)),
                        sa.Column('name', sa.String(50), nullable=False),
                        sa.Column('status', EnumWithValue(MetaStatus), nullable=False),
                        sa.Column("created_at", sa.DateTime, default=datetime.datetime.utcnow, nullable=False),
                        sa.Column("updated_at", sa.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False),
                        sa.Column("description", sa.String(1000)),
                        sa.Column('meta_info', JSON()),
                        schema=schema)
        op.create_primary_key('METAINFO_PK', 'meta_info', ['meta_id'], schema=schema)
        op.create_unique_constraint('METAINFO_NAME_UQ', 'meta_info', ['name'], schema=schema)


def downgrade() -> None:
    """Revert migration 354f8e5a5879: drop meta_info and its constraints."""
    # Mirror of upgrade(): same dialect guard, constraints dropped before the table.
    if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
        schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''
        op.drop_constraint('METAINFO_NAME_UQ', table_name='meta_info', schema=schema)
        op.drop_constraint('METAINFO_PK', table_name='meta_info', schema=schema)
        op.drop_table('meta_info', schema=schema)
2 changes: 1 addition & 1 deletion main/lib/idds/orm/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ class MetaInfo(BASE, ModelBase):
created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
description = Column(String(1000), nullable=True)
metadata = Column(JSON())
meta_info = Column(JSON())

__table_args__ = (PrimaryKeyConstraint('meta_id', name='METAINFO_PK'),
UniqueConstraint('name', name='METAINFO_NAME_UQ'))
Expand Down
Loading

0 comments on commit c19b1af

Please sign in to comment.