524 add row skipping & 479 add "503" error handling #590

Open · wants to merge 2 commits into base: dev
25 changes: 14 additions & 11 deletions python/housinginsights/ingestion/Cleaners.py
@@ -4,7 +4,7 @@
values.
"""

from abc import ABCMeta, abstractclassmethod, abstractmethod
from abc import ABCMeta, abstractmethod
from datetime import datetime
import dateutil.parser as dateparser
import os
@@ -63,7 +63,6 @@ def clean(self, row, row_num):
# TODO: add replace_null method as required for an implementation (#176)
pass


def add_proj_addre_lookup_from_mar(self):
"""
Adds an in-memory lookup table of the contents of the current
@@ -85,7 +84,6 @@ def add_ssl_nlihc_lookup(self):
result = proxy.fetchall()
self.ssl_nlihc_lookup = {d[0]:d[1] for d in result}


def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None):
"Checks for record in project table with matching MAR id."

@@ -110,7 +108,6 @@ def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None):
#If we don't find a match
return self.null_value


# TODO: figure out what is the point of this method...it looks incomplete
def field_meta(self, field):
for field_meta in self.fields:
@@ -300,6 +297,10 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'):


#TODO this is odd, find out why we have both ADDRESS_ID column and mar_id column
# The reason was because there were instances where proj_address_id
# were not valid mar_id values (JKwening) - if prescat has cleaned
# this up then this is no longer an issue; if not we should consider
# reverting back to original code
if proj_address_id != self.null_value:
row['mar_id'] = proj_address_id
return row
@@ -325,6 +326,7 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'):

elif address != self.null_value:
# check whether address is valid - it has street number
# TODO - use address processing code built for prescat_addre table
try:
str_num = address.split(' ')[0]
int(str_num)
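
For context, the validity check in this hunk treats an address as geocodable only when its first whitespace-separated token parses as an integer street number. A minimal standalone sketch of that check (the helper name is illustrative, not part of the PR):

def has_street_number(address):
    # "Geocodable" here means the address starts with an integer street
    # number, e.g. "1600 Pennsylvania Ave NW"; "Pennsylvania Ave" fails.
    try:
        int(address.split(' ')[0])
        return True
    except ValueError:
        return False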
@@ -336,9 +338,9 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'):

if result:
#Handle case of mar_api returning something but it not being an address
if (result['returnDataset'] == None or
result['returnDataset'] == {} or
result['sourceOperation'] == 'DC Intersection'):
if (result['returnDataset'] is None or
result['returnDataset'] == {} or
result['sourceOperation'] == 'DC Intersection'):

result = None
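
The switch from == None to is None above is part of a guard that discards MAR API responses which are not real address matches. A hedged sketch of that guard pulled out as a predicate (the function name is an assumption; the result keys are taken from the snippet above):

def is_usable_mar_result(result):
    # The MAR API can return a payload that is not an address match:
    # an empty or missing returnDataset, or an intersection lookup.
    if result is None:
        return False
    if result['returnDataset'] is None or result['returnDataset'] == {}:
        return False
    if result['sourceOperation'] == 'DC Intersection':
        return False
    return True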

@@ -386,7 +388,6 @@ def add_geocode_from_mar(self, row):
latitude = row['Proj_lat']
longitude = row['Proj_lon']


# only do mar api lookup if we have a null geocode value
if self.null_value in [ward, neighbor_cluster,
neighborhood_cluster_desc, zipcode, anc,
@@ -400,7 +401,7 @@ def add_geocode_from_mar(self, row):
except KeyError:
return row

#if there were no null values in the geocodable fields
# if there were no null values in the geocodable fields
else:
return row

@@ -546,7 +547,6 @@ def add_mar_tract_lookup(self):
result = proxy.fetchall()
self.mar_tract_lookup = {d[0]:d[1] for d in result}


def add_census_tract_from_mar(self, row, column_name='mar_id',
lat_lon_col_names=('LATITUDE', 'LONGITUDE'),
x_y_coords_col_names=('X', 'Y'),
@@ -667,6 +667,7 @@ def clean(self, row, row_num=None):
row = self.replace_nulls(row, null_values=['N', 'NA', '', None])
return row


class ProjectCleaner(CleanerBase):
def clean(self, row, row_num=None):
row = self.replace_nulls(row, null_values=['N', '', None])
@@ -854,11 +855,13 @@ def clean(self,row,row_num=None):

return row


class ProjectAddressCleaner(CleanerBase):
def clean(self,row,row_num=None):
row = self.replace_nulls(row)
return row



class ZillowCleaner(CleanerBase):
"""
Incomplete Cleaner - adding data to the code so we have it when needed (was doing analysis on this)
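
A rough sketch of how a cleaner participates in the row skipping this PR adds: a CleanerBase subclass can return None from clean(), and the loader drops that row instead of writing it. The class name and the 'Status' rule below are hypothetical, not part of this diff:

class ExampleSkippingCleaner(CleanerBase):
    def clean(self, row, row_num=None):
        # Hypothetical skip rule: drop rows that have no usable status.
        if row.get('Status') in ('', None):
            return None  # a None return tells LoadData to skip the row
        return self.replace_nulls(row, null_values=['N', '', None])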
78 changes: 29 additions & 49 deletions python/housinginsights/ingestion/LoadData.py
@@ -58,11 +58,12 @@
from housinginsights.tools.logger import HILogger
logger = HILogger(name=__file__, logfile="ingestion.log")


class LoadData(object):

def __init__(self, database_choice=None, meta_path=None,
manifest_path=None, keep_temp_files=True,
drop_tables=False,debug=False):
drop_tables=False, debug=False):
"""
Initializes the class with optional arguments. The default behaviour
is to load the local database with data tracked from meta.json
@@ -82,6 +83,7 @@ def __init__(self, database_choice=None, meta_path=None,
self.database_choice = 'docker_database'
else:
self.database_choice = database_choice

if meta_path is None:
meta_path = os.path.abspath(os.path.join(_scripts_path,
'meta.json'))
@@ -90,13 +92,10 @@
'manifest.csv'))
self._keep_temp_files = keep_temp_files



# load given meta.json and manifest.csv files into memory
self.meta = ingestionfunctions.load_meta_data(meta_path)
self.manifest = Manifest(manifest_path)


# setup engine for database_choice
self.engine = dbtools.get_database_engine(self.database_choice)

@@ -106,8 +105,7 @@
self._failed_table_count = 0

self.drop_tables = drop_tables
self.debug=debug

self.debug = debug

def _drop_tables(self):
"""
@@ -131,13 +129,13 @@ def _drop_tables(self):
return query_result

def remove_tables(self, tables):
'''
"""
Used when you want to update only part of the database. Drops the table
and deletes all associated rows in the sql manifest. Run this before
updating data in tables that have added or removed columns.

tables: a list of table names to be deleted
'''
"""
if 'all' in tables:
self._drop_tables()
logger.info('Dropped all tables from database')
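
The remove_tables docstring above implies a workflow along these lines when only part of the schema changed; the table names here are placeholders, not a recommendation from the PR:

loader = LoadData(database_choice='docker_database')
# Drop only the tables whose columns were added or removed, then
# re-ingest them; passing ['all'] drops every table instead.
loader.remove_tables(['project', 'zone_facts'])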
@@ -164,10 +162,9 @@

except ProgrammingError as e:
logger.error("Couldn't remove table {}".format(table))
if self.debug == True:
if self.debug:
raise e


def _meta_json_to_database(self):
"""
Makes sure we have a meta table in the database.
@@ -377,16 +374,18 @@ def rebuild(self):
len(self.manifest)))
self._process_data_file(manifest_row=manifest_row)
processed_data_ids.append(manifest_row['unique_data_id'])
except:
logger.exception("Unable to process {}".format(manifest_row['unique_data_id']))
except Exception as e:
logger.exception("Unable to process {}".format(manifest_row['unique_data_id']))
if self.debug:
raise e

return processed_data_ids

def recalculate_database(self):
'''
"""
An alternative to 'rebuild' and 'update_database' methods - if no new data has been added but
changes to the calculations are made, re-run the calculation routines.
'''
"""
try:
#self._automap() #not yet used - created for potential use by calculated fields
self._create_zone_facts_table()
@@ -399,11 +398,11 @@ def recalculate_database(self):
return None

def _automap(self):
'''
"""
Adding this in case it is useful for the update scripts for _populate_calculated_project_fields
Did not end up using it for REAC score, but leaving it in case it is useful for future.
Mimics automap method used in api
'''
"""
from sqlalchemy.ext.automap import automap_base

Base = automap_base()
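
The automap snippet is cut off at this point. In SQLAlchemy the pattern it mimics usually continues roughly as below; this is a sketch under the assumption that the reflected class names match the table names, not the PR's exact code:

Base = automap_base()
Base.prepare(self.engine, reflect=True)   # reflect existing tables into mapped classes
self._Project = Base.classes.project      # attribute and class names are assumptions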
@@ -423,10 +422,10 @@ def _automap(self):
#self._WmataInfo = Base.classes.wmata_info

def _populate_calculated_project_fields(self):
'''
"""
Adds values for calculated fields to the project table
Assumes the columns have already been created due to meta.json
'''
"""
conn = self.engine.connect()


@@ -659,7 +658,6 @@ def _create_zone_facts_table(self):
if self.debug:
raise e


def _populate_zone_facts_table(self,res_units_by_zone_type):
"""
Populates the zone_facts table with the calculated fields data and
@@ -1251,46 +1249,28 @@ def _load_single_file(self, table_name, manifest_row, csv_reader,
# TODO - once cleaning is done, write row to psv file
# TODO - consider using queue: once empty update db with psv data
total_rows = len(csv_reader)
for idx, data_row in enumerate(csv_reader):
for idx, data_row in enumerate(csv_reader, 1):
if idx % 100 == 0:
print(" on row ~{} of {}".format(idx,total_rows), end='\r', flush=True)
print(" on row ~{} of {}".format(
idx, total_rows), end='\r', flush=True)

try:
data_row.update(meta_only_fields) # insert other field dict
clean_data_row = cleaner.clean(data_row, idx)
if clean_data_row is not None:
csv_writer.write(clean_data_row)
except Exception as e:
logger.error("Error when trying to clean row index {} from the manifest_row {}".format(idx,manifest_row))
if self.debug == True:
logger.error("Error when trying to clean row index {} from"
" the manifest_row {}".format(idx,
manifest_row))
if self.debug:
raise e


# with concurrent.futures.ThreadPoolExecutor(
# max_workers=100) as executor:
# future_data = {executor.submit(
# self._clean_data, idx, data_row, cleaner, table_name,
# csv_reader.keys): (
# idx, data_row) for idx, data_row in enumerate(csv_reader)}
# for future in concurrent.futures.as_completed(future_data):
# clean_data_row = future.result()
# if clean_data_row is not None:
# csv_writer.write(clean_data_row)
#
# csv_writer.close()
#
# # write the data to the database
# self._update_database(sql_interface=sql_interface)
#
# if not self._keep_temp_files:
# csv_writer.remove_file()
# end_time = time()
# print("\nRun time= %s" % (end_time - start_time))

csv_writer.close()

# write the data to the database
self._update_database(sql_interface=sql_interface, manifest_row = manifest_row)
self._update_database(sql_interface=sql_interface,
manifest_row=manifest_row)

if not self._keep_temp_files:
csv_writer.remove_file()
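
Taken together, the changes in this hunk are where the row skipping from #524 lands: rows are numbered from 1, each row is cleaned inside its own try/except, and a row is written only when the cleaner returns something other than None. A condensed sketch of that loop with the surrounding class stripped away:

for idx, data_row in enumerate(csv_reader, 1):       # 1-based row numbers for log messages
    try:
        data_row.update(meta_only_fields)
        clean_data_row = cleaner.clean(data_row, idx)
        if clean_data_row is not None:               # None means "skip this row"
            csv_writer.write(clean_data_row)
    except Exception as e:
        logger.error("Error cleaning row {} of {}".format(idx, manifest_row))
        if self.debug:
            raise e                                  # fail fast when debugging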
@@ -1340,7 +1320,7 @@ def _update_database(self, sql_interface, manifest_row):

# TODO Need to figure out how to revert the removal if loading doesn't work??
self._remove_existing_data(manifest_row=manifest_row)
self._remove_table_if_empty(manifest_row = manifest_row)
self._remove_table_if_empty(manifest_row=manifest_row)

# create table if it doesn't exist
sql_interface.create_table_if_necessary()
@@ -1400,7 +1380,7 @@ def main(passed_arguments):
manifest_path=manifest_path,
keep_temp_files=keep_temp_files,
drop_tables=drop_tables,
debug = passed_arguments.debug)
debug=passed_arguments.debug)

#Remove tables before starting ingestion process
if passed_arguments.remove_tables:
@@ -1414,7 +1394,7 @@
else:
loader.rebuild()

if passed_arguments.skip_calculations==False:
if passed_arguments.skip_calculations is False:
loader.recalculate_database()
else:
logger.info("Skipping recalculation of calculated database fields")