diff --git a/ChangeLog b/ChangeLog
index 6291c063..2c47afc3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,21 @@
This file describes what tags were created and why
================================================================================
+Originator: mickelso
+Date: 14 October 2016
+Model: CESM_postprocessing
+Version: CESM_postprocessing_v0.1.4
+One-line: add chunking for timeseries
+
+ modified: Config/config_timeseries.xml
+ modified: Config/config_timeseries.xsd
+ modified: cesm_utils/cesm_utils/create_postprocess
+ modified: timeseries/setup.py
+ modified: timeseries/timeseries/cesm_tseries_generator.py
+ add: timeseries/timeseries/chunking.py
+
+================================================================================
+
Originator: aliceb
Date: 12 October 2016
Model: CESM_postprocessing
diff --git a/Config/config_timeseries.xml b/Config/config_timeseries.xml
index 79539158..dd60636c 100644
--- a/Config/config_timeseries.xml
+++ b/Config/config_timeseries.xml
@@ -11,77 +11,86 @@
atm
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly6
hourly6
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly3
hourly3
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly1
hourly1
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/min30
min30
- 1
+ years
+ 1
-
+
hist
FALSE
netcdf4c
undefined
undefined
- 1
+ years
+ 10
-
+
hist
FALSE
netcdf4c
undefined
undefined
- 1
+ years
+ 10
-
+
hist
FALSE
netcdf4c
undefined
undefined
- 1
+ years
+ 10
@@ -103,57 +112,63 @@
-
+
lnd
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly6
hourly6
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly3
hourly3
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly1
hourly1
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/min30
min30
- 1
+ years
+ 1
@@ -173,37 +188,41 @@
rof
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly6
hourly6
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly3
hourly3
- 1
+ years
+ 1
@@ -223,37 +242,41 @@
rof
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly6
hourly6
- 1
+ years
+ 1
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/hourly3
hourly3
- 1
+ years
+ 1
@@ -273,13 +296,14 @@
ice
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
@@ -292,53 +316,59 @@
ocn
True
-
+
hist
TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/yearly
yearly
- 10
+ years
+ 10
-
+
hist
FALSE
netcdf4c
undefined
undefined
- 1
+ years
+ 10
-
+
hist
TRUE
netcdf4c
proc/tseries/annual
annual
- 10
+ years
+ 10
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/daily
daily
- 1
+ years
+ 5
@@ -351,13 +381,14 @@
glc
True
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
@@ -366,13 +397,14 @@
wav
True
-
+
hist
- FALSE
+ TRUE
netcdf4c
proc/tseries/monthly
monthly
- 10
+ years
+ 10
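
For reference, the chunking controls added above replace the single per-stream concatenation count with a pair of settings: a time period (tseries_filecat_tper, e.g. "years") and a count (tseries_filecat_n, e.g. 10); both may be given as comma-separated lists that are walked as conversion progresses. A minimal sketch of how the generator looks these up with ElementTree, assuming an illustrative stream entry (only the two element names come from this patch; the wrapper element and values are made up):

    import xml.etree.ElementTree as ET

    # illustrative stream entry; the real entries live in env_timeseries.xml
    snippet = """<file_spec>
      <tseries_tper>monthly</tseries_tper>
      <tseries_filecat_tper>years</tseries_filecat_tper>
      <tseries_filecat_n>10</tseries_filecat_n>
    </file_spec>"""

    file_spec = ET.fromstring(snippet)
    if file_spec.find("tseries_filecat_tper") is not None:
        tper = file_spec.find("tseries_filecat_tper").text   # e.g. "years"
    if file_spec.find("tseries_filecat_n") is not None:
        size = file_spec.find("tseries_filecat_n").text      # e.g. "10"
    print('chunk size: {0} {1}'.format(size, tper))          # chunk size: 10 years
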
diff --git a/Config/config_timeseries.xsd b/Config/config_timeseries.xsd
index 89ea20ce..2649c79b 100644
--- a/Config/config_timeseries.xsd
+++ b/Config/config_timeseries.xsd
@@ -14,7 +14,8 @@
-
+
+
@@ -26,7 +27,8 @@
-
+
+
diff --git a/cesm_utils/cesm_utils/create_postprocess b/cesm_utils/cesm_utils/create_postprocess
index 786bd9bd..979f78ce 100755
--- a/cesm_utils/cesm_utils/create_postprocess
+++ b/cesm_utils/cesm_utils/create_postprocess
@@ -569,7 +569,7 @@ def main(options):
run_tmpl = 'postprocess.tmpl'
# generate the timeseries batch submit script from template files
- postProcessCmd = 'cesm_tseries_generator.py'
+ postProcessCmd = 'cesm_tseries_generator.py --completechunk 1 '
processName = 'timeseries'
outFile = '{0}/{1}'.format(envDict['PP_CASE_PATH'],processName)
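
The generated batch script now always requests complete chunks (--completechunk 1), so a partial chunk at the end of the available history files is deferred until more data exists; passing 0 instead lets the final ragged chunk be written. A toy sketch of that decision only (the helper below is hypothetical; the real logic lives in chunking.get_chunks):

    def write_final_partial_chunk(completechunk, n_slices, chunk_size):
        # hypothetical helper: would a trailing partial chunk be written?
        leftover = n_slices % chunk_size
        if leftover == 0:
            return False              # no leftover slices, only full chunks exist
        return completechunk == 0     # 0: write the ragged tail now, 1: defer it

    print(write_final_partial_chunk(1, 25, 10))   # False: 5 leftover slices wait for more data
    print(write_final_partial_chunk(0, 25, 10))   # True: a final 5-slice chunk is written
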
diff --git a/timeseries/setup.py b/timeseries/setup.py
index 67518c35..82e3edfa 100644
--- a/timeseries/setup.py
+++ b/timeseries/setup.py
@@ -50,7 +50,8 @@ def get_dependencies():
author_email="aliceb@ucar.edu",
packages=['timeseries'],
version=get_version(),
- scripts=['timeseries/cesm_tseries_generator.py'],
+ scripts=['timeseries/cesm_tseries_generator.py',
+ 'timeseries/chunking.py'],
#install_requires=get_requires(),
#dependency_links=get_dependencies(),
include_package_data=True,
diff --git a/timeseries/timeseries/cesm_tseries_generator.py b/timeseries/timeseries/cesm_tseries_generator.py
index d7fb5254..ae572496 100755
--- a/timeseries/timeseries/cesm_tseries_generator.py
+++ b/timeseries/timeseries/cesm_tseries_generator.py
@@ -34,6 +34,7 @@
import xml.etree.ElementTree as ET
from cesm_utils import cesmEnvLib
+import chunking
# import the MPI related module
from asaptools import partition, simplecomm, vprinter
@@ -59,6 +60,9 @@ def commandline_options():
parser.add_argument('--caseroot', nargs=1, required=True,
help='fully quailfied path to case root directory')
+ parser.add_argument('--completechunk', nargs=1, type=int, required=False,
+ help='1: do not create incomplete chunks, 0: create an incomplete chunk')
+
parser.add_argument('--standalone', action='store_true',
help='switch to indicate stand-alone post processing caseroot')
@@ -74,7 +78,7 @@ def commandline_options():
#==============================================================================================
# readArchiveXML - read the $CASEROOT/env_timeseries.xml file and build the pyReshaper classes
#==============================================================================================
-def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug):
+def readArchiveXML(caseroot, dout_s_root, casename, standalone, completechunk, debug):
""" reads the $CASEROOT/env_timeseries.xml file and builds a fully defined list of
reshaper specifications to be passed to the pyReshaper tool.
@@ -83,6 +87,7 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug):
dout_s_root (string) - short term archive root path
casename (string) - casename
standalone (boolean) - logical to indicate if postprocessing case is stand-alone or not
+ completechunk (boolean) - if True, only create complete chunks and defer any trailing partial chunk. If False, also create a final incomplete chunk
"""
specifiers = list()
xml_tree = ET.ElementTree()
@@ -90,6 +95,9 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug):
# get path to env_timeseries.xml file
env_timeseries = '{0}/env_timeseries.xml'.format(caseroot)
+ # read the tseries log file to see if we've already started converting files and, if so, where we left off
+ log = chunking.read_log('ts_status.log')
+
# check if the env_timeseries.xml file exists
if ( not os.path.isfile(env_timeseries) ):
err_msg = "cesm_tseries_generator.py ERROR: {0} does not exist.".format(env_timeseries)
@@ -159,57 +167,40 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug):
# get a list of all the input files for this stream from the archive location
history_files = list()
in_file_path = '/'.join( [dout_s_root,rootdir,subdir] )
- all_in_files = os.listdir(in_file_path)
-
- # check that there are actually a list of history files to work with
- for in_file in all_in_files:
- if re.search(file_extension, in_file):
- # check to make sure this file ends in .nc and not something else
- if in_file.endswith('.nc'):
- history_files.append(in_file_path+"/"+in_file)
- else:
- print('cesm_tseries_generator.py WARNING - unable to operate on file {0}/{1}'.format(in_file_path,in_file))
-
- # sort the list of input history files in order to get the output suffix
- # from the first and last file
- if len(history_files) > 0:
- history_files.sort()
-
- start_file = history_files[0]
- start_file_parts = list()
- start_file_parts = start_file.split( "." )
- start_file_time = start_file_parts[-2]
-
- last_file = history_files[-1]
- last_file_parts = list()
- last_file_parts = last_file.split( "." )
- last_file_time = last_file_parts[-2]
-
- # get the actual component name from the history file
- # will also need to deal with the instance numbers based on the comp_name
- comp_name = last_file_parts[-4]
- stream = last_file_parts[-3]
-
- # check for pop.h nday1 and nyear1 history streams
- if last_file_parts[-3] in ["nday1","nyear1"]:
- comp_name = last_file_parts[-5]
- stream = last_file_parts[-4]+"."+last_file_parts[-3]
+
+ if file_spec.find("tseries_filecat_tper") is not None:
+ tper = file_spec.find("tseries_filecat_tper").text
+ if file_spec.find("tseries_filecat_n") is not None:
+ size = file_spec.find("tseries_filecat_n").text
+ comp_name = comp
+ stream = file_extension.split('.[')[0]
+
+ stream_dates,file_slices,cal,units = chunking.get_input_dates(in_file_path+'/*'+file_extension+'*.nc')
+ if comp+stream not in log.keys():
+ log[comp+stream] = {'slices':[],'index':0}
+ ts_log_dates = log[comp+stream]['slices']
+ index = log[comp+stream]['index']
+ files,dates,index = chunking.get_chunks(tper, index, size, stream_dates, ts_log_dates, cal, units, completechunk)
+ for d in dates:
+ log[comp+stream]['slices'].append(float(d))
+ log[comp+stream]['index']=index
+ for cn,cf in files.iteritems():
+
+ history_files = cf['fn']
+ start_time_parts = cf['start']
+ last_time_parts = cf['end']
# create the tseries output prefix needs to end with a "."
- tseries_output_prefix = tseries_output_dir+"/"+casename+"."+comp_name+"."+stream+"."
+ tseries_output_prefix = tseries_output_dir+"/"+casename+"."+comp_name+stream+"."
# format the time series variable output suffix based on the
# tseries_tper setting suffix needs to start with a "."
if tseries_tper == "yearly":
- tseries_output_suffix = "."+start_file_time+"-"+last_file_time+".nc"
+ tseries_output_suffix = "."+start_time_parts[0]+"-"+last_time_parts[0]+".nc"
elif tseries_tper == "monthly":
- start_time_parts = start_file_time.split( "-" )
- last_time_parts = last_file_time.split( "-" )
- tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+"-"+last_time_parts[0]+last_time_parts[1]+".nc"
- elif tseries_tper in ["weekly","daily","hourly6","hourly3","hourly1","min30"]:
- start_time_parts = start_file_time.split( "-" )
- last_time_parts = last_file_time.split( "-" )
tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+start_time_parts[2]+"-"+last_time_parts[0]+last_time_parts[1]+last_time_parts[2]+".nc"
+ elif tseries_tper in ["weekly","daily","hourly6","hourly3","hourly1","min30"]:
+ tseries_output_suffix = "."+start_time_parts[0]+start_time_parts[1]+start_time_parts[2]+start_time_parts[3]+"-"+last_time_parts[0]+last_time_parts[1]+last_time_parts[2]+last_time_parts[3]+".nc"
# get a reshaper specification object
spec = specification.create_specifier()
@@ -230,9 +221,53 @@ def readArchiveXML(caseroot, dout_s_root, casename, standalone, debug):
# append this spec to the list of specifiers
specifiers.append(spec)
+ return specifiers,log
+
- return specifiers
+def divide_comm(scomm, l_spec):
+
+ '''
+ Divide the communicator into subcommunicators, leaving rank zero to hand out reshaper jobs to run.
+ This part of the parallelization is handled by this script, with the parallelization now over
+ CESM output streams and chunks. The reshaper will then compute the variables in parallel.
+ Input:
+ scomm (simplecomm) - communicator to be divided (currently MPI_COMM_WORLD)
+ l_spec(int) - the number of reshaper specifiers (# of output streams and # of chunks)
+
+ Output:
+ inter_comm(simplecomm) - the subcommunicator this rank belongs to
+ num_of_groups(int) - the total number of subcommunicators
+ '''
+ min_procs_per_spec = 16
+ size = scomm.get_size()
+ rank = scomm.get_rank()-1
+
+ # the global master needs to be in its own subcommunicator
+ # ideally it would not be in any, but the divide function
+ # requires all ranks to participate in the call
+ if rank == -1:
+ group = ((size/min_procs_per_spec)%l_spec)+1
+ if l_spec == 1:
+ num_of_groups = 1
+ else:
+ num_of_groups = (size/min_procs_per_spec)
+ else:
+ temp_color = (rank // min_procs_per_spec) % l_spec
+ if l_spec == 1:
+ num_of_groups = 1
+ else:
+ num_of_groups = (size/min_procs_per_spec)
+ if (temp_color == num_of_groups):
+ temp_color = temp_color - 1
+ groups = []
+ for g in range(0,num_of_groups+1):
+ groups.append(g)
+ group = groups[temp_color]
+
+ inter_comm,multi_comm = scomm.divide(group)
+
+ return inter_comm,num_of_groups
#======
# main
#======
@@ -258,23 +293,60 @@ def main(options, scomm, rank, size):
# loading the specifiers from the env_timeseries.xml only needs to run on the master task (rank=0)
if rank == 0:
- specifiers = readArchiveXML(caseroot, cesmEnv['DOUT_S_ROOT'], cesmEnv['CASE'], options.standalone, debug)
+ dout_s_root = cesmEnv['DOUT_S_ROOT']
+ case = cesmEnv['CASE']
+ specifiers,log = readArchiveXML(caseroot, dout_s_root, case, options.standalone, options.completechunk[0], debug)
scomm.sync()
# specifiers is a list of pyreshaper specification objects ready to pass to the reshaper
specifiers = scomm.partition(specifiers, func=partition.Duplicate(), involved=True)
- # create the PyReshaper object - uncomment when multiple specifiers is allowed
- reshpr = reshaper.create_reshaper(specifiers, serial=False, verbosity=debug)
-
- # Run the conversion (slice-to-series) process
- reshpr.convert()
+ if len(specifiers) > 0:
+ # setup subcommunicators to do streams and chunks in parallel
+ # everyone participates except for root
+ inter_comm, lsubcomms = divide_comm(scomm, len(specifiers))
+ color = inter_comm.get_color()
+ lsize = inter_comm.get_size()
+ lrank = inter_comm.get_rank()
+
+ GWORK_TAG = 10 # global comm mpi tag
+ LWORK_TAG = 20 # local comm mpi tag
+ # global root - hands out specifiers to work on. When complete, it must tell each subcomm all work is done.
+ if (rank == 0):
+ for i in range(0,len(specifiers)): # hand out all specifiers
+ scomm.ration(data=i, tag=GWORK_TAG)
+ for i in range(0,lsubcomms): # complete, signal this to all subcomms
+ scomm.ration(data=-99, tag=GWORK_TAG)
+
+ # subcomm root - performs the same tasks as other subcomm ranks, but also gets the specifier to work on and sends
+ # this information to all ranks within subcomm
+ elif (lrank == 0):
+ i = -999
+ while i != -99:
+ i = scomm.ration(tag=GWORK_TAG) # recv from global
+ for x in range(1,lsize):
+ inter_comm.ration(i, LWORK_TAG) # send to local ranks
+ if i != -99:
+ # create the PyReshaper object for this specifier
+ reshpr = reshaper.create_reshaper(specifiers[i], serial=False, verbosity=debug, simplecomm=inter_comm)
+ # Run the conversion (slice-to-series) process
+ reshpr.convert()
+
+ # all subcomm ranks - recv the specifier to work on and call the reshaper
+ else:
+ i = -999
+ while i != -99:
+ i = inter_comm.ration(tag=LWORK_TAG) # recv from local root
+ if i != -99:
+ # create the PyReshaper object for this specifier
+ reshpr = reshaper.create_reshaper(specifiers[i], serial=False, verbosity=debug, simplecomm=inter_comm)
+ # Run the conversion (slice-to-series) process
+ reshpr.convert()
- # Print timing diagnostics
- reshpr.print_diagnostics()
-
- return 0
+ if rank == 0:
+ chunking.write_log('ts_status.log', log) # Update system log with the dates that were just converted
+ scomm.sync()
#===================================
if __name__ == "__main__":
@@ -290,18 +362,9 @@ def main(options, scomm, rank, size):
if rank == 0:
print('cesm_tseries_generator INFO Running on {0} cores'.format(size))
- try:
- status = main(options, scomm, rank, size)
- if rank == 0:
- print('************************************************************')
- print('Successfully completed generating variable time-series files')
- print('************************************************************')
- sys.exit(status)
-
-## except RunTimeError as error:
-
- except Exception as error:
- print(str(error))
- if options.backtrace:
- traceback.print_exc()
- sys.exit(1)
+ main(options, scomm, rank, size)
+ if rank == 0:
+ print('************************************************************')
+ print('Successfully completed generating variable time-series files')
+ print('************************************************************')
+
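
The new divide_comm() splits the MPI ranks into subcommunicators of at least 16 ranks each, with global rank 0 kept aside as the master that rations specifiers to the subgroup roots. A standalone sketch of the group-assignment arithmetic (no MPI needed; the function name is made up, and Python 2's integer division is written explicitly as //), useful for checking how ranks map to groups for a given core count and number of specifiers:

    MIN_PROCS_PER_SPEC = 16   # same constant used by divide_comm() in the patch

    def group_for_rank(global_rank, size, l_spec):
        # mirrors the arithmetic in divide_comm(); returns (color, num_of_groups)
        rank = global_rank - 1                 # the patch shifts ranks so the master becomes -1
        num_of_groups = 1 if l_spec == 1 else size // MIN_PROCS_PER_SPEC
        if rank == -1:
            # the master only joins a group because divide() requires all ranks to take part
            return ((size // MIN_PROCS_PER_SPEC) % l_spec) + 1, num_of_groups
        color = (rank // MIN_PROCS_PER_SPEC) % l_spec
        if color == num_of_groups:             # clamp the last group index
            color -= 1
        return color, num_of_groups

    # e.g. 65 ranks (1 master + 64 workers) converting 3 specifiers
    for r in (0, 1, 17, 33, 49, 64):
        print((r, group_for_rank(r, 65, 3)))
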
diff --git a/timeseries/timeseries/chunking.py b/timeseries/timeseries/chunking.py
new file mode 100755
index 00000000..6528b812
--- /dev/null
+++ b/timeseries/timeseries/chunking.py
@@ -0,0 +1,261 @@
+#!/usr/bin/env python2
+
+import glob, json, os
+import netCDF4 as nc
+import cf_units
+import datetime
+
+def get_input_dates(glob_str):
+
+ '''
+ Open up all of the files that match the search string and get
+ the dates within the files. Also get the number of slices within
+ each file, what calendar it uses and the time unit.
+
+ Input:
+ glob_str(string) - the search path to get files
+
+ Output:
+ stream_dates(dictionary) - keys->date, values->the file where this slice is located
+ file_slices(dictionary) - keys->filename, values->the number of slices found in the file
+ calendar(string) - the name of the calendar type (ie, noleap, ...)
+ units(string) - the time units (typically in the form 'days since ...')
+ '''
+ stream_files = glob.glob(glob_str)
+
+ stream_dates = {}
+ file_slices = {}
+ att = {}
+ print glob_str
+
+ if len(stream_files) < 1:
+ return stream_dates, file_slices, None, None
+
+ for fn in sorted(stream_files):
+ print 'opening ',fn
+ # open file and get time dimension
+ f = nc.Dataset(fn,"r")
+ all_t = f.variables['time']
+
+ # add the file name and how many slices it contains
+ file_slices[fn] = len(all_t)
+
+ # add all dates and which file they are located in
+ for t in all_t:
+ stream_dates[t] = fn
+
+ # get all attributes of time in order to get cal and units
+ for a in all_t.ncattrs():
+ att[a] = all_t.__getattribute__(a)
+
+ return stream_dates,file_slices,att['calendar'],att['units']
+
+def get_cesm_date(fn,t=None):
+
+ '''
+ Open a netcdf file and return its datestamp
+
+ Input:
+ fn(string) - the filename to get date from
+ t(string) - string indicating whether the date should come from the beginning ('b') or end ('e'/'ee') of the file (optional)
+
+ Output:
+ a list containing year, month, day, and hour in string format with the correct number of digits
+ '''
+
+ f = nc.Dataset(fn,"r")
+ all_t = f.variables['time']
+
+ att={}
+ for a in all_t.ncattrs():
+ att[a] = all_t.__getattribute__(a)
+
+ if ('bounds' in att.keys()):
+ if t == 'b':
+ d = f.variables[att['bounds']][0][0]
+ elif t == 'e':
+ l = len(f.variables[att['bounds']])
+ d = (f.variables[att['bounds']][l-1][1])-1
+ elif t == 'ee':
+ l = len(f.variables[att['bounds']])
+ d = (f.variables[att['bounds']][l-1][1])
+ else:
+ d = f.variables['time'][1]
+
+ d1 = cf_units.num2date(d,att['units'],att['calendar'])
+
+ return [str(d1.year).zfill(4),str(d1.month).zfill(2),str(d1.day).zfill(2),str(d1.hour).zfill(2)]
+
+
+def get_chunk_range(tper, size, start, cal, units):
+
+ '''
+ Figures out the end date of the chunk based on the start date
+
+ Input:
+ tper(string) - the time period to use when figuring out chunk size (year, month, day, hour)
+ size(int) - the size of the chunk used in coordination with tper
+ start(float) - the date stamp to start count from
+ cal(string) - the calendar to use to figure out chunk size
+ units(string) - the units to use to figure out chunk size
+
+ Output:
+ start(float) - the start date of the chunk
+ end(float) - the end date of the chunk
+ '''
+
+ # Get the first date
+ d1 = cf_units.num2date(start, units, cal)
+
+ # Figure out how many days each chunk should be
+ if 'day' in tper: #day
+ end = float(start) + int(size)
+
+ elif 'hour' in tper: #hour
+ end = (float(size)/24.0) + float(start)
+
+ elif 'month' in tper: #month
+ m2 = (int(d1.month)+(int(size)%12))
+ y2 = (int(d1.year)+(int(size)/12))
+ if m2 > 12:
+ y2 = y2 + 1
+ m2 = m2 - 12
+ d2 = datetime.datetime(y2, m2, d1.day, d1.hour, d1.minute)
+ end = cf_units.date2num(d2, units, cal)
+
+ elif 'year' in tper: #year
+ d2 = datetime.datetime(int(size)+d1.year, d1.month, d1.day, d1.hour, d1.minute)
+ end = cf_units.date2num(d2, units, cal)
+
+ return start, end
+
+def get_chunks(tper, index, size, stream_dates, ts_log_dates, cal, units, s):
+
+ '''
+ Figure out what chunks there are to do for a particular CESM output stream
+
+ Input:
+ tper(string) - the time period to use when figuring out chunk size (year, month, day, hour)
+ index(int) - an integer indicating which index in the tper and size list to start from.
+ this option allows users to specify different chunk sizes.
+ size(int) - the size of the chunk used in coordination with tper
+ stream_dates(dictionary) - keys->date, values->the file where this slice is located
+ ts_log_dates(list) - a list of all of the dates that have been converted already - used to
+ avoid duplication
+ cal(string) - the calendar to use to figure out chunk size
+ units(string) - the units to use to figure out chunk size
+ s(int) - completechunk flag: if 1, wait until all data for a chunk exists before creating it;
+ if 0, it's okay to create a final incomplete chunk
+
+ Output:
+ files(dictionary) - keys->chunk, values->a list of all files needed for this chunk and the start and end dates
+ dates(list) - all of the dates that will be in this chunk
+ index(int) - the last index to be used in the tper and size list
+ '''
+
+ # remove the times in ts_log_dates from stream_dates because
+ # these have already been created
+ du = cf_units.Unit(units)
+ to_do = []
+ for d in sorted(stream_dates.keys()):
+ if d not in ts_log_dates:
+ to_do.append(d)
+
+ files = {}
+ dates = []
+ i = 0
+ e = False
+ chunk_n = 0
+
+ tper_list = tper.split(",")
+ size_list = size.split(",")
+ if len(tper_list) != len(size_list):
+ print 'Error: The length of requested time periods for chunks does not match the length of requested chunk sizes', tper_list, size_list
+
+ if len(to_do)>1:
+ while e is False:
+ # get the new range
+ start,end = get_chunk_range(tper_list[index], size_list[index], to_do[i], cal, units)
+ if index != len(tper_list)-1:
+ index = index + 1
+ #walk through and map dates within this range to files
+ cfiles = []
+ cdates = []
+ while to_do[i] < end and e is False:
+ fn = stream_dates[to_do[i]]
+ if fn not in cfiles:
+ cfiles.append(fn)
+ cdates.append(to_do[i])
+ print 'adding ',fn,to_do[i]
+ i = i + 1
+ # am I past the dates I have? If so, exit the loop and don't add to the list.
+ # these will be converted when more data exists
+ if i >= len(to_do)-1:
+ fn = stream_dates[to_do[i]]
+ if fn not in cfiles:
+ cfiles.append(fn)
+ cdates.append(to_do[i])
+ print 'adding ',fn,to_do[i]
+ if s==1:
+ print '#################################'
+ print 'Not appending: '
+ print cfiles
+ print 'dates:(',cdates,')'
+ print '#################################'
+ else: # user indicated that they would like to end with an incomplete chunk
+ files[chunk_n] = {}
+ files[chunk_n]['fn'] = sorted(cfiles)
+ files[chunk_n]['start'] = get_cesm_date(cfiles[0],t='b')
+ files[chunk_n]['end'] = get_cesm_date(cfiles[-1],t='e')
+ for cd in sorted(cdates):
+ dates.append(cd)
+ e = True
+ # Have a complete set. Append file and date info.
+ if e is False:
+ files[chunk_n] = {}
+ s_cdates = sorted(cdates)
+ files[chunk_n]['fn'] = sorted(cfiles)
+ files[chunk_n]['start'] = get_cesm_date(cfiles[0],t='b')
+ files[chunk_n]['end'] = get_cesm_date(cfiles[-1],t='e')
+ for cd in sorted(cdates):
+ dates.append(cd)
+ chunk_n = chunk_n+1
+
+ return files, dates, index
+
+
+def write_log(log_fn, log):
+
+ '''
+ Write (or overwrite) a json file with the date stamps that have been converted
+ and what tper and size list index to next start on.
+
+ Input:
+ log_fn(string) - the name of the log file to write to
+ log(dictionary) - keys->file streams, values->the dates that have been
+ converted and next indexes to use for size and tper lists
+ '''
+ with open(log_fn, 'w') as f:
+ json.dump(log, f)
+
+
+def read_log(log_fn):
+
+ '''
+ Read in the json log file in order to know which files have already been converted
+
+ Input:
+ log_fn(string) - the name of the log file to read from
+
+ Output:
+ d(dictionary) - empty if the log file doesn't exist; otherwise a dictionary
+ with keys->file streams, values->the dates that have been
+ converted and next indexes to use for size and tper lists
+ '''
+ if os.path.isfile(log_fn):
+ with open(log_fn, 'r') as f:
+ d = json.load(f)
+ return d
+ else:
+ return {}
+
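
The chunking bookkeeping above persists its state in a small JSON file (ts_status.log): one entry per component+stream, holding the time values that have already been converted and the next index into the tseries_filecat_tper/_n lists. A minimal round-trip sketch of that layout (the stream key and dates are invented; the file name and keys come from the patch):

    import json

    log = {
        'cam.h0': {                         # invented comp+stream key
            'slices': [31.0, 59.0, 90.0],   # time values already written to timeseries files
            'index': 0,                     # next position in the tper/size lists
        },
    }

    # write_log() in chunking.py performs this dump
    with open('ts_status.log', 'w') as f:
        json.dump(log, f)

    # read_log() returns {} when the file is missing, otherwise the stored dictionary
    with open('ts_status.log', 'r') as f:
        print(json.load(f))
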