From 2f6572538bf5f76c00ce23c814db731858c81382 Mon Sep 17 00:00:00 2001 From: John Kwening Date: Sun, 24 Sep 2017 21:20:18 -0400 Subject: [PATCH 1/2] added missing self.debug line --- python/housinginsights/ingestion/LoadData.py | 78 ++++++++------------ 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/python/housinginsights/ingestion/LoadData.py b/python/housinginsights/ingestion/LoadData.py index e3f41fca..dc17b108 100644 --- a/python/housinginsights/ingestion/LoadData.py +++ b/python/housinginsights/ingestion/LoadData.py @@ -58,11 +58,12 @@ from housinginsights.tools.logger import HILogger logger = HILogger(name=__file__, logfile="ingestion.log") + class LoadData(object): def __init__(self, database_choice=None, meta_path=None, manifest_path=None, keep_temp_files=True, - drop_tables=False,debug=False): + drop_tables=False, debug=False): """ Initializes the class with optional arguments. The default behaviour is to load the local database with data tracked from meta.json @@ -82,6 +83,7 @@ def __init__(self, database_choice=None, meta_path=None, self.database_choice = 'docker_database' else: self.database_choice = database_choice + if meta_path is None: meta_path = os.path.abspath(os.path.join(_scripts_path, 'meta.json')) @@ -90,13 +92,10 @@ def __init__(self, database_choice=None, meta_path=None, 'manifest.csv')) self._keep_temp_files = keep_temp_files - - # load given meta.json and manifest.csv files into memory self.meta = ingestionfunctions.load_meta_data(meta_path) self.manifest = Manifest(manifest_path) - # setup engine for database_choice self.engine = dbtools.get_database_engine(self.database_choice) @@ -106,8 +105,7 @@ def __init__(self, database_choice=None, meta_path=None, self._failed_table_count = 0 self.drop_tables = drop_tables - self.debug=debug - + self.debug = debug def _drop_tables(self): """ @@ -131,13 +129,13 @@ def _drop_tables(self): return query_result def remove_tables(self, tables): - ''' + """ Used when you want to update only part of the database. Drops the table and deletes all associated rows in the sql manifest. Run this before updating data in tables that have added or removed columns. tables: a list of table names to be deleted - ''' + """ if 'all' in tables: self._drop_tables() logger.info('Dropped all tables from database') @@ -164,10 +162,9 @@ def remove_tables(self, tables): except ProgrammingError as e: logger.error("Couldn't remove table {}".format(table)) - if self.debug == True: + if self.debug: raise e - def _meta_json_to_database(self): """ Makes sure we have a meta table in the database. @@ -377,16 +374,18 @@ def rebuild(self): len(self.manifest))) self._process_data_file(manifest_row=manifest_row) processed_data_ids.append(manifest_row['unique_data_id']) - except: - logger.exception("Unable to process {}".format(manifest_row['unique_data_id'])) + except Exception as e: + logger.exception("Unable to process {}".format(manifest_row['unique_data_id'])) + if self.debug: + raise e return processed_data_ids def recalculate_database(self): - ''' + """ An alternative to 'rebuild' and 'update_database' methods - if no new data has been added but changes to the calculations are made, re-run the calculation routines. 
- ''' + """ try: #self._automap() #not yet used - created for potential use by calculated fields self._create_zone_facts_table() @@ -399,11 +398,11 @@ def recalculate_database(self): return None def _automap(self): - ''' + """ Adding this in case it is useful for the update scripts for _populate_calculated_project_fields Did not end up using it for REAC score, but leaving it in case it is useful for future. Mimics automap method used in api - ''' + """ from sqlalchemy.ext.automap import automap_base Base = automap_base() @@ -423,10 +422,10 @@ def _automap(self): #self._WmataInfo = Base.classes.wmata_info def _populate_calculated_project_fields(self): - ''' + """ Adds values for calculated fields to the project table Assumes the columns have already been created due to meta.json - ''' + """ conn = self.engine.connect() @@ -659,7 +658,6 @@ def _create_zone_facts_table(self): if self.debug: raise e - def _populate_zone_facts_table(self,res_units_by_zone_type): """ Populates the zone_facts table with the calculated fields data and @@ -1251,9 +1249,10 @@ def _load_single_file(self, table_name, manifest_row, csv_reader, # TODO - once cleaning is done, write row to psv file # TODO - consider using queue: once empty update db with psv data total_rows = len(csv_reader) - for idx, data_row in enumerate(csv_reader): + for idx, data_row in enumerate(csv_reader, 1): if idx % 100 == 0: - print(" on row ~{} of {}".format(idx,total_rows), end='\r', flush=True) + print(" on row ~{} of {}".format( + idx, total_rows), end='\r', flush=True) try: data_row.update(meta_only_fields) # insert other field dict @@ -1261,36 +1260,17 @@ def _load_single_file(self, table_name, manifest_row, csv_reader, if clean_data_row is not None: csv_writer.write(clean_data_row) except Exception as e: - logger.error("Error when trying to clean row index {} from the manifest_row {}".format(idx,manifest_row)) - if self.debug == True: + logger.error("Error when trying to clean row index {} from" + " the manifest_row {}".format(idx, + manifest_row)) + if self.debug: raise e - - # with concurrent.futures.ThreadPoolExecutor( - # max_workers=100) as executor: - # future_data = {executor.submit( - # self._clean_data, idx, data_row, cleaner, table_name, - # csv_reader.keys): ( - # idx, data_row) for idx, data_row in enumerate(csv_reader)} - # for future in concurrent.futures.as_completed(future_data): - # clean_data_row = future.result() - # if clean_data_row is not None: - # csv_writer.write(clean_data_row) - # - # csv_writer.close() - # - # # write the data to the database - # self._update_database(sql_interface=sql_interface) - # - # if not self._keep_temp_files: - # csv_writer.remove_file() - # end_time = time() - # print("\nRun time= %s" % (end_time - start_time)) - csv_writer.close() # write the data to the database - self._update_database(sql_interface=sql_interface, manifest_row = manifest_row) + self._update_database(sql_interface=sql_interface, + manifest_row=manifest_row) if not self._keep_temp_files: csv_writer.remove_file() @@ -1340,7 +1320,7 @@ def _update_database(self, sql_interface, manifest_row): # TODO Need to figure out how to revert the removal if loading doesn't work?? 
self._remove_existing_data(manifest_row=manifest_row) - self._remove_table_if_empty(manifest_row = manifest_row) + self._remove_table_if_empty(manifest_row=manifest_row) # create table if it doesn't exist sql_interface.create_table_if_necessary() @@ -1400,7 +1380,7 @@ def main(passed_arguments): manifest_path=manifest_path, keep_temp_files=keep_temp_files, drop_tables=drop_tables, - debug = passed_arguments.debug) + debug=passed_arguments.debug) #Remove tables before starting ingestion process if passed_arguments.remove_tables: @@ -1414,7 +1394,7 @@ def main(passed_arguments): else: loader.rebuild() - if passed_arguments.skip_calculations==False: + if passed_arguments.skip_calculations is False: loader.recalculate_database() else: logger.info("Skipping recalculation of calculated database fields") From 7657eed0b4122dcdc74bc972b850e1f57dad9028 Mon Sep 17 00:00:00 2001 From: John Kwening Date: Thu, 28 Sep 2017 00:00:19 -0400 Subject: [PATCH 2/2] added method to better handle and log 503 errors from mar api calls --- python/housinginsights/ingestion/Cleaners.py | 25 +++-- python/housinginsights/sources/mar.py | 111 ++++++++++++++++--- 2 files changed, 110 insertions(+), 26 deletions(-) diff --git a/python/housinginsights/ingestion/Cleaners.py b/python/housinginsights/ingestion/Cleaners.py index f1f7c59a..502043e6 100644 --- a/python/housinginsights/ingestion/Cleaners.py +++ b/python/housinginsights/ingestion/Cleaners.py @@ -4,7 +4,7 @@ values. """ -from abc import ABCMeta, abstractclassmethod, abstractmethod +from abc import ABCMeta, abstractmethod from datetime import datetime import dateutil.parser as dateparser import os @@ -63,7 +63,6 @@ def clean(self, row, row_num): # TODO: add replace_null method as required for an implementation (#176) pass - def add_proj_addre_lookup_from_mar(self): """ Adds an in-memory lookup table of the contents of the current @@ -85,7 +84,6 @@ def add_ssl_nlihc_lookup(self): result = proxy.fetchall() self.ssl_nlihc_lookup = {d[0]:d[1] for d in result} - def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None): "Checks for record in project table with matching MAR id." 
@@ -110,7 +108,6 @@ def get_nlihc_id_if_exists(self, mar_ids_string, ssl=None): #If we don't find a match return self.null_value - # TODO: figure out what is the point of this method...it looks incomplete def field_meta(self, field): for field_meta in self.fields: @@ -300,6 +297,10 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'): #TODO this is odd, find out why we have both ADDRESS_ID column and mar_id column + # The reason was because there were instances where proj_address_id + # were not valid mar_id values (JKwening) - if prescat has cleaned + # this up then this is no longer an issue; if not we should consider + # reverting back to original code if proj_address_id != self.null_value: row['mar_id'] = proj_address_id return row @@ -325,6 +326,7 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'): elif address != self.null_value: # check whether address is valid - it has street number + # TODO - use address processing code built for prescat_addre table try: str_num = address.split(' ')[0] int(str_num) @@ -336,9 +338,9 @@ def add_mar_id(self, row, address_id_col_name='ADDRESS_ID'): if result: #Handle case of mar_api returning something but it not being an address - if (result['returnDataset'] == None or - result['returnDataset'] == {} or - result['sourceOperation'] == 'DC Intersection'): + if (result['returnDataset'] is None or + result['returnDataset'] == {} or + result['sourceOperation'] == 'DC Intersection'): result = None @@ -386,7 +388,6 @@ def add_geocode_from_mar(self, row): latitude = row['Proj_lat'] longitude = row['Proj_lon'] - # only do mar api lookup if we have a null geocode value if self.null_value in [ward, neighbor_cluster, neighborhood_cluster_desc, zipcode, anc, @@ -400,7 +401,7 @@ def add_geocode_from_mar(self, row): except KeyError: return row - #if there were no null values in the geocodable fields + # if there were no null values in the geocodable fields else: return row @@ -546,7 +547,6 @@ def add_mar_tract_lookup(self): result = proxy.fetchall() self.mar_tract_lookup = {d[0]:d[1] for d in result} - def add_census_tract_from_mar(self, row, column_name='mar_id', lat_lon_col_names=('LATITUDE', 'LONGITUDE'), x_y_coords_col_names=('X', 'Y'), @@ -667,6 +667,7 @@ def clean(self, row, row_num=None): row = self.replace_nulls(row, null_values=['N', 'NA', '', None]) return row + class ProjectCleaner(CleanerBase): def clean(self, row, row_num=None): row = self.replace_nulls(row, null_values=['N', '', None]) @@ -854,11 +855,13 @@ def clean(self,row,row_num=None): return row + class ProjectAddressCleaner(CleanerBase): def clean(self,row,row_num=None): row = self.replace_nulls(row) return row - + + class ZillowCleaner(CleanerBase): """ Incomplete Cleaner - adding data to the code so we have it when needed (was doing analysis on this) diff --git a/python/housinginsights/sources/mar.py b/python/housinginsights/sources/mar.py index 10869cf5..c3aefd11 100644 --- a/python/housinginsights/sources/mar.py +++ b/python/housinginsights/sources/mar.py @@ -1,5 +1,6 @@ from pprint import pprint +from time import sleep from housinginsights.sources.base import BaseApiConn from housinginsights.sources.models.mar import MarResult, FIELDS @@ -16,11 +17,68 @@ class MarApiConn(BaseApiConn): BASEURL = 'http://citizenatlas.dc.gov/newwebservices/locationverifier.asmx' - def __init__(self,baseurl=None,proxies=None,database_choice=None, debug=False): + def __init__(self, baseurl=None, proxies=None, + database_choice=None, debug=False): 
super().__init__(baseurl=MarApiConn.BASEURL) + self.error_counter = 0 + self.SLEEP_BASE_INT = 0.05 # default value for sleep interval in secs + self.MAX_RETRY = 3 # controls max number retries until exception thrown - - def find_addr_string(self, address,output_type=None, output_file=None): + def _handle_status_code_error(self, status_code, retry_secs, lookup_method, + *args): + """ + Helper function for error code handling - currently only used for 503 + status error codes. Could be easily expanded for others if we run + into more. + + If MAX_RETRY is exceeded, the error logged and an exception is thrown. + + :param status_code: passed status code that was flagged in response + :param retry_secs: try to capture indicated recommended server wait + time in seconds + :param lookup_method: string representation of the method that was + called that lead to the response error code thrown + :param args: the necessary args needed to retry the method again + """ + self.error_counter += 1 + err = "{0} :An error occurred during request: status {1} - " \ + "method called {2}" + + # if exceed MAX_RETRY or not 503 code - log error and throw exception + if self.error_counter > self.MAX_RETRY or status_code != 503: + self.error_counter = 0 # reset counter + err.format('FAIL', status_code, lookup_method) + logger.exception(err) + raise Exception(err) + else: + err.format('TRYING AGAIN', status_code, lookup_method) + logger.exception(err) + + # sleep for a moment + if retry_secs is None: + sleep(self.SLEEP_BASE_INT * self.error_counter) + else: + sleep(retry_secs) # based on server provided period of time + + # retry previously called method again + if lookup_method == 'find_location': + location, output_type, output_file = args + self.find_location(location, output_type, output_file) + elif lookup_method == 'reverse_geocode': + xcoord, ycoord, output_type, output_file = args + self.reverse_geocode(xcoord, ycoord, output_type, output_file) + elif lookup_method == 'find_addr_string': + address, output_type, output_file = args + self.find_addr_string(address, output_type, output_file) + elif lookup_method == 'reverse_lat_lng_geocode': + latitude, longitude, output_type, output_file = args + self.reverse_lat_lng_geocode(latitude, longitude, output_type, + output_file) + elif lookup_method == 'reverse_address_id': + aid, output_type, output_file = args + self.reverse_address_id(aid, output_type, output_file) + + def find_addr_string(self, address, output_type=None, output_file=None): """ Get information about an address by using a complete address string @@ -43,9 +101,13 @@ def find_addr_string(self, address,output_type=None, output_file=None): result = self.get('/verifyDCAddressThrouString2', params=params) if result.status_code != 200: - err = "An error occurred during request: status {0}" - raise Exception(err.format(result.status_code)) - + self._handle_status_code_error(result.status_code, + result.headers['retry-after'], + 'find_addr_string', address, + output_type, output_file) + + # reset counter + self.error_counter = 0 return result.json() def find_location(self, location, output_type=None, @@ -76,15 +138,19 @@ def find_location(self, location, output_type=None, } result = self.get('/findLocation2', params=params) if result.status_code != 200: - err = "An error occurred during request: status {0}" - logger.exception(err.format(result.status_code)) - raise Exception(err.format(result.status_code)) + self._handle_status_code_error(result.status_code, + result.headers['retry-after'], + 'find_location', 
location, + output_type, output_file) if output_type == 'stdout': pprint(result.json()) elif output_type == 'csv': data = result.json()['returnDataset']['Table1'] results = [MarResult(address).data for address in data] self.result_to_csv(FIELDS, results, output_file) + + # reset counter + self.error_counter = 0 return result.json() def reverse_geocode(self, xcoord, ycoord, output_type=None, @@ -116,14 +182,19 @@ def reverse_geocode(self, xcoord, ycoord, output_type=None, } result = self.get('/reverseGeocoding2', params=params) if result.status_code != 200: - err = "An error occurred during request: status {0}" - raise Exception(err.format(result.status_code)) + self._handle_status_code_error(result.status_code, + result.headers['retry-after'], + 'reverse_geocode', xcoord, ycoord, + output_type, output_file) if output_type == 'stdout': pprint(result.json()) elif output_type == 'csv': data = result.json()['Table1'] results = [MarResult(address) for address in data] self.result_to_csv(FIELDS, results, output_file) + + # reset counter + self.error_counter = 0 return result.json() def get_condo_count(self, location, output_type=None, @@ -219,14 +290,19 @@ def reverse_lat_lng_geocode(self, latitude, longitude, output_type=None, } result = self.get('/reverseLatLngGeocoding2', params=params) if result.status_code != 200: - err = "An error occurred during request: status {0}" - raise Exception(err.format(result.status_code)) + self._handle_status_code_error(result.status_code, + result.headers['retry-after'], + 'reverse_lat_lng_geocode', latitude, + longitude, output_type, output_file) if output_type == 'stdout': pprint(result.json()) elif output_type == 'csv': data = result.json()['Table1'] results = [MarResult(address) for address in data] self.result_to_csv(FIELDS, results, output_file) + + # reset counter + self.error_counter = 0 return result.json() def reverse_address_id(self, aid, output_type=None, output_file=None): @@ -253,12 +329,17 @@ def reverse_address_id(self, aid, output_type=None, output_file=None): } result = self.get('/findAID2', params=params) if result.status_code != 200: - err = "An error occurred during request: status {0}" - raise Exception(err.format(result.status_code)) + self._handle_status_code_error(result.status_code, + result.headers['retry-after'], + 'reverse_address_id', aid, + output_type, output_file) if output_type == 'stdout': pprint(result.json()) elif output_type == 'csv': data = result.json()['Table1'] results = [MarResult(address) for address in data] self.result_to_csv(FIELDS, results, output_file) + + # reset counter + self.error_counter = 0 return result.json()
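
For reference, the `_handle_status_code_error` helper introduced in the second patch boils down to a bounded retry with linear back-off around the MAR API calls: on a 503 it sleeps, preferring the server-supplied Retry-After period when one is present, and re-issues the original lookup up to MAX_RETRY times before logging and raising. A minimal standalone sketch of that same pattern is below. The helper name `get_with_retry` and the direct use of the `requests` library are illustrative assumptions, not part of the patch (the patch itself re-dispatches to the original lookup method through `BaseApiConn.get`); the constants mirror `MAX_RETRY` and `SLEEP_BASE_INT` from `MarApiConn.__init__`.

    from time import sleep
    import requests

    MAX_RETRY = 3          # cap on retries, mirroring MarApiConn.MAX_RETRY
    SLEEP_BASE_INT = 0.05  # base back-off interval in seconds

    def get_with_retry(url, params=None):
        """Hypothetical helper: GET with bounded retry on HTTP 503.

        Waits for the server-provided Retry-After period when available,
        otherwise sleeps SLEEP_BASE_INT multiplied by the attempt number
        (linear back-off). Any non-200, non-503 status raises immediately.
        """
        for attempt in range(1, MAX_RETRY + 1):
            result = requests.get(url, params=params)
            if result.status_code == 200:
                return result.json()
            if result.status_code != 503:
                raise Exception("An error occurred during request: "
                                "status {}".format(result.status_code))
            # 503: honour Retry-After if given (seconds form only),
            # otherwise fall back to the linear back-off interval
            retry_secs = result.headers.get('Retry-After')
            sleep(float(retry_secs) if retry_secs else SLEEP_BASE_INT * attempt)
        raise Exception("Request still returning 503 after {} "
                        "retries".format(MAX_RETRY))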