Skip to content

Commit

Permalink
test if empty values in required base variables
Browse files Browse the repository at this point in the history
  • Loading branch information
MaryKilewe committed Dec 9, 2024
1 parent 9755c05 commit 2707e8d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
19 changes: 19 additions & 0 deletions models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ def save(self):
super().save()


class USLDataErrorLogs(Model):
__keyspace__ = 'datamap'
__table_name__ = 'usl_data_error_logs'

id = columns.UUID(primary_key=True, default=uuid.uuid1)
usl_repository_name = columns.Text(required=True)
base_variable = columns.Text(required=True)
issue = columns.Text(required=True)
recommended_solution = columns.Text(required=True)
source_system_id = columns.UUID(required=True)
source_system_name = columns.Text(required=True)
created_at = columns.DateTime(required=True, default=datetime.utcnow())
is_latest = columns.Boolean(default=False)

def save(self):
self.started_at = datetime.utcnow()
super().save()


class DataDictionaries(Model):
__keyspace__ = 'datamap'
__table_name__ = 'data_dictionaries'
Expand Down
57 changes: 57 additions & 0 deletions routes/dictionary_mapper_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,63 @@ async def add_mapped_variables(baselookup:str, variables:List[object]):
return {"status":500, "message":e}


@router.post('/test/mapped_variables/{baselookup}')
async def test_mapped_variables(baselookup:str, variables:List[object], db_session: Session = Depends(get_db)):
try:
extractQuery = text(generate_query(baselookup))

cass_session = database.cassandra_session_factory()

with db_session as session:
result = session.execute(extractQuery)

columns = result.keys()
baseRepoLoaded = [dict(zip(columns, row)) for row in result]

# processed_results = [convert_datetime_to_iso(convert_none_to_null(result)) for result in baseRepoLoaded]
processed_results = [result for result in baseRepoLoaded]

# extractedValues = []
# for data in processed_results:
# extractedValues = [for key, value in data.items()]

# quoted_values = [
# 'NULL' if value is None
# else f"'{value}'" if isinstance(value, str)
# else f"'{value.strftime('%Y-%m-%d')}'" if isinstance(value, datetime.date) # Convert date to string
# else f"'{value}'" if (DataDictionaryTerms.objects.filter(dictionary=baselookup,
# term=key).allow_filtering().first()[
# "is_required"] == True)
# else str(value)
# for key, value in data.items()
# ]

list_of_issues =[]
for variableSet in variables:
# filteredData = [{variableSet["base_variable_mapped_to"]:obj[variableSet["base_variable_mapped_to"]]} for obj in processed_results]
if variableSet["base_variable_mapped_to"] !="PrimaryTableId":
filteredData = [obj[variableSet["base_variable_mapped_to"]] for obj in processed_results]

dictTerms = DataDictionaryTerms.objects.filter(dictionary=baselookup,
term=variableSet["base_variable_mapped_to"]).allow_filtering().first()

if dictTerms["is_required"] == True:
# mandatoryValuesFailed = [item for item in filteredData if item =="" or item ==None or item =="NULL"]
if "" in filteredData or None in filteredData or "NULL" in filteredData:
issueObj = {"base_variable":variableSet["base_variable_mapped_to"],
"issue":"*Variable is Mandatory. Data is expected in all records.",
"column_mapped":variableSet["columnname"],
"recommended_solution":"Ensure all records have this data"}
list_of_issues.append(issueObj)
# DataDictionaryTerms.objects.filter(dictionary=baselookup, term=variableSet["base_variable_mapped_to"]).allow_filtering().first()
# MappedVariables.create(tablename=variableSet["tablename"],columnname=variableSet["columnname"],
# datatype=variableSet["datatype"], base_repository=variableSet["base_repository"],
# base_variable_mapped_to=variableSet["base_variable_mapped_to"], join_by=variableSet["join_by"])
print("list_of_issues ",list_of_issues)
return {"data":list_of_issues}
except Exception as e:
return {"status":500, "message":e}

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

@router.get('/generate_config/{baselookup}')
async def generate_config(baselookup:str):
try:
Expand Down

0 comments on commit 2707e8d

Please sign in to comment.