From cfa3c74af84867e681f67750657c39d063173d97 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Fri, 14 Oct 2016 15:56:44 -0600 Subject: [PATCH 1/2] Add chunking into timeseries --- ChangeLog | 15 + Config/config_timeseries.xml | 204 ++++++++------ Config/config_timeseries.xsd | 6 +- cesm_utils/cesm_utils/create_postprocess | 2 +- timeseries/setup.py | 3 +- .../timeseries/cesm_tseries_generator.py | 205 +++++++++----- timeseries/timeseries/chunking.py | 261 ++++++++++++++++++ 7 files changed, 536 insertions(+), 160 deletions(-) create mode 100755 timeseries/timeseries/chunking.py diff --git a/ChangeLog b/ChangeLog index 6291c063..2c47afc3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -2,6 +2,21 @@ This file describes what tags were created and why ================================================================================ +Originator: mickelso +Date: 14 October 2016 +Model: CESM_postprocessing +Version: CESM_postprocessing_v0.1.4 +One-line: add chunking for timeseries + + modified: Config/config_timeseries.xml + modified: Config/config_timeseries.xsd + modified: cesm_utils/cesm_utils/create_postprocess + modified: timeseries/setup.py + modified: timeseries/timeseries/cesm_tseries_generator.py + add: timeseries/timeseries/chunking.py + +================================================================================ + Originator: aliceb Date: 12 October 2016 Model: CESM_postprocessing diff --git a/Config/config_timeseries.xml b/Config/config_timeseries.xml index 79539158..dd60636c 100644 --- a/Config/config_timeseries.xml +++ b/Config/config_timeseries.xml @@ -11,77 +11,86 @@ atm True - + hist TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly6 hourly6 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly3 hourly3 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly1 hourly1 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/min30 min30 - 1 + years + 1 - + hist FALSE netcdf4c undefined undefined - 1 + years + 10 - + hist FALSE netcdf4c undefined undefined - 1 + years + 10 - + hist FALSE netcdf4c undefined undefined - 1 + years + 10 @@ -103,57 +112,63 @@ - + lnd True - + hist TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly6 hourly6 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly3 hourly3 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly1 hourly1 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/min30 min30 - 1 + years + 1 @@ -173,37 +188,41 @@ rof True - + hist TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly6 hourly6 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly3 hourly3 - 1 + years + 1 @@ -223,37 +242,41 @@ rof True - + hist TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly6 hourly6 - 1 + years + 1 - + hist - FALSE + TRUE netcdf4c proc/tseries/hourly3 hourly3 - 1 + years + 1 @@ -273,13 +296,14 @@ ice True - + hist TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 @@ -292,53 +316,59 @@ ocn True - + hist TRUE netcdf4c 
proc/tseries/monthly monthly - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 - + hist - FALSE + TRUE netcdf4c proc/tseries/yearly yearly - 10 + years + 10 - + hist FALSE netcdf4c undefined undefined - 1 + years + 10 - + hist TRUE netcdf4c proc/tseries/annual annual - 10 + years + 10 - + hist - FALSE + TRUE netcdf4c proc/tseries/daily daily - 1 + years + 5 @@ -351,13 +381,14 @@ glc True - + hist - FALSE + TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 @@ -366,13 +397,14 @@ wav True - + hist - FALSE + TRUE netcdf4c proc/tseries/monthly monthly - 10 + years + 10 diff --git a/Config/config_timeseries.xsd b/Config/config_timeseries.xsd index 89ea20ce..2649c79b 100644 --- a/Config/config_timeseries.xsd +++ b/Config/config_timeseries.xsd @@ -14,7 +14,8 @@ - + + @@ -26,7 +27,8 @@ - + + diff --git a/cesm_utils/cesm_utils/create_postprocess b/cesm_utils/cesm_utils/create_postprocess index 786bd9bd..979f78ce 100755 --- a/cesm_utils/cesm_utils/create_postprocess +++ b/cesm_utils/cesm_utils/create_postprocess @@ -569,7 +569,7 @@ def main(options): run_tmpl = 'postprocess.tmpl' # generate the timeseries batch submit script from template files - postProcessCmd = 'cesm_tseries_generator.py' + postProcessCmd = 'cesm_tseries_generator.py --completechunk 1 ' processName = 'timeseries' outFile = '{0}/{1}'.format(envDict['PP_CASE_PATH'],processName) diff --git a/timeseries/setup.py b/timeseries/setup.py index 67518c35..82e3edfa 100644 --- a/timeseries/setup.py +++ b/timeseries/setup.py @@ -50,7 +50,8 @@ def get_dependencies(): author_email="aliceb@ucar.edu", packages=['timeseries'], version=get_version(), - scripts=['timeseries/cesm_tseries_generator.py'], + scripts=['timeseries/cesm_tseries_generator.py', + 'timeseries/chunking.py'], #install_requires=get_requires(), #dependency_links=get_dependencies(), include_package_data=True, diff --git a/timeseries/timeseries/cesm_tseries_generator.py b/timeseries/timeseries/cesm_tseries_generator.py index d7fb5254..0181d1bb 100755 --- a/timeseries/timeseries/cesm_tseries_generator.py +++ b/timeseries/timeseries/cesm_tseries_generator.py @@ -34,6 +34,7 @@ import xml.etree.ElementTree as ET from cesm_utils import cesmEnvLib +import chunking # import the MPI related module from asaptools import partition, simplecomm, vprinter @@ -59,6 +60,9 @@ def commandline_options(): parser.add_argument('--caseroot', nargs=1, required=True, help='fully quailfied path to case root directory') + parser.add_argument('--completechunk', nargs=1, type=int, required=False, + help='1: do not create incomplete chunks, 0: create an incomplete chunk') + parser.add_argument('--standalone', action='store_true', help='switch to indicate stand-alone post processing caseroot') @@ -74,7 +78,7 @@ def commandline_options(): #============================================================================================== # readArchiveXML - read the $CASEROOT/env_timeseries.xml file and build the pyReshaper classes #============================================================================================== -def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug): +def readArchiveXML(caseroot, dout_s_root, casename, standalone, completechunk, debug): """ reads the $CASEROOT/env_timeseries.xml file and builds a fully defined list of reshaper specifications to be passed to the pyReshaper tool. 
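create_postprocess now launches the generator with --completechunk 1, and the option is declared with nargs=1, so argparse hands the value back as a one-element list; that is why main() later reads options.completechunk[0]. A minimal stand-alone sketch of that pattern (the default value and the inline argument list are illustrative, not part of the patch):

    # sketch: how a one-value integer flag declared with nargs=1 is consumed
    import argparse

    parser = argparse.ArgumentParser(description='illustrative sketch only')
    parser.add_argument('--completechunk', nargs=1, type=int, required=False, default=[1],
                        help='1: do not create incomplete chunks, 0: create an incomplete chunk')
    options = parser.parse_args(['--completechunk', '1'])
    # nargs=1 always yields a list, hence the [0] indexing used in main()
    completechunk = options.completechunk[0]
    print(completechunk)    # -> 1
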
@@ -83,6 +87,7 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug): dout_s_root (string) - short term archive root path casename (string) - casename standalone (boolean) - logical to indicate if postprocessing case is stand-alone or not + completechunk (boolean) - end on a ragid boundary if True. Otherwise, do not create incomplete chunks if False """ specifiers = list() xml_tree = ET.ElementTree() @@ -90,6 +95,9 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug): # get path to env_timeseries.xml file env_timeseries = '{0}/env_timeseries.xml'.format(caseroot) + # read tseries log file to see if we've already started converting files, if so, where did we leave off + log = chunking.read_log('ts_status.log') + # check if the env_timeseries.xml file exists if ( not os.path.isfile(env_timeseries) ): err_msg = "cesm_tseries_generator.py ERROR: {0} does not exist.".format(env_timeseries) @@ -159,57 +167,40 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug): # get a list of all the input files for this stream from the archive location history_files = list() in_file_path = '/'.join( [dout_s_root,rootdir,subdir] ) - all_in_files = os.listdir(in_file_path) - - # check that there are actually a list of history files to work with - for in_file in all_in_files: - if re.search(file_extension, in_file): - # check to make sure this file ends in .nc and not something else - if in_file.endswith('.nc'): - history_files.append(in_file_path+"/"+in_file) - else: - print('cesm_tseries_generator.py WARNING - unable to operate on file {0}/{1}'.format(in_file_path,in_file)) - - # sort the list of input history files in order to get the output suffix - # from the first and last file - if len(history_files) > 0: - history_files.sort() - - start_file = history_files[0] - start_file_parts = list() - start_file_parts = start_file.split( "." ) - start_file_time = start_file_parts[-2] - - last_file = history_files[-1] - last_file_parts = list() - last_file_parts = last_file.split( "." ) - last_file_time = last_file_parts[-2] - - # get the actual component name from the history file - # will also need to deal with the instance numbers based on the comp_name - comp_name = last_file_parts[-4] - stream = last_file_parts[-3] - - # check for pop.h nday1 and nyear1 history streams - if last_file_parts[-3] in ["nday1","nyear1"]: - comp_name = last_file_parts[-5] - stream = last_file_parts[-4]+"."+last_file_parts[-3] + + if file_spec.find("tseries_filecat_tper") is not None: + tper = file_spec.find("tseries_filecat_tper").text + if file_spec.find("tseries_filecat_n") is not None: + size = file_spec.find("tseries_filecat_n").text + comp_name = comp + stream = file_extension.split('.[')[0] + + stream_dates,file_slices,cal,units = chunking.get_input_dates(in_file_path+'/*'+file_extension+'*.nc') + if comp+stream not in log.keys(): + log[comp+stream] = {'slices':[],'index':0} + ts_log_dates = log[comp+stream]['slices'] + index = log[comp+stream]['index'] + files,dates,index = chunking.get_chunks(tper, index, size, stream_dates, ts_log_dates, cal, units, completechunk) + for d in dates: + log[comp+stream]['slices'].append(float(d)) + log[comp+stream]['index']=index + for cn,cf in files.iteritems(): + + history_files = cf['fn'] + start_time_parts = cf['start'] + last_time_parts = cf['end'] # create the tseries output prefix needs to end with a "." - tseries_output_prefix = tseries_output_dir+"/"+casename+"."+comp_name+"."+stream+"." 
+ tseries_output_prefix = tseries_output_dir+"/"+casename+"."+comp_name+stream+"." # format the time series variable output suffix based on the # tseries_tper setting suffix needs to start with a "." if tseries_tper == "yearly": - tseries_output_suffix = "."+start_file_time+"-"+last_file_time+".nc" + tseries_output_suffix = "."+start_time_parts[0]+"-"+last_time_parts[0]+".nc" elif tseries_tper == "monthly": - start_time_parts = start_file_time.split( "-" ) - last_time_parts = last_file_time.split( "-" ) - tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+"-"+last_time_parts[0]+last_time_parts[1]+".nc" - elif tseries_tper in ["weekly","daily","hourly6","hourly3","hourly1","min30"]: - start_time_parts = start_file_time.split( "-" ) - last_time_parts = last_file_time.split( "-" ) tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+start_time_parts[2]+"-"+last_time_parts[0]+last_time_parts[1]+last_time_parts[2]+".nc" + elif tseries_tper in ["weekly","daily","hourly6","hourly3","hourly1","min30"]: + tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+start_time_parts[2]+start_time_parts[3]+"-"+last_time_parts[0]+last_time_parts[1]+last_time_parts[2]+last_time_parts[3]+".nc" # get a reshaper specification object spec = specification.create_specifier() @@ -230,9 +221,53 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug): # append this spec to the list of specifiers specifiers.append(spec) + return specifiers,log + - return specifiers +def divide_comm(scomm, l_spec): + + ''' + Divide the communicator into subcommunicators, leaving rank one to hand out reshaper jobs to run. + The part of the parallelization will be handled by this script, with the parallelization now over + CESM output streams and chunks. The reshaper will the compute the variables in parallel. 
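The splitting that this docstring describes reduces to integer arithmetic on the worker rank; an MPI-free sketch of the same color computation, mirroring the constant and the clamp used in divide_comm (the helper name and the example sizes are made up):

    # sketch: which subcommunicator group a worker rank would be assigned to
    def color_of(rank, size, l_spec, min_procs_per_spec=16):
        # rank is the global rank minus one, as in divide_comm()
        num_of_groups = 1 if l_spec == 1 else size // min_procs_per_spec
        temp_color = (rank // min_procs_per_spec) % l_spec
        if temp_color == num_of_groups:     # clamp into the last group
            temp_color -= 1
        return temp_color, num_of_groups

    # e.g. 64 MPI tasks (1 global master + 63 workers) and 3 specifiers to convert
    for r in range(63):
        c, n = color_of(r, 64, 3)
        print('worker rank %d -> group %d of %d' % (r, c, n))
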
+ Input: + scomm (simplecomm) - communicator to be divided (currently MIP_COMM_WORLD) + l_spec(int) - the number of reshaper specifiers(# of output stream and # of chunks) + + Output: + inter_comm(simplecomm) - this rank's subcommunicator it belongs to + num_of_groups(int) - the total number of subcommunicators + ''' + min_procs_per_spec = 16 + size = scomm.get_size() + rank = scomm.get_rank()-1 + + # the global master needs to be in its own subcommunicator + # ideally it would not be in any, but the divide function + # requires all ranks to participate in the call + if rank == -1: + group = ((size/min_procs_per_spec)%l_spec)+1 + if l_spec == 1: + num_of_groups = 1 + else: + num_of_groups = (size/min_procs_per_spec) + else: + temp_color = (rank // min_procs_per_spec) % l_spec + if l_spec == 1: + num_of_groups = 1 + else: + num_of_groups = (size/min_procs_per_spec) + if (temp_color == num_of_groups): + temp_color = temp_color - 1 + groups = [] + for g in range(0,num_of_groups+1): + groups.append(g) + group = groups[temp_color] + + inter_comm,multi_comm = scomm.divide(group) + + return inter_comm,num_of_groups #====== # main #====== @@ -258,23 +293,62 @@ def main(options, scomm, rank, size): # loading the specifiers from the env_timeseries.xml only needs to run on the master task (rank=0) if rank == 0: - specifiers = readArchiveXML(caseroot, cesmEnv['DOUT_S_ROOT'], cesmEnv['CASE'], options.standalone, debug) + dout_s_root = cesmEnv['DOUT_S_ROOT'] + case = cesmEnv['CASE'] + specifiers,log = readArchiveXML(caseroot, dout_s_root, case, options.standalone, options.completechunk[0], debug) scomm.sync() # specifiers is a list of pyreshaper specification objects ready to pass to the reshaper specifiers = scomm.partition(specifiers, func=partition.Duplicate(), involved=True) - # create the PyReshaper object - uncomment when multiple specifiers is allowed - reshpr = reshaper.create_reshaper(specifiers, serial=False, verbosity=debug) - - # Run the conversion (slice-to-series) process - reshpr.convert() + if len(specifiers) > 0: + # setup subcommunicators to do streams and chunks in parallel + # everyone participates except for root + inter_comm, lsubcomms = divide_comm(scomm, len(specifiers)) + color = inter_comm.get_color() + lsize = inter_comm.get_size() + lrank = inter_comm.get_rank() + + GWORK_TAG = 10 # global comm mpi tag + LWORK_TAG = 20 # local comm mpi tag + # global root - hands out specifiers to work on. When complete, it must tell each subcomm all work is done. 
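The exchange described here is a plain master/worker loop: the root rations out specifier indices and then one -99 sentinel per subcommunicator so every group knows to stop. A serial, MPI-free analogue of that control flow (a deque stands in for the ration() calls; the specifier labels are made up):

    # sketch: sentinel-terminated work distribution, serial stand-in for ration()
    from collections import deque

    specifiers = ['atm stream chunk 0', 'atm stream chunk 1', 'ocn stream chunk 0']
    lsubcomms = 2
    work = deque(list(range(len(specifiers))) + [-99] * lsubcomms)  # what rank 0 hands out

    def subcomm_root(work):
        i = -999
        while i != -99:
            i = work.popleft()                        # scomm.ration(tag=GWORK_TAG) in the real code
            if i != -99:
                print('converting ' + specifiers[i])  # reshpr.convert() in the real code

    for _ in range(lsubcomms):
        subcomm_root(work)
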
+ if (rank == 0): + for i in range(0,len(specifiers)): # hand out all specifiers + scomm.ration(data=i, tag=GWORK_TAG) + for i in range(0,lsubcomms): # complete, signal this to all subcomms + scomm.ration(data=-99, tag=GWORK_TAG) + + # subcomm root - performs the same tasks as other subcomm ranks, but also gets the specifier to work on and sends + # this information to all ranks within subcomm + elif (lrank == 0): + i = -999 + while i != -99: + i = scomm.ration(tag=GWORK_TAG) # recv from global + for x in range(1,lsize): + inter_comm.ration(i, LWORK_TAG) # send to local ranks + if i != -99: + # create the PyReshaper object - uncomment when multiple specifiers is allowed + reshpr = reshaper.create_reshaper(specifiers[i], serial=False, verbosity=debug, simplecomm=inter_comm) + # Run the conversion (slice-to-series) process + reshpr.convert() + # Print timing diagnostics + reshpr.print_diagnostics() + + # all subcomm ranks - recv the specifier to work on and call the reshaper + else: + i = -999 + while i != -99: + i = inter_comm.ration(tag=LWORK_TAG) # recv from local root + if i != -99: + # create the PyReshaper object - uncomment when multiple specifiers is allowed + reshpr = reshaper.create_reshaper(specifiers[i], serial=False, verbosity=debug, simplecomm=inter_comm) + # Run the conversion (slice-to-series) process + reshpr.convert() - # Print timing diagnostics - reshpr.print_diagnostics() - - return 0 + if rank == 0: + chunking.write_log('ts_status.log', log) # Update system log with the dates that were just converted + scomm.sync() #=================================== if __name__ == "__main__": @@ -290,18 +364,9 @@ def main(options, scomm, rank, size): if rank == 0: print('cesm_tseries_generator INFO Running on {0} cores'.format(size)) - try: - status = main(options, scomm, rank, size) - if rank == 0: - print('************************************************************') - print('Successfully completed generating variable time-series files') - print('************************************************************') - sys.exit(status) - -## except RunTimeError as error: - - except Exception as error: - print(str(error)) - if options.backtrace: - traceback.print_exc() - sys.exit(1) + main(options, scomm, rank, size) + if rank == 0: + print('************************************************************') + print('Successfully completed generating variable time-series files') + print('************************************************************') + diff --git a/timeseries/timeseries/chunking.py b/timeseries/timeseries/chunking.py new file mode 100755 index 00000000..6528b812 --- /dev/null +++ b/timeseries/timeseries/chunking.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python2 + +import glob, json, os +import netCDF4 as nc +import cf_units +import datetime + +def get_input_dates(glob_str): + + ''' + Open up all of the files that match the search string and get + the dates within the files. Also get the number of slices within + each file, what calendar it uses and the time unit. + + Input: + glob_str(string) - the search path to get files + + Output: + stream_dates(dictionary) - keys->date, values->the file where this slice is located + file_slices(dictionary) - keys->filename, values->the number of slices found in the file + calendar(string) - the name of the calendar type (ie, noleap, ...) 
+ units(string) - the calendar unit (possibly in the form 'days since....') + ''' + stream_files = glob.glob(glob_str) + + stream_dates = {} + file_slices = {} + att = {} + print glob_str + + if len(stream_files) < 1: + return stream_dates, file_slices, None, None + + for fn in sorted(stream_files): + print 'opening ',fn + # open file and get time dimension + f = nc.Dataset(fn,"r") + all_t = f.variables['time'] + + # add the file name are how many slices it contains + file_slices[fn] = len(all_t) + + # add all dates and which file they are located in + for t in all_t: + stream_dates[t] = fn + + # get all attributes of time in order to get cal and units + for a in all_t.ncattrs(): + att[a] = all_t.__getattribute__(a) + + return stream_dates,file_slices,att['calendar'],att['units'] + +def get_cesm_date(fn,t=None): + + ''' + Open a netcdf file and return its datestamp + + Input: + fn(string) - the filename to get date from + t(string) - string indicating if the file is a beiginning or end (optional) + + Output: + an array that includes year,month,day,hour is string format with correct number of digits + ''' + + f = nc.Dataset(fn,"r") + all_t = f.variables['time'] + + att={} + for a in all_t.ncattrs(): + att[a] = all_t.__getattribute__(a) + + if ('bounds' in att.keys()): + if t == 'b': + d = f.variables[att['bounds']][0][0] + elif t == 'e': + l = len(f.variables[att['bounds']]) + d = (f.variables[att['bounds']][l-1][1])-1 + elif t == 'ee': + l = len(f.variables[att['bounds']]) + d = (f.variables[att['bounds']][l-1][1]) + else: + d = f.variables['time'][1] + + d1 = cf_units.num2date(d,att['units'],att['calendar']) + + return [str(d1.year).zfill(4),str(d1.month).zfill(2),str(d1.day).zfill(2),str(d1.hour).zfill(2)] + + +def get_chunk_range(tper, size, start, cal, units): + + ''' + Figures out the end date of the chunk based on the start date + + Input: + tper(string) - the time period to use when figuring out chunk size (year, month, day, hour) + size(int) - the size of the chunk used in coordination with tper + start(float) - the date stamp to start count from + cal(string) - the calendar to use to figure out chunk size + units(string) - the units to use to figure out chunk size + + Output: + start(float) - the start date of the chunk + end(float) - the end date of the chunk + ''' + + # Get the first date + d1 = cf_units.num2date(start, units, cal) + + # Figure out how many days each chunk should be + if 'day' in tper: #day + end = float(start) + int(size) + + elif 'hour' in tper: #hour + end = (float(size)/24.0) + float(start) + + elif 'month' in tper: #month + m2 = (int(d1.month)+(int(size)%12)) + y2 = (int(d1.year)+(int(size)/12)) + if m2 > 12: + y2 = y2 + 1 + m2 = m2 - 12 + d2 = datetime.datetime(y2, m2, d1.day, d1.hour, d1.minute) + end = cf_units.date2num(d2, units, cal) + + elif 'year' in tper: #year + d2 = datetime.datetime(int(size)+d1.year, d1.month, d1.day, d1.hour, d1.minute) + end = cf_units.date2num(d2, units, cal) + + return start, end + +def get_chunks(tper, index, size, stream_dates, ts_log_dates, cal, units, s): + + ''' + Figure out what chunks there are to do for a particular CESM output stream + + Input: + tper(string) - the time period to use when figuring out chunk size (year, month, day, hour) + index(int) - an integer indicating which index in the tper and size list to start from. + this option gives users to specify different chunk sizes. 
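The month branch of get_chunk_range above converts the stamp to a date, rolls the month over by hand, and converts back. A small round-trip sketch of that arithmetic (the units, calendar and stamp are made-up values; cf_units.num2date and date2num are the same calls the patch uses):

    # sketch: advance a 'days since' stamp by N months, as get_chunk_range does
    import datetime
    import cf_units

    units, cal = 'days since 0001-01-01 00:00:00', 'noleap'
    start = 10950.0                      # made-up model time stamp
    size = 14                            # chunk size, in months here
    d1 = cf_units.num2date(start, units, cal)

    m2 = d1.month + (size % 12)
    y2 = d1.year + (size // 12)
    if m2 > 12:                          # roll the extra months into the next year
        y2, m2 = y2 + 1, m2 - 12
    d2 = datetime.datetime(y2, m2, d1.day, d1.hour, d1.minute)
    end = cf_units.date2num(d2, units, cal)
    print('%s -> %s' % (start, end))
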
+ size(int) - the size of the chunk used in coordination with tper + stream_dates(dictionary) - keys->date, values->the file where this slice is located + ts_log_dates(list) - a list of all of the dates that have been converted already - used to + avoid duplication + cal(string) - the calendar to use to figure out chunk size + units(string) - the units to use to figure out chunk size + s(string) - flag to determine if we need to wait until we have all data before we create a chunk or + if it's okay to do an incomplete chunk + + Output: + files(dictionary) - keys->chunk, values->a list of all files needed for this chunk and the start and end dates + dates(list) - all of the dates that will be in this chunk + index(int) - the last index to be used in the tper and size list + ''' + + # remove the times in ts_log_dates from stream_dates because + # these have already been created + du = cf_units.Unit(units) + to_do = [] + for d in sorted(stream_dates.keys()): + if d not in ts_log_dates: + to_do.append(d) + + files = {} + dates = [] + i = 0 + e = False + chunk_n = 0 + + tper_list = tper.split(",") + size_list = size.split(",") + if len(tper_list) != len(size_list): + print 'Error: The length of requested time periods for chunks does not match the length of requested chunk sizes', tper_list, size_list + + if len(to_do)>1: + while e is False: + # get the new range + start,end = get_chunk_range(tper_list[index], size_list[index], to_do[i], cal, units) + if index != len(tper_list)-1: + index = index + 1 + #walk through and map dates within this range to files + cfiles = [] + cdates = [] + while to_do[i] < end and e is False: + fn = stream_dates[to_do[i]] + if fn not in cfiles: + cfiles.append(fn) + cdates.append(to_do[i]) + print 'adding ',fn,to_do[i] + i = i + 1 + # am I passed the dates I have? If so, exit loop and don't add to list. + # these will be converted when more data exists + if i >= len(to_do)-1: + fn = stream_dates[to_do[i]] + if fn not in cfiles: + cfiles.append(fn) + cdates.append(to_do[i]) + print 'adding ',fn,to_do[i] + if s==1: + print '#################################' + print 'Not appending: ' + print cfiles + print 'dates:(',cdates,')' + print '#################################' + else: # user indicated that they would like to end with an incomplete chunk + files[chunk_n] = {} + files[chunk_n]['fn'] = sorted(cfiles) + files[chunk_n]['start'] = get_cesm_date(cfiles[0],t='b') + files[chunk_n]['end'] = get_cesm_date(cfiles[-1],t='e') + for cd in sorted(cdates): + dates.append(cd) + e = True + # Have a complete set. Append file and date info. + if e is False: + files[chunk_n] = {} + s_cdates = sorted(cdates) + files[chunk_n]['fn'] = sorted(cfiles) + files[chunk_n]['start'] = get_cesm_date(cfiles[0],t='b') + files[chunk_n]['end'] = get_cesm_date(cfiles[-1],t='e') + for cd in sorted(cdates): + dates.append(cd) + chunk_n = chunk_n+1 + + return files, dates, index + + +def write_log(log_fn, log): + + ''' + Write (or append) a json file with the date stamps that have been coverted + and what tper and size list index to next start on. 
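Stripped of the calendar handling and the per-index tper/size lists, get_chunks above is bookkeeping: drop the slices already recorded in the log, walk the rest in order, and only emit a ragged final chunk when the caller allows it. A simplified, calendar-free analogue with a fixed chunk size counted in slices (all names and values are illustrative):

    # sketch: group not-yet-converted time slices into fixed-size chunks
    def simple_chunks(stream_dates, ts_log_dates, size, complete_only=True):
        to_do = [d for d in sorted(stream_dates) if d not in ts_log_dates]
        chunks, current = [], []
        for d in to_do:
            current.append(d)
            if len(current) == size:
                chunks.append(current)
                current = []
        if current and not complete_only:   # ragged final chunk only if the user allowed it
            chunks.append(current)
        return chunks

    stream_dates = {t: 'hist.%04d.nc' % t for t in range(10)}   # date -> file, as get_input_dates builds
    print(simple_chunks(stream_dates, ts_log_dates=[0, 1, 2], size=3))
    # -> [[3, 4, 5], [6, 7, 8]]   (slice 9 is held back until a full chunk exists)
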
+
+    Input:
+       log_fn(string) - the name of the log file to write to
+       log(dictionary) - keys->file streams, values->the dates that have been
+                         converted and next indexes to use for size and tper lists
+    '''
+    with open(log_fn, 'w') as f:
+        json.dump(log, f)
+
+
+def read_log(log_fn):
+
+    '''
+    Read in the json log file in order to know which files have already been converted
+
+    Input:
+       log_fn(string) - the name of the log file to read from
+
+    Output:
+       d(dictionary) - will be empty if the file doesn't exist, or will contain a dictionary
+                       with keys->file streams, values->the dates that have been
+                       converted and next indexes to use for size and tper lists
+    '''
+    if os.path.isfile(log_fn):
+        with open(log_fn, 'r') as f:
+            d = json.load(f)
+        return d
+    else:
+        return {}
+

From cfcda4ae3ed2cc8fbd8d2d3f1ecbfb797a3876e9 Mon Sep 17 00:00:00 2001
From: sherimickelson
Date: Mon, 17 Oct 2016 09:23:56 -0600
Subject: [PATCH 2/2] Bug Fix: removed call to print reshaper timing stats

This was only being called by the root of the subcommunicator.  This was
causing the code to hang because there are collective calls within this
portion of the pyreshaper.
---
 timeseries/timeseries/cesm_tseries_generator.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/timeseries/timeseries/cesm_tseries_generator.py b/timeseries/timeseries/cesm_tseries_generator.py
index 0181d1bb..ae572496 100755
--- a/timeseries/timeseries/cesm_tseries_generator.py
+++ b/timeseries/timeseries/cesm_tseries_generator.py
@@ -331,8 +331,6 @@ def main(options, scomm, rank, size):
                     reshpr = reshaper.create_reshaper(specifiers[i], serial=False, verbosity=debug, simplecomm=inter_comm)
                     # Run the conversion (slice-to-series) process
                     reshpr.convert()
-                    # Print timing diagnostics
-                    reshpr.print_diagnostics()
 
         # all subcomm ranks - recv the specifier to work on and call the reshaper
         else:
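The ts_status.log file written by these helpers is plain JSON, so the resume state can be inspected or seeded by hand. A short usage sketch of the two helpers, assuming chunking.py is importable; the key mirrors the comp+stream concatenation built in readArchiveXML (the exact stream name is illustrative):

    # sketch: round-trip of the conversion log used to resume chunking
    import chunking

    log = chunking.read_log('ts_status.log')        # returns {} on the first run
    key = 'atm' + 'cam.h0'                          # comp + stream, as keyed in readArchiveXML
    log.setdefault(key, {'slices': [], 'index': 0})
    log[key]['slices'].extend([31.0, 59.0, 90.0])   # time stamps already converted
    log[key]['index'] = 0                           # next entry of the tper/size lists to use
    chunking.write_log('ts_status.log', log)
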