Revised documentation
jmpmcmanus committed Feb 16, 2022
1 parent a9a5f8c commit 8216133
Showing 5 changed files with 158 additions and 58 deletions.
35 changes: 27 additions & 8 deletions createHarvestFileMeta.py
@@ -2,12 +2,11 @@
# coding: utf-8

# Import python modules
import argparse, glob, sys, os, datetime, psycopg2, pdb
import argparse, glob, sys, os, datetime, psycopg2
import pandas as pd
from psycopg2.extensions import AsIs
from loguru import logger

# This function queries the harvest_gauge_data_file table using a file_name, and pulls out the
# This function queries the drf_harvest_data_file_meta table using a file_name, and pulls out the
# file_name, and file_date_time, if the file_name exists in the table.
def getFileDateTime(inputFile):
try:
@@ -34,36 +33,42 @@ def getFileDateTime(inputFile):
cur.close()
conn.close()

# Return DataFrame
return(df)

# If exception print error
except (Exception, psycopg2.DatabaseError) as error:
print(error)
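
For reference, the body of getFileDateTime is elided in this view. A minimal sketch of the kind of query it likely runs, assuming the table and column names that appear in the surrounding code (drf_harvest_data_file_meta; file_name, file_date_time, version) and a hypothetical connection string:

# Sketch only: connection settings are placeholders, and the exact SQL is an
# assumption based on the comments above.
import psycopg2
import pandas as pd

def getFileDateTimeSketch(inputFile):
    conn = psycopg2.connect(dbname='apsviz_gauges', user='user', host='host', password='password')
    cur = conn.cursor()
    cur.execute("""SELECT file_name, file_date_time, version
                   FROM drf_harvest_data_file_meta
                   WHERE file_name = %(file_name)s""",
                {'file_name': inputFile})
    df = pd.DataFrame(cur.fetchall(), columns=['file_name', 'file_date_time', 'version'])
    cur.close()
    conn.close()
    return df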

# This function takes as input a directory path and outputFile, and uses them to read the input file
# and add station_id(s) that are extracted from the apsviz_gauges database.
# This function takes an input directory path and input dataset, and uses them to create a file list
# that is ingested into the drf_harvest_data_file_meta table and later used to ingest the data files.
def createFileList(inputDir, inputDataset):
# Search for files in the inputDir that have inputDataset name in them, and generate a list of files found
dirInputFiles = glob.glob(inputDir+inputDataset+"*.csv")


# Define dirOutputFiles variable
dirOutputFiles = []


# Loop through dirInputFiles list, find all files that do not have meta in their name, and add them to dirOutputFiles
for dirInputFile in dirInputFiles:
if dirInputFile.find('meta') == -1:
#print(dirInputFile)
dirOutputFiles.append(dirInputFile)
else:
continue

# Define outputList variable
outputList = []

# Loop through dirOutputFiles, generate new variables and add them to outputList
for dirOutputFile in dirOutputFiles:
dir_path = dirOutputFile.split(inputDataset)[0]
file_name = dirOutputFile.split('/')[-1]
data_date_time = file_name.split('_')[-1].split('.')[0]

checkFile = getFileDateTime(file_name)
checked_file = checkFile.count()
#pdb.set_trace()

if checked_file['file_date_time'] > 0:
version = checkFile['version'][-1]+1
#print('There is another file')
@@ -92,11 +97,14 @@ def createFileList(inputDir, inputDataset):

outputList.append([dir_path,file_name,data_date_time,data_begin_time,data_end_time,file_date_time,source,content_info,ingested,version,overlap_past_file_date_time])

# Convert outputList to a DataFrame
df = pd.DataFrame(outputList, columns=['dir_path', 'file_name', 'data_date_time', 'data_begin_time', 'data_end_time', 'file_date_time', 'source', 'content_info', 'ingested', 'version', 'overlap_past_file_date_time'])

# Get the first time and last time from the list of files. These are used in the filename, to enable checking for time overlap between files
first_time = df['data_date_time'][0]
last_time = df['data_date_time'].iloc[-1]

# Return DataFrame, first time, and last time
return(df, first_time, last_time)
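
A hedged usage sketch of createFileList (the directory and dataset name below are placeholders, not paths from the repository):

# Illustrative call; the path and dataset name are hypothetical.
df, first_time, last_time = createFileList('/data/harvest/', 'adcirc_stationdata')
print(first_time, last_time)  # earliest and latest data_date_time values found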

# Main program function takes args as input, which contains the inputDir, outputDir, and inputDataset values.
@@ -113,9 +121,17 @@ def main(args):
inputDataset = args.inputDataset

logger.info('Start processing source data for dataset '+inputDataset+'.')

# Get DataFrame file list, and time variables by running the createFileList function
df, first_time, last_time = createFileList(inputDir, inputDataset)

# Get current date
current_date = datetime.date.today()

# Create output file name
outputFile = 'harvest_files_'+inputDataset+'_'+first_time.strip()+'_'+last_time.strip()+'_'+current_date.strftime("%b-%d-%Y")+'.csv'

# Write DataFrame containing list of files to a csv file
df.to_csv(outputDir+outputFile, index=False)
logger.info('Finished processing source data for dataset '+inputDataset+'.')
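
With hypothetical first and last times, the outputFile name built above would look something like:

# Hypothetical example; the actual times come from the harvested data files.
# harvest_files_adcirc_stationdata_2022-01-01T00:00:00_2022-02-15T23:00:00_Feb-16-2022.csv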

@@ -129,6 +145,9 @@ def main(args):
parser.add_argument("--outputDir", action="store", dest="outputDir")
parser.add_argument("--inputDataset", action="store", dest="inputDataset")

# Parse input arguments
args = parser.parse_args()

# Run main
main(args)
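
A hypothetical command-line invocation, assuming an --inputDir flag is defined in the argparse lines elided above (the paths are placeholders):

python createHarvestFileMeta.py --inputDir /data/harvest/ --outputDir /data/ingest/ --inputDataset adcirc_stationdata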

8 changes: 5 additions & 3 deletions createIngestData.py
@@ -8,6 +8,8 @@
from psycopg2.extensions import AsIs
from loguru import logger

# This function takes a dataset name as input, and uses it to query the drf_harvest_data_file_meta table, creating a list
# of filenames. The list is converted to a DataFrame and returned.
def getInputFiles(inputDataset):
try:
# Create connection to database and get cursor
@@ -26,7 +28,7 @@ def getInputFiles(inputDataset):
ORDER BY data_date_time""",
{'source': inputDataset})

# convert query output to Pandas dataframe
# convert query output to Pandas DataFrame
df = pd.DataFrame(cur.fetchall(), columns=['dir_path','file_name'])

# Close cursor and database connection
@@ -81,7 +83,7 @@ def getObsSourceID(source_archive,station_tuples):
print(error)

# This function takes as input the data_source (hsofs...), and a list of station_id(s), and returns source_id(s) for
# model data from the gauge_source table in the apsviz_gauges database. This function specifically gets source_id(s) for
# model data from the drf_gauge_source table in the apsviz_gauges database. This function specifically gets source_id(s) for
# model data, such as from ADCIRC. The data_source, such as hsofs, is the grid that is used in the ADCIRC run.
def getModelSourceID(data_source,station_tuples):
try:
@@ -117,7 +119,7 @@ def getModelSourceID(data_source,station_tuples):
except (Exception, psycopg2.DatabaseError) as error:
print(error)
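
The body of getModelSourceID is likewise elided. A heavily hedged sketch of the lookup it appears to perform, with table and column names inferred from the comments (the tuple-membership pattern is an assumption):

# Sketch only: drf_gauge_source and its columns are inferred from the comments;
# station_tuples is assumed to be a tuple of station_id values.
cur.execute("""SELECT source_id, station_id
               FROM drf_gauge_source
               WHERE data_source = %(data_source)s
               AND station_id IN %(station_tuples)s""",
            {'data_source': data_source, 'station_tuples': station_tuples})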

# This function takes as input a directory input path, directory output path and a filename, and returns a csv file
# This function takes as input a directory input path, directory output path and a filename, and returns a csv
# file that contains gauge data. The function uses the getObsSourceID and getModelSourceID functions above to get
# a list of existing source ids that it includes in the gauge data to enable joining the gauge data table with the
# gauge source table. The function adds a timemark that it gets from the input file name. The timemark values can
15 changes: 10 additions & 5 deletions createIngestSourceMeta.py
@@ -7,8 +7,8 @@
from psycopg2.extensions import AsIs
from loguru import logger

# This function takes a list of station names as input, and uses them to query the apsviz_gauges database, and return a list
# of station id(s).
# This function takes a gauge location type (COASTAL, TIDAL or RIVERS), and uses it to query the drf_gauge_station table,
# and returns a list of station id(s) and station names.
def getStationID(locationType):
try:
# Create connection to database and get cursor
@@ -41,16 +41,16 @@ def getStationID(locationType):
print(error)

# This function takes as input a directory path and outputFile, and uses them to read the input file
# and add station_id(s) that are extracted from the apsviz_gauges database.
# and add station_id(s) that are extracted from the drf_gauge_station table in the apsviz_gauges database.
def addMeta(outputDir, outputFile):
# Extract list of stations from dataframe for query database using the getStationID function,
# Extract a list of stations by querying the database using the getStationID function
locationType = outputFile.split('_')[2]
df = getStationID(locationType)

# Get source name from outputFile
source = outputFile.split('_')[0]

# Check if source is ADCIRC
# Check if source is ADCIRC, contrails or noaa, and make appropriate additions to DataFrame
if source == 'adcirc':
# Get source_name and data_source from outputFile, and add them to the dataframe along
# with the source_archive value
@@ -95,6 +95,8 @@ def main(args):
outputFile = args.outputFile

logger.info('Start processing source data for file '+outputFile+'.')

# Run addMeta function
addMeta(outputDir, outputFile)
logger.info('Finished processing source data for file '+outputFile+'.')

@@ -107,6 +109,9 @@ def main(args):
parser.add_argument("--outputDir", action="store", dest="outputDir")
parser.add_argument("--outputFile", action="store", dest="outputFile")

# Parse input arguments
args = parser.parse_args()

# Run main
main(args)
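
addMeta depends on the positional, underscore-delimited naming convention referenced above; a small illustration with a hypothetical file name:

# Hypothetical file name illustrating the convention addMeta relies on.
outputFile = 'adcirc_stationdata_COASTAL_meta.csv'
source = outputFile.split('_')[0]        # 'adcirc'
locationType = outputFile.split('_')[2]  # 'COASTAL'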

68 changes: 53 additions & 15 deletions createIngestStationMeta.py
@@ -39,6 +39,8 @@ def getGeometry(lon, lat):
except (Exception, psycopg2.DatabaseError) as error:
print(error)
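
The body of getGeometry is elided in this view. A sketch of one plausible approach, assuming it asks PostGIS to build a point geometry from the lon and lat arguments (the exact SQL is a guess):

# Sketch only: assumes a PostGIS-backed database; the real implementation may differ.
cur.execute("""SELECT ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326)""",
            {'lon': lon, 'lat': lat})
geom = cur.fetchone()[0]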

# This function takes no input, and returns a DataFrame that contains a list of NOAA stations extracted from the noaa_stations
# table. The data in the noaa_stations table was obtained from NOAA's api.tidesandcurrents.noaa.gov API.
def getNOAAStations():
try:
# Create connection to database and get cursor
@@ -61,13 +63,16 @@ def getNOAAStations():
cur.close()
conn.close()

# return first row
# Return DataFrame
return(df)

# If exception print error
except (Exception, psycopg2.DatabaseError) as error:
print(error)

# This function takes a gauge location type (COASTAL, TIDAL or RIVERS) as input, and returns a DataFrame that contains a list of NCEM stations
# that are extracted from the dbo_gages_all table. The dbo_gages_all table contains data from an Excel file (dbo_GAGES_ALL.xlsx) that was
# obtained from Tom Langan at NCEM.
def getNCEMStations(locationType):
try:
# Create connection to database and get cursor
@@ -98,17 +103,17 @@ def getNCEMStations(locationType):
cur.close()
conn.close()

# return first row
# Return DataFrame
return(df)

# If exception print error
except (Exception, psycopg2.DatabaseError) as error:
print(error)


# This function queries the original noaa station table extracting station information, and
# returns a dataframe. It uses the information from the table along with Nominatim
# and ZipCodeDatabase to generate an address from latitude and longitude values.
# This function queries the original NOAA station table (noaa_stations), using the getNOAAStations function,
# extracting station information, and returns a dataframe. It uses the information from the table along with
# Nominatim and ZipCodeDatabase to generate an address from latitude and longitude values.
def addNOAAMeta(locationType):
# Create instance of Nominatim, and ZipCodeDatabase
geolocator = Nominatim(user_agent="geoapiExercises")
@@ -137,7 +142,8 @@ def addNOAAMeta(locationType):
country_code = address.get('country_code', '').strip()
country.append(country_code)

# Check if address is in the US
# Check if the address is in the US; if it is, get county and state information. If it is not, use a blank string for county and state
# information.
if country_code == 'us':
try:
# Extract zipcode address using the ZipCodeDatabase instance, by inputing the zipcode from
@@ -147,16 +153,26 @@ def addNOAAMeta(locationType):
state.append(zipcode.state.lower())
county.append(address.get('county', '').replace('County', '').strip())
except:
# If there is an exception get state information from the us module
# NEED TO TAKE A CLOSER LOOK AT THIS, AND SEE IF I CAN USE AN IF STATEMENT TO FIND THE PROBLEM, INSTEAD OF USING EXCEPTION
stateinfo = us.states.lookup(address.get('state', '').strip())
try:
# Append state name and county name to the state and county variables
state.append(stateinfo.abbr.lower())
county.append(address.get('county', '').replace('County', '').strip())
except:
# If there is an exception, check to see if the county is Lajas, and if not check to see if the city is Mayagüez,
# and if not define the state as a blank string
# NEED TO TAKE A CLOSER LOOK AT THIS, AND SEE IF I CAN USE AN IF STATEMENT TO FIND THE PROBLEM, INSTEAD OF USING EXCEPTION
countyname = address.get('county', '').replace('County', '').strip()

# If countyname is Lajas define state as pr
if countyname == 'Lajas':
state.append('pr')
county.append(countyname)
else:
# Else if the county is not Lajas, check to see if the city is Mayagüez, and if it is define state as pr, and append city to county.
# If the city is not Mayagüez, then append a blank string for state.
city = address.get('city', '').strip()
if city == 'Mayagüez':
state.append('pr')
@@ -174,9 +190,11 @@
state.append(address.get('state', '').strip())

county.append(address.get('county', '').replace('County', '').strip())


# Append geometry to geom variable
geom.append(getGeometry(lon, lat))


# Add meta to DataFrame
df['gauge_owner'] = 'NOAA/NOS'
df['location_type'] = locationType
df['tz'] = 'gmt'
@@ -185,35 +203,51 @@
df['county'] = county
df['geom'] = geom
df.columns= df.columns.str.lower()
#df = df.rename(columns={'station': 'station_name'})
df = df.rename(columns={'name': 'location_name'})

# Reorder columns in DataFrame
newColsOrder = ["station_name","lat","lon","tz","gauge_owner","location_name","location_type","country","state","county","geom"]
df=df.reindex(columns=newColsOrder)


# Return DataFrame
return(df)
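
The core reverse-geocoding pattern used by addNOAAMeta, pulled out as a standalone hedged sketch (the coordinates are illustrative, and the postcode guard is an addition for safety):

# Standalone sketch of the Nominatim + ZipCodeDatabase pattern used above.
from geopy.geocoders import Nominatim
from pyzipcode import ZipCodeDatabase

geolocator = Nominatim(user_agent="geoapiExercises")
zcdb = ZipCodeDatabase()

location = geolocator.reverse('35.2271, -80.8431')  # "lat, lon" for an illustrative point
address = location.raw['address']
country_code = address.get('country_code', '').strip()
postcode = address.get('postcode', '')[:5]

# Only US zipcodes resolve through ZipCodeDatabase; other addresses fall back to blanks above.
if country_code == 'us' and postcode.isdigit():
    state = zcdb[int(postcode)].state.lower()
    county = address.get('county', '').replace('County', '').strip()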

# This function queries the original NCEM station table (dbo_gages_all), using the getNCEMStations function,
# extracting station information, and returns a dataframe.
def addNCEMMeta(locationType):
# NEED TO GET THIS INFORMATION FROM CONTRAILS INSTEAD OF RELYING ON AN EXCEL FILE
# Run the getNCEMStations function, which outputs a DataFrame that contains a list of NCEM stations queried from the
# dbo_gages_all table, which contains the original NCEM station meta data.
df = getNCEMStations(locationType.lower())

# Rename columns
df = df.rename(columns={'latitude':'lat','longitude':'lon','site_id':'station_name',
'name':'location_name','owner': 'gauge_owner'})

# Convert all column names to lower case
df.columns= df.columns.str.lower()

# Add variables to DataFrame
df['tz'] = 'gmt'
df['location_type'] = locationType
df['country'] = 'us'
df['state'] = 'nc'

# Reorder column names and reset index values
newColsOrder = ["station_name","lat","lon","tz","gauge_owner","location_name","location_type","country","state","county"]
df=df.reindex(columns=newColsOrder)
df.reset_index(drop=True, inplace=True)


# Define geometry variable
geom = []


# Loop through the DataFrame of stations, and use the lon and lat values to get the geometry values, using the getGeometry function
for index, row in df.iterrows():
geom.append(getGeometry(row['lon'], row['lat']))


# Add geometry value to DataFrame
df['geom'] = geom


# Return DataFrame
return(df)

# Main program function takes args as input, which contains the outputDir, and outputFile values.
@@ -232,6 +266,7 @@ def main(args):
dataset = outputFile.split('_')[0]
locationType = outputFile.split('_')[2]

# Check if dataset is noaa or contrails
if dataset == 'noaa':
# If dataset is noaa run the addNOAAMeta function and write output to csv file
logger.info('Start processing NOAA stations.')
@@ -257,6 +292,9 @@
parser.add_argument("--outputDir", action="store", dest="outputDir")
parser.add_argument("--outputFile", action="store", dest="outputFile")

# Parse arguments
args = parser.parse_args()

# Run main
main(args)
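
A hypothetical invocation; the dataset (noaa or contrails) and location type are read from the output file name, which drives the branch above:

python createIngestStationMeta.py --outputDir /data/ingest/ --outputFile noaa_stationdata_TIDAL_meta.csv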

