Skip to content

Commit

Permalink
OpenConceptLab/ocl_issues#1747 | Mappings to use resolve operations
Browse files Browse the repository at this point in the history
  • Loading branch information
snyaggarwal committed Jan 30, 2024
1 parent 1cff026 commit 5a36ed2
Showing 1 changed file with 35 additions and 42 deletions.
77 changes: 35 additions & 42 deletions core/mappings/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.db.models import Q, F, Max
from pydash import get

from core.common.constants import NAMESPACE_REGEX, LATEST
from core.common.constants import NAMESPACE_REGEX, LATEST, HEAD
from core.common.mixins import SourceChildMixin
from core.common.models import VersionedModel
from core.common.tasks import batch_index_resources
Expand Down Expand Up @@ -333,77 +333,70 @@ def create_initial_version(cls, mapping, **kwargs):
initial_version.save()
return initial_version

def populate_fields_from_relations(self, data):
def populate_fields_from_relations(self, data): # pylint: disable=too-many-locals
from core.concepts.models import Concept
from core.sources.models import Source

to_concept_url = data.get('to_concept_url', None)
from_concept_url = data.get('from_concept_url', None)
to_source_url = data.get('to_source_url', None)
from_source_url = data.get('from_source_url', None)

def get_concept(expression):
concept = Concept.objects.filter(uri=expression).first()
if concept:
return concept
concept = Concept.objects.filter(uri=encode_string(expression, safe='/')).first()
if concept:
return concept
to_concept_code = data.get('to_concept_code')
from_concept_code = data.get('from_concept_code')
from_concept_url = data.get('from_concept_url', '')
to_concept_url = data.get('to_concept_url', '')
from_source_url = data.get('from_source_url', None) or to_parent_uri(from_concept_url)
to_source_url = data.get('to_source_url', None) or to_parent_uri(to_concept_url)

parent_uri = to_parent_uri(expression)
code = expression.replace(parent_uri, '').replace('concepts/', '').split('/')[0]
return {'mnemonic': code}
def get_concept(expr):
concept = Concept.objects.filter(
uri=expr).first() or Concept.objects.filter(uri=encode_string(expr, safe='/')).first()

def get_source_info(parent_uri, child_uri, existing_version, concept):
if not parent_uri and not child_uri:
return existing_version, get(concept, 'parent.uri')
return concept or {'mnemonic': expr.replace(to_parent_uri(expr), '').replace('concepts/', '').split('/')[0]}

if parent_uri:
version, uri = separate_version(parent_uri)
else:
version, uri = separate_version(to_parent_uri(child_uri))
def get_source(url):
source = Source.resolve_reference_expression(url, None, HEAD)
if source.id:
return source, source.versioned_object_url or source.resolution_url or url
return None, source.resolution_url or url

return version or existing_version, uri or get(concept, 'parent.uri')
self.from_source, self.from_source_url = get_source(from_source_url)
self.to_source, self.to_source_url = get_source(to_source_url)

to_concept_code = data.get('to_concept_code')
from_concept_code = data.get('from_concept_code')
if to_concept_code and not is_url_encoded_string(to_concept_code):
to_concept_code = encode_string(to_concept_code)
if from_concept_code and not is_url_encoded_string(from_concept_code):
from_concept_code = encode_string(from_concept_code)

if not to_concept_url and not get(self, 'to_concept') and to_source_url and (
if not to_concept_url and not get(self, 'to_concept') and self.to_source_url and (
to_concept_code or self.to_concept_code):
to_concept_code = to_concept_code or self.to_concept_code
to_concept_url = to_source_url + 'concepts/' + to_concept_code + '/'
to_concept_url = self.to_source_url + 'concepts/' + to_concept_code + '/'

if not from_concept_url and not get(self, 'from_concept') and from_source_url and (
if not from_concept_url and not get(self, 'from_concept') and self.from_source_url and (
from_concept_code or self.from_concept_code):
from_concept_code = from_concept_code or self.from_concept_code
from_concept_url = from_source_url + 'concepts/' + from_concept_code + '/'
from_concept_url = self.from_source_url + 'concepts/' + from_concept_code + '/'

from_concept = get_concept(from_concept_url) if from_concept_url else get(self, 'from_concept')
to_concept = get_concept(to_concept_url) if to_concept_url else get(self, 'to_concept')

self.from_concept_id = get(from_concept, 'id')
self.to_concept_id = get(to_concept, 'id')

self.from_source = self.from_source or get(from_concept, 'parent')
self.to_source = self.to_source or get(to_concept, 'parent')
self.to_source_url = self.to_source_url or get(to_concept, 'parent.uri')
self.from_source_url = self.from_source_url or get(from_concept, 'parent.uri')
self.from_concept_code = data.get(
'from_concept_code', None) or get(from_concept, 'mnemonic') or self.from_concept_code
self.from_concept_name = data.get('from_concept_name', None) or self.from_concept_name
self.to_concept_code = data.get('to_concept_code', None) or get(to_concept, 'mnemonic') or self.to_concept_code
self.to_concept_code = data.get(
'to_concept_code', None) or get(to_concept, 'mnemonic') or self.to_concept_code
self.to_concept_name = data.get('to_concept_name', None) or self.to_concept_name

self.from_source_version, self.from_source_url = get_source_info(
from_source_url, from_concept_url, self.from_source_version, from_concept
)
self.to_source_version, self.to_source_url = get_source_info(
to_source_url, to_concept_url, self.to_source_version, to_concept
)
if self.to_source_url:
self.to_source = Source.get_first_or_head(self.to_source_url)
to_source_version, to_source_url = separate_version(self.to_source_url)
self.to_source_version = self.to_source_version or to_source_version
self.to_source_url = to_source_url
if self.from_source_url:
self.from_source = Source.get_first_or_head(self.from_source_url)
from_source_version, from_source_url = separate_version(self.from_source_url)
self.from_source_version = self.from_source_version or from_source_version
self.from_source_url = from_source_url

def is_existing_in_parent(self):
return self.parent.mappings_set.filter(mnemonic__exact=self.mnemonic).exists()
Expand Down

0 comments on commit 5a36ed2

Please sign in to comment.