From 82161330bbad292af86de107b6bf77a7f185627c Mon Sep 17 00:00:00 2001
From: James McManus
Date: Wed, 16 Feb 2022 15:11:14 -0500
Subject: [PATCH] Revised documentation

---
 createHarvestFileMeta.py   | 35 +++++++++++----
 createIngestData.py        |  8 ++--
 createIngestSourceMeta.py  | 15 ++++---
 createIngestStationMeta.py | 68 +++++++++++++++++++++-------
 ingestData.py              | 90 ++++++++++++++++++++++++++------------
 5 files changed, 158 insertions(+), 58 deletions(-)

diff --git a/createHarvestFileMeta.py b/createHarvestFileMeta.py
index 69b8e57..0638613 100644
--- a/createHarvestFileMeta.py
+++ b/createHarvestFileMeta.py
@@ -2,12 +2,11 @@
 # coding: utf-8

 # Import python modules
-import argparse, glob, sys, os, datetime, psycopg2, pdb
+import argparse, glob, sys, os, datetime, psycopg2
 import pandas as pd
-from psycopg2.extensions import AsIs
 from loguru import logger

-# This function queries the harvest_gauge_data_file table using a file_name, an pulls out the
+# This function queries the drf_harvest_data_file_meta table using a file_name, and pulls out the
 # file_name, and file_date_time, if the file_name exists in the table.
 def getFileDateTime(inputFile):
     try:
@@ -34,19 +33,23 @@ def getFileDateTime(inputFile):
         cur.close()
         conn.close()

+        # Return DataFrame
         return(df)

     # If exception print error
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-# This function takes a input a directory path and outputFile, and used them to read the input file
-# and add station_id(s) that are extracted from the apsviz_gauges database.
+# This function takes an input directory path and input dataset, and uses them to create a file list
+# that is ingested into the drf_harvest_data_file_meta table, and then used to ingest the data files.
 def createFileList(inputDir, inputDataset):
+    # Search for files in the inputDir that have the inputDataset name in them, and generate a list of the files found
     dirInputFiles = glob.glob(inputDir+inputDataset+"*.csv")
-
+
+    # Define dirOutputFiles variable
     dirOutputFiles = []
-
+
+    # Loop through the dirInputFiles list, find all files that do not have meta in their name, and add them to dirOutputFiles
     for dirInputFile in dirInputFiles:
         if dirInputFile.find('meta') == -1:
             #print(dirInputFile)
@@ -54,8 +57,10 @@ def createFileList(inputDir, inputDataset):
         else:
             continue

+    # Define outputList variable
     outputList = []

+    # Loop through dirOutputFiles, generate new variables, and add them to outputList
     for dirOutputFile in dirOutputFiles:
         dir_path = dirOutputFile.split(inputDataset)[0]
         file_name = dirOutputFile.split('/')[-1]
@@ -63,7 +68,7 @@ def createFileList(inputDir, inputDataset):

         checkFile = getFileDateTime(file_name)
         checked_file = checkFile.count()
-        #pdb.set_trace()
+
         if checked_file['file_date_time'] > 0:
             version = checkFile['version'][-1]+1
             #print('There is another file')
@@ -92,11 +97,14 @@ def createFileList(inputDir, inputDataset):

             outputList.append([dir_path,file_name,data_date_time,data_begin_time,data_end_time,file_date_time,source,content_info,ingested,version,overlap_past_file_date_time])

+    # Convert outputList to a DataFrame
     df = pd.DataFrame(outputList, columns=['dir_path', 'file_name', 'data_date_time', 'data_begin_time', 'data_end_time', 'file_date_time', 'source', 'content_info', 'ingested', 'version', 'overlap_past_file_date_time'])

+    # Get the first time, and last time, from the list of files. These are used in the output filename, to enable checking for time overlap between files
     first_time = df['data_date_time'][0]
     last_time = df['data_date_time'].iloc[-1]

+    # Return DataFrame, first time, and last time
     return(df, first_time, last_time)

 # Main program function takes args as input, which contains the outputDir, and outputFile values.
@@ -113,9 +121,17 @@ def main(args):
     inputDataset = args.inputDataset

     logger.info('Start processing source data for dataset '+inputDataset+'.')
+
+    # Get DataFrame file list, and time variables, by running the createFileList function
     df, first_time, last_time = createFileList(inputDir, inputDataset)
+
+    # Get current date
    current_date = datetime.date.today()
+
+    # Create output file name
     outputFile = 'harvest_files_'+inputDataset+'_'+first_time.strip()+'_'+last_time.strip()+'_'+current_date.strftime("%b-%d-%Y")+'.csv'
+
+    # Write DataFrame containing list of files to a csv file
     df.to_csv(outputDir+outputFile, index=False)

     logger.info('Finished processing source data for dataset '+inputDataset+'.')
@@ -129,6 +145,9 @@ def main(args):
     parser.add_argument("--outputDir", action="store", dest="outputDir")
     parser.add_argument("--inputDataset", action="store", dest="inputDataset")

+    # Parse input arguments
     args = parser.parse_args()
+
+    # Run main
     main(args)
diff --git a/createIngestData.py b/createIngestData.py
index ad989ae..2d11c77 100644
--- a/createIngestData.py
+++ b/createIngestData.py
@@ -8,6 +8,8 @@
 from psycopg2.extensions import AsIs
 from loguru import logger

+# This function takes a dataset name as input, and uses it to query the drf_harvest_data_file_meta table, creating a list
+# of filenames. The list is converted to a DataFrame and returned.
 def getInputFiles(inputDataset):
     try:
         # Create connection to database and get cursor
@@ -26,7 +28,7 @@ def getInputFiles(inputDataset):
                        ORDER BY data_date_time""",
                     {'source': inputDataset})

-        # convert query output to Pandas dataframe
+        # convert query output to Pandas DataFrame
         df = pd.DataFrame(cur.fetchall(), columns=['dir_path','file_name'])

         # Close cursor and database connection
@@ -81,7 +83,7 @@ def getObsSourceID(source_archive,station_tuples):
         print(error)

 # This function takes as input the data_source (hsofs...), and a list of station_id(s), and returns source_id(s) for
-# model data from the gauge_source table in the apsviz_gauges database. This funciton specifically gets source_id(s) for
+# model data from the drf_gauge_source table in the apsviz_gauges database. This function specifically gets source_id(s) for
 # model data, such as from ADCIRC. The data_source, such is hsofs, is the grid that is used in the ADCIRC run.
 def getModelSourceID(data_source,station_tuples):
     try:
@@ -117,7 +119,7 @@ def getModelSourceID(data_source,station_tuples):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-# This function takes as input a directory input path, directory output path and a filename, and returns a csv file
+# This function takes as input a directory input path, directory output path and a filename, and returns a csv
 # file that containes gauge data. the function uses the getObsSourceID and getModelSourceID functions above to get
 # a list of existing source ids that it includes in the gauge data to enable joining the gauge data table with
 # gauge source table. The function adds a timemark, that it gets from the input file name. The timemark values can
diff --git a/createIngestSourceMeta.py b/createIngestSourceMeta.py
index cebc510..527ca72 100644
--- a/createIngestSourceMeta.py
+++ b/createIngestSourceMeta.py
@@ -7,8 +7,8 @@
 from psycopg2.extensions import AsIs
 from loguru import logger

-# This function takes a list of station names as input, and uses them to query the apsviz_gauges database, and return a list
-# of station id(s).
+# This function takes a gauge location type (COASTAL, TIDAL or RIVERS), and uses it to query the drf_gauge_station table,
+# returning a list of station id(s) and station names.
 def getStationID(locationType):
     try:
         # Create connection to database and get cursor
@@ -41,16 +41,16 @@ def getStationID(locationType):
         print(error)

 # This function takes a input a directory path and outputFile, and used them to read the input file
-# and add station_id(s) that are extracted from the apsviz_gauges database.
+# and add station_id(s) that are extracted from the drf_gauge_station table in the apsviz_gauges database.
 def addMeta(outputDir, outputFile):
-    # Extract list of stations from dataframe for query database using the getStationID function,
+    # Extract the list of stations from the database using the getStationID function
     locationType = outputFile.split('_')[2]
     df = getStationID(locationType)

     # Get source name from outputFilee
     source = outputFile.split('_')[0]

-    # Check if source is ADCIRC
+    # Check if source is ADCIRC, contrails or noaa, and make the appropriate additions to the DataFrame
     if source == 'adcirc':
         # Get source_name and data_source from outputFile, and add them to the dataframe along
         # with the source_archive value
@@ -95,6 +95,8 @@ def main(args):
     outputFile = args.outputFile

     logger.info('Start processing source data for file '+outputFile+'.')
+
+    # Run addMeta function
     addMeta(outputDir, outputFile)
     logger.info('Finished processing source data for file '+outputFile+'.')
@@ -107,6 +109,9 @@ def main(args):
     parser.add_argument("--outputDir", action="store", dest="outputDir")
     parser.add_argument("--outputFile", action="store", dest="outputFile")

+    # Parse input arguments
     args = parser.parse_args()
+
+    # Run main
     main(args)
diff --git a/createIngestStationMeta.py b/createIngestStationMeta.py
index 7bf055f..2dc178a 100644
--- a/createIngestStationMeta.py
+++ b/createIngestStationMeta.py
@@ -39,6 +39,8 @@ def getGeometry(lon, lat):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

+# This function takes no input, and returns a DataFrame that contains a list of NOAA stations extracted from the noaa_stations
+# table. The data in the noaa_stations table was obtained from NOAA's api.tidesandcurrents.noaa.gov API.
 def getNOAAStations():
     try:
         # Create connection to database and get cursor
@@ -61,13 +63,16 @@ def getNOAAStations():
         cur.close()
         conn.close()

-        # return first row
+        # Return DataFrame
         return(df)

     # If exception print error
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

+# This function takes a gauge location type (COASTAL, TIDAL or RIVERS) as input, and returns a DataFrame that contains a list of NCEM stations
+# that are extracted from the dbo_gages_all table. The dbo_gages_all table contains data from an Excel file (dbo_GAGES_ALL.xlsx) that was
+# obtained from Tom Langan at NCEM.
 def getNCEMStations(locationType):
     try:
         # Create connection to database and get cursor
@@ -98,7 +103,7 @@ def getNCEMStations(locationType):
         cur.close()
         conn.close()

-        # return first row
+        # Return DataFrame
         return(df)

     # If exception print error
@@ -106,9 +111,9 @@ def getNCEMStations(locationType):
         print(error)

-# This function queriers the original noaa station table extracting station information, and
-# returns a dataframe. It uses the information from the table along with Nominatim
-# and ZipCodeDatabase to generate and address from latitude and longitude values.
+# This function queries the original NOAA station table (noaa_stations), using the getNOAAStations function,
+# extracting station information, and returns a dataframe. It uses the information from the table along with
+# Nominatim and ZipCodeDatabase to generate an address from latitude and longitude values.
 def addNOAAMeta(locationType):
     # Create instance of Nominatim, and ZipCodeDatabase
     geolocator = Nominatim(user_agent="geoapiExercises")
@@ -137,7 +142,8 @@ def addNOAAMeta(locationType):
         country_code = address.get('country_code', '').strip()
         country.append(country_code)

-        # Check if address is in the US
+        # Check if the address is in the US. If it is, get county and state information; if it is not, use a blank string for the county and state
+        # information.
         if country_code == 'us':
             try:
                 # Extract zipcode address using the ZipCodeDatabase instance, by inputing the zipcode from
@@ -147,16 +153,26 @@ def addNOAAMeta(locationType):
                 state.append(zipcode.state.lower())
                 county.append(address.get('county', '').replace('County', '').strip())
             except:
+                # If there is an exception, get state information from the us module
+                # NEED TO TAKE A CLOSER LOOK AT THIS, AND SEE IF I CAN USE AN IF STATEMENT TO FIND THE PROBLEM, INSTEAD OF USING EXCEPTION
                 stateinfo = us.states.lookup(address.get('state', '').strip())
                 try:
+                    # Append state name and county name to the state and county variables
                     state.append(stateinfo.abbr.lower())
                     county.append(address.get('county', '').replace('County', '').strip())
                 except:
+                    # If there is an exception, check the county information to see if the county is Lajas; if it is not, check whether the city is Mayagüez,
+                    # and if neither matches, use a blank string for the state
+                    # NEED TO TAKE A CLOSER LOOK AT THIS, AND SEE IF I CAN USE AN IF STATEMENT TO FIND THE PROBLEM, INSTEAD OF USING EXCEPTION
                     countyname = address.get('county', '').replace('County', '').strip()
+
+                    # If countyname is Lajas, define state as pr
                     if countyname == 'Lajas':
                         state.append('pr')
                         county.append(countyname)
                     else:
+                        # Else, if the county is not Lajas, check to see if the city is Mayagüez; if it is, define state as pr and append the city to county.
+                        # If the city is not Mayagüez, then append a blank string for state.
                         city = address.get('city', '').strip()
                         if city == 'Mayagüez':
                             state.append('pr')
                             county.append(city)
                         else:
                             state.append('')
                             county.append('')
@@ -174,9 +190,11 @@ def addNOAAMeta(locationType):
             state.append(address.get('state', '').strip())
             county.append(address.get('county', '').replace('County', '').strip())
-
+
+        # Append geometry to geom variable
         geom.append(getGeometry(lon, lat))
-
+
+    # Add meta to DataFrame
     df['gauge_owner'] = 'NOAA/NOS'
     df['location_type'] = locationType
     df['tz'] = 'gmt'
@@ -185,35 +203,51 @@ def addNOAAMeta(locationType):
     df['county'] = county
     df['geom'] = geom
     df.columns= df.columns.str.lower()
-    #df = df.rename(columns={'station': 'station_name'})
     df = df.rename(columns={'name': 'location_name'})
+
+    # Reorder columns in DataFrame
     newColsOrder = ["station_name","lat","lon","tz","gauge_owner","location_name","location_type","country","state","county","geom"]
     df=df.reindex(columns=newColsOrder)
-
+
+    # Return DataFrame
     return(df)

+# This function queries the original NCEM station table (dbo_gages_all), using the getNCEMStations function,
+# extracting station information, and returns a dataframe.
 def addNCEMMeta(locationType):
-    # NEED TO GET THIS INFORMATION FROM CONTRAILS INSTEAD OF RELYING ON AN EXCEL FILE
+    # Run the getNCEMStations function, which outputs a DataFrame that contains a list of NCEM stations queried from the
+    # dbo_gages_all table, which contains the original NCEM station metadata.
     df = getNCEMStations(locationType.lower())

+    # Rename columns
     df = df.rename(columns={'latitude':'lat','longitude':'lon','site_id':'station_name', 'name':'location_name','owner': 'gauge_owner'})
+
+    # Convert all column names to lower case
     df.columns= df.columns.str.lower()
+
+    # Add variables to DataFrame
     df['tz'] = 'gmt'
     df['location_type'] = locationType
     df['country'] = 'us'
     df['state'] = 'nc'
+
+    # Reorder column names and reset index values
     newColsOrder = ["station_name","lat","lon","tz","gauge_owner","location_name","location_type","country","state","county"]
     df=df.reindex(columns=newColsOrder)
     df.reset_index(drop=True, inplace=True)
-
+
+    # Define geometry variable
     geom = []
-
+
+    # Loop through the DataFrame of stations, and use the lon and lat values to get the geometry values, using the getGeometry function
     for index, row in df.iterrows():
         geom.append(getGeometry(row['lon'], row['lat']))
-
+
+    # Add geometry values to DataFrame
     df['geom'] = geom
-
+
+    # Return DataFrame
     return(df)

 # Main program function takes args as input, which contains the outputDir, and outputFile values.
@@ -232,6 +266,7 @@ def main(args):
     dataset = outputFile.split('_')[0]
     locationType = outputFile.split('_')[2]

+    # Check if dataset is noaa or contrails, and run the appropriate function
     if dataset == 'noaa':
         # If dataset is noaa run the addNOAAMeta function and write output to csv file
         logger.info('Start processing NOAA stations.')
@@ -257,6 +292,9 @@ def main(args):
     parser.add_argument("--outputDir", action="store", dest="outputDir")
     parser.add_argument("--outputFile", action="store", dest="outputFile")

+    # Parse arguments
     args = parser.parse_args()
+
+    # Run main
     main(args)
diff --git a/ingestData.py b/ingestData.py
index 0e68b5f..9c20fff 100644
--- a/ingestData.py
+++ b/ingestData.py
@@ -6,6 +6,9 @@
 import pandas as pd
 from loguru import logger

+# This function takes a dataset name as input and uses it to query the drf_harvest_data_file_meta table,
+# creating a DataFrame that contains a list of data files to ingest. The ingest directory is the directory
+# path in the apsviz-timeseriesdb database container.
 def getHarvestDataFileMeta(inputDataset):
     try:
         # Create connection to database and get cursor
@@ -33,19 +36,25 @@ def getHarvestDataFileMeta(inputDataset):

         # Return Pandas dataframe
         if inputDataset == 'adcirc':
+            # Limit to 40 files at a time
             return(df.head(40))
         else:
+            # Limit to 20 files at a time
             return(df.head(20))

     # If exception print error
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-def ingestHarvestDataFileMeta(inputDir, outputDir):
+# This function takes an input directory and ingest directory as input. It uses the input directory to search for
+# harvest_files that need to be ingested. It uses the ingest directory to define the path of the harvest_file
+# to ingest. The ingest directory is the directory path in the apsviz-timeseriesdb database container.
+def ingestHarvestDataFileMeta(inputDir, ingestDir):
     inputFiles = glob.glob(inputDir+"harvest_files_*.csv")

     for infoFile in inputFiles:
-        outPathFile = outputDir+infoFile.split('/')[-1]
+        # Define the ingest path and file using the ingest directory and the harvest file name
+        ingestPathFile = ingestDir+infoFile.split('/')[-1]

         try:
             # Create connection to database and get cursor
@@ -59,10 +68,10 @@ def ingestHarvestDataFileMeta(inputDir, outputDir):

             # Run query
             cur.execute("""COPY drf_harvest_data_file_meta(dir_path,file_name,data_date_time,data_begin_time,data_end_time,file_date_time,source,content_info,ingested,version,overlap_past_file_date_time)
-                           FROM %(out_path_file)s
+                           FROM %(ingest_path_file)s
                            DELIMITER ',' CSV HEADER""",
-                        {'out_path_file': outPathFile})
+                        {'ingest_path_file': ingestPathFile})

             # Commit ingest
             conn.commit()
@@ -75,11 +84,17 @@ def ingestHarvestDataFileMeta(inputDir, outputDir):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-def ingestStation(inputDir, outputDir):
+# This function takes an input directory and an ingest directory as input. The input directory is used to search for geom
+# station files that are to be ingested. The ingest directory is used to define the path of the file to be ingested. The
+# ingest directory is the directory path in the apsviz-timeseriesdb database container.
+def ingestStation(inputDir, ingestDir):
+    # Create list of geom files, to be ingested, by searching the input directory for geom files.
     inputFiles = glob.glob(inputDir+"geom_*.csv")
-
+
+    # Loop through the geom file list, ingesting each one
     for geomFile in inputFiles:
-        outPathFile = outputDir+geomFile.split('/')[-1]
+        # Define the ingest path and file using the ingest directory and the geom file name
+        ingestPathFile = ingestDir+geomFile.split('/')[-1]

         try:
             # Create connection to database and get cursor
@@ -93,10 +108,10 @@ def ingestStation(inputDir, outputDir):

             # Run query
             cur.execute("""COPY drf_gauge_station(station_name,lat,lon,tz,gauge_owner,location_name,location_type,country,state,county,geom)
-                           FROM %(out_path_file)s
+                           FROM %(ingest_path_file)s
                            DELIMITER ',' CSV HEADER""",
-                        {'out_path_file': outPathFile})
+                        {'ingest_path_file': ingestPathFile})

             # Commit ingest
             conn.commit()
@@ -109,11 +124,17 @@ def ingestStation(inputDir, outputDir):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-def ingestSource(inputDir, outputDir):
+# This function takes an input directory and ingest directory as input. It uses the input directory to search for source
+# csv files that were created by the createIngestSourceMeta.py program. It uses the ingest directory to define the path
+# of the file that is to be ingested. The ingest directory is the directory path in the apsviz-timeseriesdb database container.
+def ingestSource(inputDir, ingestDir):
+    # Create list of source files, to be ingested, by searching the input directory for source files.
     inputFiles = glob.glob(inputDir+"source_*.csv")

+    # Loop through the source file list, ingesting each one
     for sourceFile in inputFiles:
-        outPathFile = outputDir+sourceFile.split('/')[-1]
+        # Define the ingest path and file using the ingest directory and the source file name
+        ingestPathFile = ingestDir+sourceFile.split('/')[-1]

         try:
             # Create connection to database and get cursor
@@ -127,10 +148,10 @@ def ingestSource(inputDir, outputDir):

             # Run query
             cur.execute("""COPY drf_gauge_source(station_id,data_source,source_name,source_archive)
-                           FROM %(out_path_file)s
+                           FROM %(ingest_path_file)s
                            DELIMITER ',' CSV HEADER""",
-                        {'out_path_file': outPathFile})
+                        {'ingest_path_file': ingestPathFile})

             # Commit ingest
             conn.commit()
@@ -143,12 +164,22 @@ def ingestSource(inputDir, outputDir):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-def ingestData(inputDir, outputDir, inputDataset):
+# This function takes an ingest directory and input dataset as input, and uses them to run the getHarvestDataFileMeta
+# function and define the ingestPathFile variable. The getHarvestDataFileMeta function produces a DataFrame (dfDirFiles)
+# that contains a list of data files that are queried from the drf_harvest_data_file_meta table. These files are then
+# ingested into the drf_gauge_data table. After the data from a file has been ingested, the "ingested" column in the
+# drf_harvest_data_file_meta table is updated from False to True. The ingest directory is the directory path in the
+# apsviz-timeseriesdb database container.
+def ingestData(ingestDir, inputDataset):
+    # Get DataFrame that contains the list of data files that need to be ingested
     dfDirFiles = getHarvestDataFileMeta(inputDataset)

+    # Loop through the DataFrame, ingesting each data file
     for index, row in dfDirFiles.iterrows():
-        updateFile = row[0]
-        outPathFile = outputDir+'data_copy_'+updateFile
+        # Get the name of the file that needs to be ingested from the DataFrame, and create the data_copy file name and ingest path
+        # (ingestPathFile) in the DataIngesting directory, from which the file is ingested using the copy command.
+        ingestFile = row[0]
+        ingestPathFile = ingestDir+'data_copy_'+ingestFile

         try:
             # Create connection to database and get cursor
@@ -162,10 +193,10 @@ def ingestData(inputDir, outputDir, inputDataset):

             # Run query
             cur.execute("""COPY drf_gauge_data(source_id,timemark,time,water_level)
-                           FROM %(out_path_file)s
+                           FROM %(ingest_path_file)s
                            DELIMITER ',' CSV HEADER""",
-                        {'out_path_file': outPathFile})
+                        {'ingest_path_file': ingestPathFile})

             # Commit ingest
             conn.commit()
@@ -175,7 +206,7 @@ def ingestData(inputDir, outputDir, inputDataset):
                            SET ingested = True
                            WHERE file_name = %(update_file)s
                            """,
-                        {'update_file': updateFile})
+                        {'update_file': ingestFile})

             # Commit update
             conn.commit()
@@ -188,6 +219,7 @@ def ingestData(inputDir, outputDir, inputDataset):
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

+# This function takes no input, and creates the drf_gauge_station_source_data view.
 def createView():
     try:
         # Create connection to database and get cursor
@@ -234,7 +266,7 @@ def createView():
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

-# Main program function takes args as input, which contains the inputDir, inputTask, and inputDataset values.
+# Main program function takes args as input, which contains the inputDir, ingestDir, inputTask, and inputDataset values.
 @logger.catch
 def main(args):
     # Add logger
@@ -244,42 +276,46 @@ def main(args):

     # Extract args variables
     inputDir = args.inputDir
-    outputDir = args.outputDir
+    ingestDir = args.ingestDir
     inputTask = args.inputTask
     inputDataset = args.inputDataset

+    # Check if inputTask is file, station, source, data, or view, and run the appropriate function
     if inputTask.lower() == 'file':
         logger.info('Ingesting input file information.')
-        ingestHarvestDataFileMeta(inputDir, outputDir)
+        ingestHarvestDataFileMeta(inputDir, ingestDir)
         logger.info('Ingested input file information.')
     elif inputTask.lower() == 'station':
         logger.info('Ingesting station data.')
-        ingestStation(inputDir, outputDir)
+        ingestStation(inputDir, ingestDir)
         logger.info('Ingested station data.')
     elif inputTask.lower() == 'source':
         logger.info('Ingesting source data.')
-        ingestSource(inputDir, outputDir)
+        ingestSource(inputDir, ingestDir)
         logger.info('ingested source data.')
     elif inputTask.lower() == 'data':
         logger.info('Ingesting data for dataset '+inputDataset+'.')
-        ingestData(inputDir, outputDir, inputDataset)
+        ingestData(ingestDir, inputDataset)
         logger.info('Ingested data for dataset '+inputDataset+'.')
     elif inputTask.lower() == 'view':
         logger.info('Creating view.')
         createView()
         logger.info('Created view.')

-# Run main function takes inputDir, inputTask, and inputDataset as input.
+# Run main function, which takes inputDir, ingestDir, inputTask, and inputDataset as input.
 if __name__ == "__main__":
     """ This is executed when run from the command line """
     parser = argparse.ArgumentParser()

     # Optional argument which requires a parameter (eg. -d test)
     parser.add_argument("--inputDir", action="store", dest="inputDir")
-    parser.add_argument("--outputDir", action="store", dest="outputDir")
+    parser.add_argument("--ingestDir", action="store", dest="ingestDir")
     parser.add_argument("--inputTask", action="store", dest="inputTask")
     parser.add_argument("--inputDataset", action="store", dest="inputDataset")

+    # Parse arguments
     args = parser.parse_args()
+
+    # Run main
     main(args)
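
All of the ingest functions documented above share the same parameterized, server-side COPY pattern: build an ingestPathFile from ingestDir, bind it as a %(ingest_path_file)s value, and commit. A minimal standalone sketch of that pattern follows; the drf_gauge_source table and its column list are taken from ingestSource() in this patch, while the connection settings and the copy_source_csv wrapper are illustrative placeholders rather than code from the repository.

    # Sketch only: the connection settings below are placeholders; the real scripts
    # manage their own credentials. The table and column list mirror ingestSource() above.
    import psycopg2

    def copy_source_csv(ingest_path_file):
        # ingest_path_file must be a path that exists inside the apsviz-timeseriesdb
        # database container, because COPY ... FROM <file> runs on the database server,
        # not in this client script.
        conn = psycopg2.connect(dbname='apsviz_gauges', user='postgres', host='localhost',
                                port=5432, password='placeholder')
        try:
            cur = conn.cursor()
            # Parameterized server-side COPY; psycopg2 renders the bound path as a quoted literal
            cur.execute("""COPY drf_gauge_source(station_id,data_source,source_name,source_archive)
                           FROM %(ingest_path_file)s
                           DELIMITER ',' CSV HEADER""",
                        {'ingest_path_file': ingest_path_file})
            # Commit the ingest so the copied rows are kept
            conn.commit()
            cur.close()
        finally:
            conn.close()

A hypothetical invocation of the revised driver (directory paths are illustrative) would then be: python ingestData.py --inputDir /data/DataIngesting/ --ingestDir /home/DataIngesting/ --inputTask source, followed by a second run with --inputTask data once the source metadata is in place.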