Add try except to catch det_match failures (#1077)
* error check for get_groups

* further fixes

* fix for multilayer_preprocess_tod

* fix get_groups

* fix group order

* revert to dict

* performance improvements

* extra fixes

* updates for handling returns

* add check if temp dir exists

* fix logger

* updates for multilayer preproc

* rearrange group removal

* address comments

* fix space

* fix typo

* improve docstring

---------

Co-authored-by: Michael McCrackan <[email protected]>
3 people authored Jan 3, 2025
1 parent 6f04c1c commit 86d99c2
Showing 3 changed files with 144 additions and 109 deletions.
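
For orientation before the diffs: get_groups now returns a third element that is None on success, or an [error, errmsg, traceback] list when an exception was caught, and every caller checks it. Below is a minimal sketch of the new calling pattern; the config path and obs_id are hypothetical, while the imports and calls follow the code shown in the hunks.

    import yaml
    from sotodlib import core
    from sotodlib.preprocess import preprocess_util as pp_util

    # Hypothetical config path and obs_id, for illustration only.
    configs = yaml.safe_load(open("preprocess_config.yaml", "r"))
    context = core.Context(configs["context_file"])
    obs_id = "obs_1700000000_sat1_1111"

    # get_groups now returns (group_by, groups, error); error is None on
    # success, or an [error, errmsg, traceback] list if an exception was caught.
    group_by, groups, error = pp_util.get_groups(obs_id, configs, context)
    if error is not None:
        # The drivers below either raise, return early, or append to an error log.
        raise ValueError(f"{error[0]}\n{error[1]}\n{error[2]}")
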
105 changes: 83 additions & 22 deletions sotodlib/preprocess/preprocess_util.py
@@ -176,19 +176,25 @@ def get_groups(obs_id, configs, context):
groups : list of list of int
The list of groups of detectors.
"""
group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
for i, gb in enumerate(group_by):
if gb.startswith('dets:'):
group_by[i] = gb.split(':',1)[1]

if (gb == 'detset') and (len(group_by) == 1):
groups = context.obsfiledb.get_detsets(obs_id)
return group_by, [[g] for g in groups]

det_info = context.get_det_info(obs_id)
rs = det_info.subset(keys=group_by).distinct()
groups = [[b for a,b in r.items()] for r in rs]
return group_by, groups
try:
group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
for i, gb in enumerate(group_by):
if gb.startswith('dets:'):
group_by[i] = gb.split(':',1)[1]

if (gb == 'detset') and (len(group_by) == 1):
groups = context.obsfiledb.get_detsets(obs_id)
return group_by, [[g] for g in groups], None

det_info = context.get_det_info(obs_id)
rs = det_info.subset(keys=group_by).distinct()
groups = [[b for a,b in r.items()] for r in rs]
return group_by, groups, None
except Exception as e:
error = f'Failed get groups for: {obs_id}'
errmsg = f'{type(e)}: {e}'
tb = ''.join(traceback.format_tb(e.__traceback__))
return [], [], [error, errmsg, tb]


def get_preprocess_db(configs, group_by, logger=None):
@@ -388,8 +394,14 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
configs_proc, context_proc = get_preprocess_context(configs_proc, context_proc)
meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)

group_by_init, groups_init = get_groups(obs_id, configs_init, context_init)
group_by_proc, groups_proc = get_groups(obs_id, configs_proc, context_proc)
group_by_init, groups_init, error_init = get_groups(obs_id, configs_init, context_init)
group_by_proc, groups_proc, error_proc = get_groups(obs_id, configs_proc, context_proc)

if error_init is not None:
raise ValueError(f"{error_init[0]}\n{error_init[1]}\n{error_init[2]}")

if error_proc is not None:
raise ValueError(f"{error_proc[0]}\n{error_proc[1]}\n{error_proc[2]}")

if (group_by_init != group_by_proc).any():
raise ValueError('init and proc groups do not match')
@@ -451,7 +463,7 @@ def find_db(obs_id, configs, dets, context=None, logger=None):
configs = yaml.safe_load(open(configs, "r"))
if context is None:
context = core.Context(configs["context_file"])
group_by, _ = get_groups(obs_id, configs, context)
group_by, _, _ = get_groups(obs_id, configs, context)
cur_groups = [list(np.fromiter(dets.values(), dtype='<U32'))]
dbexist = True
if os.path.exists(configs['archive']['index']):
@@ -560,7 +572,7 @@ def save_group_and_cleanup(obs_id, configs, context=None, subdir='temp',
if context is None:
context = core.Context(configs["context_file"])

group_by, groups = get_groups(obs_id, configs, context)
group_by, groups, error = get_groups(obs_id, configs, context)

all_groups = groups.copy()
for g in all_groups:
@@ -583,6 +595,49 @@
except OSError as e:
# remove if it can't be opened
os.remove(outputs_grp['temp_file'])
return error


def cleanup_obs(obs_id, policy_dir, errlog, configs, context=None,
subdir='temp', remove=False):
"""
For a given obs id, this function will search the policy_dir directory
if it exists for any files with that obsnum in their filename. If any are
found, it will run save_group_and_cleanup for that obs id.
Arguments
---------
obs_id: str
Obs id to check and clean up
policy_dir: str
Directory to temp per-group output files
errlog: fpath
Filepath to error logging file.
configs: fpath or dict
Filepath or dictionary containing the preprocess configuration file.
context: core.Context
Optional. Context object used for data loading/querying.
subdir: str
Optional. Subdirectory to save the output files into.
remove: bool
Optional. Default is False. Whether to remove a file if found.
Used when ``overwrite`` is True in driving functions.
"""

if os.path.exists(policy_dir):
found = False
for f in os.listdir(policy_dir):
if obs_id in f:
found = True
break

if found:
error = save_group_and_cleanup(obs_id, configs, context,
subdir=subdir, remove=remove)
if error is not None:
f = open(errlog, 'a')
f.write(f'\n{time.time()}, cleanup error\n{error[0]}\n{error[2]}\n')
f.close()


def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None,
@@ -657,9 +712,12 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
if context_proc is None:
context_proc = core.Context(configs_proc["context_file"])

group_by, groups = get_groups(obs_id, configs_proc, context_proc)
group_by, groups, error = get_groups(obs_id, configs_proc, context_proc)
else:
group_by, groups = get_groups(obs_id, configs_init, context_init)
group_by, groups, error = get_groups(obs_id, configs_init, context_init)

if error is not None:
return error[0], [error[1], error[2]], [error[1], error[2]], None

all_groups = groups.copy()
cur_groups = [list(np.fromiter(dets.values(), dtype='<U32'))]
@@ -674,11 +732,13 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=
error = 'no_group_overlap'
return error, [obs_id, dets], [obs_id, dets], None

db_init_exist = find_db(obs_id, configs_init, dets, context_init)
db_init_exist = find_db(obs_id, configs_init, dets, context_init,
logger=logger)

db_proc_exist = False
if configs_proc is not None:
db_proc_exist = find_db(obs_id, configs_proc, dets, context_proc)
db_proc_exist = find_db(obs_id, configs_proc, dets, context_proc,
logger=logger)

if (not db_init_exist) and db_proc_exist and (not overwrite):
logger.info('dependent db requires initial db if not overwriting')
@@ -882,7 +942,8 @@ def cleanup_mandb(error, outputs, configs, logger=None, overwrite=False):
errlog = os.path.join(folder, 'errlog.txt')
f = open(errlog, 'a')
f.write(f'{time.time()}, {error}\n')
f.write(f'\t{outputs[0]}\n\t{outputs[1]}\n')
if outputs is not None:
f.write(f'\t{outputs[0]}\n\t{outputs[1]}\n')
f.close()


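The new cleanup_obs helper added above is invoked once per obs_id by the driver scripts in the next two files. A usage sketch follows; the config path, errlog path, and obs_id are hypothetical, and the policy_dir derivation mirrors what the drivers' main() functions do below.

    import os
    import yaml
    from sotodlib.preprocess import preprocess_util as pp_util

    # Hypothetical inputs, for illustration only.
    configs = yaml.safe_load(open("preprocess_config.yaml", "r"))
    policy_dir = os.path.join(
        os.path.dirname(configs['archive']['policy']['filename']), 'temp')
    errlog = "/path/to/errlog.txt"  # hypothetical error-log location
    obs_id = "obs_1700000000_sat1_1111"  # hypothetical

    # If temp per-group files exist for this obs_id, save_group_and_cleanup is
    # run; any error it returns is appended to errlog rather than raised.
    pp_util.cleanup_obs(obs_id, policy_dir, errlog, configs,
                        subdir='temp', remove=False)
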
99 changes: 40 additions & 59 deletions sotodlib/site_pipeline/multilayer_preprocess_tod.py
@@ -66,10 +66,30 @@ def multilayer_preprocess_tod(obs_id,
configs_proc = yaml.safe_load(open(configs_proc, "r"))
context_proc = core.Context(configs_proc["context_file"])

group_by_proc, groups_proc = pp_util.get_groups(obs_id, configs_proc, context_proc)
group_by_init, groups_init, error_init = pp_util.get_groups(obs_id, configs_init, context_init)
group_by_proc, groups_proc, error_proc = pp_util.get_groups(obs_id, configs_proc, context_proc)

if error_init is not None:
if run_parallel:
return error_init[0], [None, None], [None, None]
else:
return

if error_proc is not None:
if run_parallel:
return error_proc[0], [None, None], [None, None]
else:
return

if len(groups_init) > 0 and len(groups_proc) > 0:
if (group_by_init != group_by_proc).any():
raise ValueError('init and proc groups do not match')

all_groups_proc = groups_proc.copy()
for g in all_groups_proc:
if g not in groups_init:
groups_proc.remove(g)
continue
if group_list is not None:
if g not in group_list:
groups_proc.remove(g)
@@ -281,76 +301,37 @@ def main(configs_init: str,
logger.warning(f"No observations returned from query: {query}")

# clean up lingering files from previous incomplete runs
policy_dir_init = os.path.join(os.path.dirname(configs_init['archive']['policy']['filename']), 'temp')
policy_dir_proc = os.path.join(os.path.dirname(configs_proc['archive']['policy']['filename']), 'temp_proc')
for obs in obs_list:
obs_id = obs['obs_id']
pp_util.save_group_and_cleanup(obs_id, configs_init, context_init,
subdir='temp', remove=overwrite)
pp_util.save_group_and_cleanup(obs_id, configs_proc, context_proc,
subdir='temp_proc', remove=overwrite)
pp_util.cleanup_obs(obs_id, policy_dir_init, errlog, configs_init, context_init,
subdir='temp', remove=overwrite)
pp_util.cleanup_obs(obs_id, policy_dir_proc, errlog, configs_proc, context_proc,
subdir='temp_proc', remove=overwrite)

run_list = []

if overwrite or not os.path.exists(configs_proc['archive']['index']):
# run on all if database doesn't exist
for obs in obs_list:
group_by_init, groups_init = pp_util.get_groups(obs["obs_id"], configs_init, context_init)
group_by_proc, groups_proc = pp_util.get_groups(obs["obs_id"], configs_proc, context_proc)

if (group_by_init != group_by_proc).any():
raise ValueError('init and proc groups do not match')

all_groups_proc = groups_proc.copy()
for g in all_groups_proc:
if g not in groups_init:
groups_proc.remove(g)

run_list.append( (obs, groups_proc) )
#run on all if database doesn't exist
run_list = [ (o,None) for o in obs_list]
group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
else:
db = core.metadata.ManifestDb(configs_proc['archive']['index'])
for obs in obs_list:
x = db.inspect({'obs:obs_id': obs["obs_id"]})
group_by_init, groups_init = pp_util.get_groups(obs["obs_id"], configs_init, context_init)
group_by_proc, groups_proc = pp_util.get_groups(obs["obs_id"], configs_proc, context_proc)

if (group_by_init != group_by_proc).any():
raise ValueError('init and proc groups do not match')

all_groups_proc = groups_proc.copy()
for g in all_groups_proc:
if g not in groups_init:
groups_proc.remove(g)

if x is None or len(x) == 0:
run_list.append( (obs, groups_proc) )
elif len(x) != len(groups_proc):
[groups_proc.remove([a[f'dets:{gb}'] for gb in group_by_proc]) for a in x]
run_list.append( (obs, groups_proc) )
run_list.append( (obs, None) )
else:
group_by_proc, groups_proc, _ = pp_util.get_groups(obs["obs_id"], configs_proc, context_proc)
if len(x) != len(groups_proc):
[groups_proc.remove([a[f'dets:{gb}'] for gb in group_by_proc]) for a in x]
run_list.append( (obs, groups_proc) )

logger.info(f'Run list created with {len(run_list)} obsids')

# Expects archive policy filename to be <path>/<filename>.h5 and then this adds
# <path>/<filename>_<xxx>.h5 where xxx is a number that increments up from 0
# whenever the file size exceeds 10 GB.
nfile_init = 0
folder_init = os.path.dirname(configs_init['archive']['policy']['filename'])
basename_init = os.path.splitext(configs_init['archive']['policy']['filename'])[0]
dest_file_init = basename_init + '_' + str(nfile_init).zfill(3) + '.h5'
if not(os.path.exists(folder_init)):
os.makedirs(folder_init)
while os.path.exists(dest_file_init) and os.path.getsize(dest_file_init) > 10e9:
nfile_init += 1
dest_file_init = basename_init + '_' + str(nfile_init).zfill(3) + '.h5'

nfile_proc = 0
folder_proc = os.path.dirname(configs_proc['archive']['policy']['filename'])
basename_proc = os.path.splitext(configs_proc['archive']['policy']['filename'])[0]
dest_file_proc = basename_proc + '_' + str(nfile_proc).zfill(3) + '.h5'
if not(os.path.exists(folder_proc)):
os.makedirs(folder_proc)
while os.path.exists(dest_file_proc) and os.path.getsize(dest_file_proc) > 10e9:
nfile_proc += 1
dest_file_proc = basename_proc + '_' + str(nfile_proc).zfill(3) + '.h5'

# run write_block obs-ids in parallel at once then write all to the sqlite db.
with ProcessPoolExecutor(nproc) as exe:
futures = [exe.submit(multilayer_preprocess_tod, obs_id=r[0]['obs_id'],
@@ -372,12 +353,12 @@ def main(configs_init: str,
continue
futures.remove(future)

if err is None:
if db_datasets_init:
logger.info(f'Processing future result db_dataset: {db_datasets_init}')
if db_datasets_init:
for db_dataset in db_datasets_init:
pp_util.cleanup_mandb(err, db_dataset, configs_init, logger, overwrite)
for db_dataset in db_datasets_init:
pp_util.cleanup_mandb(err, db_dataset, configs_init, logger, overwrite)

if db_datasets_proc:
logger.info(f'Processing future dependent result db_dataset: {db_datasets_proc}')
for db_dataset in db_datasets_proc:
pp_util.cleanup_mandb(err, db_dataset, configs_proc, logger, overwrite)
49 changes: 21 additions & 28 deletions sotodlib/site_pipeline/preprocess_tod.py
@@ -29,7 +29,7 @@ def dummy_preproc(obs_id, group_list, logger,
error = None
outputs = []
context = core.Context(configs["context_file"])
group_by, groups = pp_util.get_groups(obs_id, configs, context)
group_by, groups, error = pp_util.get_groups(obs_id, configs, context)
pipe = Pipeline(configs["process_pipe"], plot_dir=configs["plot_dir"], logger=logger)
for group in groups:
logger.info(f"Beginning run for {obs_id}:{group}")
@@ -83,7 +83,14 @@ def preprocess_tod(obs_id,
configs = yaml.safe_load(open(configs, "r"))

context = core.Context(configs["context_file"])
group_by, groups = pp_util.get_groups(obs_id, configs, context)
group_by, groups, error = pp_util.get_groups(obs_id, configs, context)

if error is not None:
if run_parallel:
return error[0], [None, None]
else:
return

all_groups = groups.copy()
for g in all_groups:
if group_list is not None:
@@ -325,46 +332,32 @@ def main(
logger.warning(f"No observations returned from query: {query}")

# clean up lingering files from previous incomplete runs
policy_dir = os.path.join(os.path.dirname(configs['archive']['policy']['filename']), 'temp')
for obs in obs_list:
obs_id = obs['obs_id']
pp_util.save_group_and_cleanup(obs_id, configs, context,
subdir='temp', remove=overwrite)
pp_util.cleanup_obs(obs_id, policy_dir, errlog, configs, context,
subdir='temp', remove=overwrite)

run_list = []

if overwrite or not os.path.exists(configs['archive']['index']):
#run on all if database doesn't exist
for obs in obs_list:
group_by, groups = pp_util.get_groups(obs["obs_id"], configs, context)
run_list.append( (obs, groups) )# = [ (o, groups) for o in obs_list]
run_list = [ (o,None) for o in obs_list]
group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
else:
db = core.metadata.ManifestDb(configs['archive']['index'])
for obs in obs_list:
x = db.inspect({'obs:obs_id': obs["obs_id"]})
group_by, groups = pp_util.get_groups(obs["obs_id"], configs, context)
if x is None or len(x) == 0:
run_list.append( (obs, None) )
elif len(x) != len(groups):
[groups.remove([a[f'dets:{gb}'] for gb in group_by]) for a in x]
run_list.append( (obs, groups) )
else:
group_by, groups, _ = pp_util.get_groups(obs["obs_id"], configs, context)
if len(x) != len(groups):
[groups.remove([a[f'dets:{gb}'] for gb in group_by]) for a in x]
run_list.append( (obs, groups) )

logger.info(f'Run list created with {len(run_list)} obsids')

# Expects archive policy filename to be <path>/<filename>.h5 and then this adds
# <path>/<filename>_<xxx>.h5 where xxx is a number that increments up from 0
# whenever the file size exceeds 10 GB.
nfile = 0
folder = os.path.dirname(configs['archive']['policy']['filename'])
basename = os.path.splitext(configs['archive']['policy']['filename'])[0]
dest_file = basename + '_' + str(nfile).zfill(3) + '.h5'
if not(os.path.exists(folder)):
os.makedirs(folder)
while os.path.exists(dest_file) and os.path.getsize(dest_file) > 10e9:
nfile += 1
dest_file = basename + '_' + str(nfile).zfill(3) + '.h5'

logger.info(f'Starting dest_file set to {dest_file}')

# Run write_block obs-ids in parallel at once then write all to the sqlite db.
with ProcessPoolExecutor(nproc) as exe:
futures = [exe.submit(preprocess_tod, obs_id=r[0]['obs_id'],
@@ -385,8 +378,8 @@
continue
futures.remove(future)

logger.info(f'Processing future result db_dataset: {db_datasets}')
if err is None and db_datasets:
if db_datasets:
logger.info(f'Processing future result db_dataset: {db_datasets}')
for db_dataset in db_datasets:
pp_util.cleanup_mandb(err, db_dataset, configs, logger)

