createHarvestFileMeta.py
#!/usr/bin/env python
# coding: utf-8
# Import python modules
import argparse, glob, sys, os, datetime, psycopg2
import pandas as pd
from loguru import logger
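# Note: pandas, psycopg2 (commonly installed as psycopg2-binary), and loguru are third-party packages
# that must be installed separately; argparse, glob, sys, os, and datetime are in the standard library.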
# This function queries the drf_harvest_data_file_meta table using a file name, and pulls out the
# matching rows (dir_path, file_name, file_date_time, ingested, version, overlap_past_file_date_time)
# if the file_name exists in the table.
def getFileDateTime(inputFile):
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()
        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")
        # Run query
        cur.execute("""SELECT dir_path, file_name, file_date_time, ingested, version, overlap_past_file_date_time
                       FROM drf_harvest_data_file_meta
                       WHERE file_name = %(input_file)s
                       ORDER BY file_date_time""",
                    {'input_file': inputFile})
        # Convert query output to a Pandas DataFrame
        df = pd.DataFrame(cur.fetchall(), columns=['dir_path', 'file_name', 'file_date_time', 'ingested', 'version', 'overlap_past_file_date_time'])
        # Close cursor and database connection
        cur.close()
        conn.close()
        # Return DataFrame
        return df
    # If exception print error
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
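# A minimal usage sketch of getFileDateTime (the file name below is illustrative, not from a real harvest run):
#   df = getFileDateTime('noaa_stationdata_2022-01-01T00:00:00.csv')
#   if df is not None and not df.empty:
#       print(df[['file_name', 'file_date_time', 'version']])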
# This function takes an input directory path and an input dataset name, and uses them to create a file list
# that is ingested into the drf_harvest_data_file_meta table and later used to ingest the data files.
def createFileList(inputDir, inputDataset):
    # Search for files in the inputDir that have the inputDataset name in them, and generate a list of files found
    dirInputFiles = glob.glob(inputDir+inputDataset+"*.csv")
    # Define dirOutputFiles variable
    dirOutputFiles = []
    # Loop through the dirInputFiles list, find all files that do not have 'meta' in their name, and add them to dirOutputFiles
    for dirInputFile in dirInputFiles:
        if dirInputFile.find('meta') == -1:
            dirOutputFiles.append(dirInputFile)
        else:
            continue
    # Define outputList variable
    outputList = []
    # Loop through dirOutputFiles, generate new variables, and add them to outputList
    for dirOutputFile in dirOutputFiles:
        dir_path = dirOutputFile.split(inputDataset)[0]
        file_name = dirOutputFile.split('/')[-1]
        data_date_time = file_name.split('_')[-1].split('.')[0]
        checkFile = getFileDateTime(file_name)
        checked_file = checkFile.count()
        if checked_file['file_date_time'] > 0:
            # The file already has entries in the table, so increment the latest version number
            version = checkFile['version'].iloc[-1]+1
        elif checked_file['file_date_time'] == 0:
            # The file has no entries in the table, so this is version 1
            version = 1
        else:
            sys.exit('Something is wrong with the query!')
        df = pd.read_csv(dirOutputFile)
        data_begin_time = df['TIME'].min()
        data_end_time = df['TIME'].max()
        file_date_time = datetime.datetime.fromtimestamp(os.path.getctime(dirOutputFile))
        source = inputDataset.split('_')[0]
        if source == 'adcirc':
            content_info = file_name.split('_')[2]+'_'+file_name.split('_')[3]
        elif source == 'contrails':
            content_info = file_name.split('_')[2]
        elif source == 'noaa':
            content_info = 'None'
        ingested = 'False'
        overlap_past_file_date_time = 'False'
        outputList.append([dir_path,file_name,data_date_time,data_begin_time,data_end_time,file_date_time,source,content_info,ingested,version,overlap_past_file_date_time])
    # Convert outputList to a DataFrame
    df = pd.DataFrame(outputList, columns=['dir_path', 'file_name', 'data_date_time', 'data_begin_time', 'data_end_time', 'file_date_time', 'source', 'content_info', 'ingested', 'version', 'overlap_past_file_date_time'])
    # Get the first time and last time from the list of files. These are used in the output file name,
    # to enable checking for time overlap between files
    first_time = df['data_date_time'].iloc[0]
    last_time = df['data_date_time'].iloc[-1]
    # Return DataFrame, first time, and last time
    return (df, first_time, last_time)
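# A minimal usage sketch of createFileList (the directory path and dataset name are illustrative assumptions):
#   df, first_time, last_time = createFileList('/data/DataHarvesting/', 'noaa_stationdata')
# first_time and last_time are the data_date_time strings parsed from the first and last file names in df.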
# Main program function takes args as input, which contains the inputDir, outputDir, and inputDataset values.
@logger.catch
def main(args):
    # Add logger
    logger.remove()
    log_path = os.getenv('LOG_PATH', os.path.join(os.path.dirname(__file__), 'logs'))
    logger.add(log_path+'/createHarvestFileMeta.log', level='DEBUG')
    # Extract args variables
    inputDir = args.inputDir
    outputDir = args.outputDir
    inputDataset = args.inputDataset
    logger.info('Start processing source data for dataset '+inputDataset+'.')
    # Get DataFrame file list, and time variables, by running the createFileList function
    df, first_time, last_time = createFileList(inputDir, inputDataset)
    # Get current date
    current_date = datetime.date.today()
    # Create output file name
    outputFile = 'harvest_files_'+inputDataset+'_'+first_time.strip()+'_'+last_time.strip()+'_'+current_date.strftime("%b-%d-%Y")+'.csv'
    # Write DataFrame containing list of files to a csv file
    df.to_csv(outputDir+outputFile, index=False)
    logger.info('Finished processing source data for dataset '+inputDataset+'.')
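# For illustration only, the output file name built above might look like the following
# (dataset name and timestamp format are assumed, since data_date_time is parsed from the input file names):
#   harvest_files_noaa_stationdata_2022-01-01T00:00:00_2022-01-31T23:54:00_Feb-01-2022.csv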
# Run main function, which takes inputDir, outputDir, and inputDataset as input.
if __name__ == "__main__":
    """ This is executed when run from the command line """
    parser = argparse.ArgumentParser()
    # Optional arguments which require a parameter (e.g. -d test)
    parser.add_argument("--inputDir", action="store", dest="inputDir")
    parser.add_argument("--outputDir", action="store", dest="outputDir")
    parser.add_argument("--inputDataset", action="store", dest="inputDataset")
    # Parse input arguments
    args = parser.parse_args()
    # Run main
    main(args)
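# Example invocation (the directory paths and dataset name are illustrative, not part of the script):
#   python createHarvestFileMeta.py --inputDir /data/harvest/ --outputDir /data/meta/ --inputDataset noaa_stationdata
# Note that inputDir and outputDir should end with a trailing slash, since the script concatenates
# them directly with dataset and file names.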