Skip to content

Commit

Permalink
Merge pull request #831 from alliance-genome/wft_cache
Browse files Browse the repository at this point in the history
store ATP stuff in globals.
  • Loading branch information
sweng66 authored Feb 14, 2025
2 parents cb97870 + b1a1757 commit 092fa61
Show file tree
Hide file tree
Showing 20 changed files with 628 additions and 622 deletions.
258 changes: 183 additions & 75 deletions agr_literature_service/api/crud/ateam_db_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from os import environ
from typing import Dict
from sqlalchemy import text
from fastapi import HTTPException, status
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import cachetools.func
import logging

logger = logging.getLogger(__name__)

# List of valid prefix identifiers for curies
curie_prefix_list = ["FB", "MGI", "RGD", "SGD", "WB", "XenBase", "ZFIN"]

# Topic tag for ATP ontology
topic_category_atp = "ATP:0000002"

# Store these to save lookups.
atp_to_name: Dict[str, str] = {}
name_to_atp: Dict[str, str] = {}
atp_to_parent: Dict[str, str] = {}
atp_to_children : Dict[str, list] = {}


def create_ateam_db_session():
"""Create and return a SQLAlchemy session connected to the A-team database."""
Expand All @@ -21,8 +32,10 @@ def create_ateam_db_session():
SERVER = environ.get('PERSISTENT_STORE_DB_HOST', 'localhost')
PORT = environ.get('PERSISTENT_STORE_DB_PORT', '5432')
DB = environ.get('PERSISTENT_STORE_DB_NAME', 'unknown')

engine_var = 'postgresql://' + USER + ":" + PASSWORD + '@' + SERVER + ':' + PORT + '/' + DB
engine = create_engine(engine_var)

SessionClass = sessionmaker(bind=engine, autoflush=False, autocommit=False)
session = SessionClass()
return session
Expand Down Expand Up @@ -75,16 +88,14 @@ def classify_entity_list(entity_list):
else:
entity_name_list.append(entity.upper())

return (entity_name_list, entity_curie_list)
return entity_name_list, entity_curie_list


def search_for_entity_names(db: Session, entity_type, entity_name_list, taxon):
"""Look up entities in the DB by name (gene symbol, allele symbol, etc.), restricted by taxon."""
if len(entity_name_list) == 0:
return []

sql_query = None

if entity_type == 'gene':
"""
gene symbol: ACT1
Expand Down Expand Up @@ -163,8 +174,6 @@ def search_for_entity_curies(db: Session, entity_type, entity_curie_list):
if len(entity_curie_list) == 0:
return []

sql_query = None

if entity_type in ['gene', 'allele']:
entity_table_name = entity_type
sql_query = text(f"""
Expand Down Expand Up @@ -214,7 +223,6 @@ def search_topic(topic):
ORDER BY LENGTH(ot.name)
LIMIT 10
""")

rows = db.execute(sql_query, {
'search_query': search_query,
'topic_category_atp': topic_category_atp
Expand All @@ -229,10 +237,12 @@ def search_topic(topic):
]
db.close()
json_data = jsonable_encoder(data)

return JSONResponse(content=json_data)


def search_atp_descendants(ancestor_curie):
atp_get_children_as_dict(ancestor_curie)
db = create_ateam_db_session()
sql_query = text("""
SELECT ot.curie, ot.name
Expand Down Expand Up @@ -261,7 +271,6 @@ def search_atp_descendants(ancestor_curie):
def search_species(species):
"""Search for species in the NCBITaxonTerm ontology, matching either a curie or name."""
db = create_ateam_db_session()
search_query = None

if species.upper().startswith("NCBITAXON"):
search_query = f"{species.upper()}%"
Expand Down Expand Up @@ -295,44 +304,15 @@ def search_species(species):
return JSONResponse(content=json_data)


def map_atp_id_to_name(db: Session, atp_id):
"""
Given an ATPTerm curie (e.g. "ATP:0001234"), return the corresponding name.
"""
sql_query = text("""
SELECT name
FROM ontologyterm
WHERE ontologytermtype = 'ATPTerm'
AND curie = :atp_id
""")
row = db.execute(sql_query, {'atp_id': atp_id}).fetchone()
if row:
return row[0]
return None


def search_atp_ontology():
"""Return all ATPTerms from the ontologyterm table."""
db = create_ateam_db_session()
sql_query = text("""
SELECT curie, name, obsolete
FROM ontologyterm
WHERE ontologytermtype = 'ATPTerm'
""")
rows = db.execute(sql_query).fetchall()

result = [
{"curie": row.curie, "name": row.name, "obsolete": row.obsolete}
for row in rows
]
db.close()
return result


def search_ancestors_or_descendants(ontology_node, ancestors_or_descendants):
"""Return a list of ancestor or descendant curies for the given ontology_node."""
db = create_ateam_db_session()
# ATPs are already cached, so use that if applicable.
if ontology_node.startswith("ATP:"):
if ancestors_or_descendants == 'descendants':
return atp_get_all_descendents(ontology_node)
return atp_get_all_ancestors(ontology_node)

db = create_ateam_db_session()
if ancestors_or_descendants == 'descendants':
sql_query = text("""
SELECT ot.curie
Expand Down Expand Up @@ -362,21 +342,23 @@ def map_curies_to_names(category, curies):
Given a category (gene, allele, etc.) and a list of curies,
return a dictionary mapping each curie to its preferred display name.
"""
db = create_ateam_db_session()

if not curies:
return {}

# If category is an ATP:xxxx ID, look up its name first
if category.startswith('ATP:'):
category_label = map_atp_id_to_name(db, category)
category_label = atp_get_name(category)
if category_label is None:
# If we can't find a label for the ATP category, just return identity mapping.
return {curie: curie for curie in curies}
category = category_label

category = category.lower()
sql_query = None
if category in 'atpterm':
return atp_to_name_subset(curies)

db = create_ateam_db_session()
if category == 'gene':
sql_query = text("""
SELECT be.primaryexternalid, sa.displaytext
Expand Down Expand Up @@ -412,7 +394,7 @@ def map_curies_to_names(category, curies):
AND sa.slotannotationtype = 'ConstructSymbolSlotAnnotation'
""")

elif category in ['species', 'atpterm', 'ecoterm']:
elif category in ['species', 'ecoterm']:
# Do an uppercase match
curies = [curie.upper() for curie in curies]
sql_query = text("""
Expand All @@ -433,49 +415,175 @@ def map_curies_to_names(category, curies):
return curie_to_name_map


def get_name_to_atp_and_children(curie):
def set_globals(atp_to_name_init, name_to_atp_init, atp_to_children_init, atp_to_parent_init):
global atp_to_name, name_to_atp, atp_to_children, atp_to_parent

atp_to_name = atp_to_name_init.copy()
name_to_atp = name_to_atp_init.copy()
atp_to_children = atp_to_children_init.copy()
atp_to_parent = atp_to_parent_init.copy()


def load_name_to_atp_and_relationships(termtype='ATPTerm'):
"""
Add data to atp_to_name and name_to_atp dictionaries.
From the top curie given go down all children and store the data.
"""
# To minimise db calls grab all atp data and then process
db = create_ateam_db_session()
global atp_to_name, name_to_atp, atp_to_children, atp_to_parent

try:
db = create_ateam_db_session()
except Exception as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Exception {e} Setting connection of ateam db")
# Load atp data
id_to_curie = {}
sql_query = text("""
SELECT o.curie as curie, o.name as name, o.obsolete as obsolete, o.id as id, opc.isachildren_id as child
FROM ontologyterm o
LEFT JOIN ontologyterm_isa_parent_children opc ON o.id = opc.isaparents_id
WHERE o.ontologytermtype = 'ATPTerm'
""")
rows = db.execute(sql_query).fetchall()
curie_to_id = {}
id_to_curie = {}
full_name_to_atp = {} # list of all atp, which include extras
full_atp_to_name = {}
children = {}
try:
rows = db.execute(sql_query).fetchall()
except Exception as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Exception {e} Doing query of ateam db")
db.close()

# need ALL id_to_curies loaded before we do child/parents
for row in rows:
if row.obsolete:
continue
curie_to_id[row.curie] = row.id
id_to_curie[row.id] = row.curie
full_name_to_atp[row.name] = row.curie
full_atp_to_name[row.curie] = row.name
if row.id in children:
children[row.id].append(row.child)
name_to_atp[row.name] = row.curie
atp_to_name[row.curie] = row.name

# Load the relationships
# hopefully the results are cached.
for row in rows:
if row.obsolete:
continue
if row.child:
child_curie = id_to_curie[row.child]
else:
children[row.id] = [row.child]
db.close()
child_curie = None
parent_curie = id_to_curie[row.id]
if child_curie and parent_curie in atp_to_children:
atp_to_children[parent_curie].append(child_curie)
else:
atp_to_children[parent_curie] = [child_curie]
if child_curie:
atp_to_parent[child_curie] = parent_curie
logger.debug("ATP global vars successfully loaded")
return


def atp_get_parent(child_id):
if not atp_to_parent:
load_name_to_atp_and_relationships()
if child_id in atp_to_parent:
return atp_to_parent[child_id]
else:
return []


def atp_get_children(parent_id):
if not atp_to_children:
load_name_to_atp_and_relationships()
if parent_id in atp_to_children:
return atp_to_children[parent_id]
else:
return []


def atp_get_children_as_dict(parent_id):
children = atp_get_children(parent_id)
result = []
for atp_id in children:
result.append({'curie': atp_id, 'name': atp_to_name[atp_id]})
return result

# Now filter down to the ones we want.
name_to_atp = {}
atp_to_name = {}
id_list = [curie_to_id[curie]]
while len(id_list):
atp_id = id_list.pop()
if not atp_id:

def atp_to_name_subset(curies: list):
if not atp_to_name:
load_name_to_atp_and_relationships()
subset = {}
for curie in curies:
subset[curie] = atp_to_name[curie]
return subset


@cachetools.func.ttl_cache(ttl=24 * 60 * 60)
def atp_get_all_descendents(curie: str) -> list:
try:
return list(get_name_to_atp_for_all_children(curie)[1].keys())
except IndexError:
return []


def atp_get_all_ancestors(curie: str):
parent_list = []
not_seen = [curie]
while len(not_seen) > 0:
p = not_seen.pop(0)
if p in atp_to_parent:
parent = atp_to_parent[p]
parent_list.append(parent)
not_seen.append(parent)
return parent_list


def get_name_to_atp_for_all_children(workflow_parent):
"""
Get ALL descendents for an ATP.
as a dictionary of name to atp curies and
curies to names..
"""
# global atp_to_name, atp_to_children

if not atp_to_name:
load_name_to_atp_and_relationships()

subset_name_to_atp = {}
subset_atp_to_name = {}
list_ids = []
if workflow_parent in atp_to_children:
# NOTE:
# list_ids = atp_get_children(workflow_parent)
# Gives address and not contents!
list_ids.extend(atp_get_children(workflow_parent))
else:
return []

while len(list_ids) > 0:
curie = list_ids.pop()

if not curie:
continue
if atp_id in children and children[atp_id] is not None:
id_list.extend(children[atp_id])
atp_curie = id_to_curie[atp_id]
name_to_atp[full_atp_to_name[atp_curie]] = atp_curie
atp_to_name[atp_curie] = full_atp_to_name[atp_curie]
return name_to_atp, atp_to_name
subset_atp_to_name[curie] = atp_to_name[curie]
subset_name_to_atp[atp_to_name[curie]] = curie
if curie in atp_to_children:
list_ids.extend(atp_get_children(curie))
return subset_name_to_atp, subset_atp_to_name


def atp_get_name(atp_id):
if not atp_to_name:
try:
load_name_to_atp_and_relationships()
except Exception as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{e}")
if atp_id in atp_to_name:
return atp_to_name[atp_id]
return None


def atp_return_invalid_ids(atp_ids: list):
if not atp_to_name:
load_name_to_atp_and_relationships()
invalid_atps = []
for atp_id in atp_ids:
if atp_id not in atp_to_name:
invalid_atps.append(atp_id)
return invalid_atps
Loading

0 comments on commit 092fa61

Please sign in to comment.