Skip to content

Commit

Permalink
Started reworking offline dq workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
maxtrevor committed Dec 12, 2023
1 parent ba7206c commit fa61b73
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 161 deletions.
20 changes: 8 additions & 12 deletions bin/workflows/pycbc_make_offline_search_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,14 @@ insps = wf.merge_single_detector_hdf_files(workflow, hdfbank,
# 'statfiles' is list of files used in calculating statistic
# 'dqfiles' is the subset of files containing data quality information
statfiles = []
dqfiles = []
dqfile_labels = []
dq_labels = workflow.cp.get_subsections('workflow-data_quality')
for dq_label in dq_labels:
dq_label_files = wf.setup_dq_reranking(workflow, dq_label, insps, hdfbank,
analyzable_segs, analyzable_file,
dq_segment_file,
output_dir='dq',
tags=['full_data'])
statfiles += dq_label_files
dqfiles += dq_label_files
dqfile_labels += len(dq_label_files) * [dq_label]

dqfiles, dqfile_labels = wf.setup_dq_reranking(workflow, insps,
hdfbank, analyzable_segs,
analyzable_file,
dq_segment_file,
output_dir='dq',
tags=['full_data'])
statfiles += dqfiles

statfiles += wf.setup_trigger_fitting(workflow, insps, hdfbank,
final_veto_file, final_veto_name,
Expand Down
297 changes: 148 additions & 149 deletions pycbc/workflow/dq.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,183 +22,182 @@
# =============================================================================
#

import os
import logging
from ligo import segments
import numpy
from pycbc.workflow.core import (FileList, Executable, Node,
File, SegFile, make_analysis_dir)
SegFile, make_analysis_dir)
from pycbc.workflow.datafind import setup_datafind_workflow

class PyCBCCalculateDQExecutable(Executable):
current_retention_level = Executable.ALL_TRIGGERS
def create_node(self, segment, frames):
start = int(segment[0])
end = int(segment[1])

class PyCBCBinTemplatesDQExecutable(Executable):
current_retention_level = Executable.MERGED_TRIGGERS

def create_node(self, workflow, ifo, template_bank_file, trigger_file):
node = Node(self)
node.add_input_list_opt('--frame-files', frames)
node.add_opt('--gps-start-time', start)
node.add_opt('--gps-end-time', end)
node.new_output_file_opt(segment, '.hdf', '--output-file')
node.add_opt('--ifo', ifo)
node.add_input_opt('--template-bank-file', template_bank_file)
node.add_input_opt('--trigger-file', trigger_file)
node.new_output_file_opt(workflow.analysis_time, '.hdf', '--output-file')
return node

class PyCBCRerankDQExecutable(Executable):

class PyCBCDQFlagFromTimeseriesExecutable(Executable):
current_retention_level = Executable.MERGED_TRIGGERS
def create_node(self, workflow, ifo, dq_type, dq_files, binned_rate_file):

def create_node(self, workflow, dq_label, analyzable_segs, dq_frames):
node = Node(self)
node.add_opt('--dq-type', dq_type)
node.add_opt('--ifo', ifo)
node.add_input_list_opt('--input-file', dq_files)
node.add_input_opt('--rate-file', binned_rate_file)
node.add_input_opt('--dq-label', dq_label)
node.add_input_list_opt('--analyzable-segs', analyzable_segs)
node.add_input_list_opt('--dq-frame-files', dq_frames)
node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file')
return node


class PyCBCBinTriggerRatesDQExecutable(Executable):
current_retention_level = Executable.MERGED_TRIGGERS
def create_node(self, workflow, ifo, dq_files, trig_file, bank_file):

def create_node(self, workflow, ifo, dq_label,
flag_file, trig_file, template_bins_file):
node = Node(self)
node.add_opt('--ifo', ifo)
node.add_input_opt('--bank-file', bank_file)
node.add_input_opt('--dq-label', dq_label)
node.add_input_opt('--template-bins-file', template_bins_file)
node.add_input_opt('--trig-file', trig_file)
node.add_input_list_opt('--dq-file', dq_files)
node.new_output_file_opt(workflow.analysis_time,'.hdf',
node.add_input_opt('--flag-file', flag_file)
node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file')
return node

class PyCBCCalculateDQFlagExecutable(Executable):
# current_retention_level = Executable.ALL_TRIGGERS

class PyCBCCombineDQExecutable(Executable):
current_retention_level = Executable.MERGED_TRIGGERS

def create_node(self, workflow, segment, dq_file, flag):
def create_node(self, workflow, dq_files):
node = Node(self)
# Executable objects are initialized with ifo information
start = int(segment[0])
end = int(segment[1])
node.add_opt('--ifo', self.ifo_string)
node.add_opt('--flag', flag)
node.add_opt('--gps-start-time', start)
node.add_opt('--gps-end-time', end)
node.add_input_opt('--dq-segments', dq_file)
node.new_output_file_opt(segment, '.hdf',
'--output-file')
node.add_input_list_opt('--dq-files', dq_files)
node.new_output_file_opt(workflow.analysis_time, '.hdf', '--output-file')
return node

def setup_dq_reranking(workflow, dq_label, insps, bank,
segs, analyzable_file, dq_file,
output_dir=None, tags=None):

def setup_dq_reranking(workflow, insps, bank,
analyzable_segs, analyzable_file,
dq_seg_file,
output_dir=None, tags=None):
make_analysis_dir(output_dir)
output = FileList()
output_files = FileList()
output_labels = []

dq_labels = workflow.cp.get_subsections('workflow-data_quality')
if tags:
dq_tags = tags + [dq_label]
dq_tags = tags + dq_labels
else:
dq_tags = [dq_label]
dq_type = workflow.cp.get_opt_tags("workflow-data_quality",
'dq-type', [dq_label])
if dq_type == 'timeseries':
if dq_label not in workflow.cp.get_subsections('workflow-datafind'):
msg = """No workflow-datafind section with dq tag.
Tags must be used in workflow-datafind sections "
if more than one source of data is used.
Strain data source must be tagged
workflow-datafind-hoft.
Consult the documentation for more info."""
raise ValueError(msg)
dq_ifos = workflow.cp.get_opt_tags("workflow-data_quality",
'ifos', [dq_label])
dq_ifos = dq_ifos.split(',')
dq_segs = {}
dq_segs_for_file = {}
for ifo in dq_ifos:
dq_segs[ifo] = segs[ifo]
dq_segs_for_file[ifo+':'+dq_label] = segs[ifo]
dq_segs_file = SegFile.from_segment_list_dict(dq_label,
dq_segs_for_file,
extension='.xml',
valid_segment=workflow.analysis_time,
directory=output_dir)
datafind_files, dq_file, dq_segs, dq_name = \
setup_datafind_workflow(workflow,
dq_segs, "datafind_dq",
seg_file=dq_segs_file,
tags=dq_tags)
for ifo in dq_ifos:
ifo_insp = [insp for insp in insps if (insp.ifo == ifo)]
assert len(ifo_insp)==1
ifo_insp = ifo_insp[0]

dq_files = FileList()
for seg in dq_segs[ifo]:
seg_frames = datafind_files.find_all_output_in_range(ifo, seg)
raw_exe = PyCBCCalculateDQExecutable(workflow.cp,
'calculate_dq', ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
raw_node = raw_exe.create_node(seg, seg_frames)
workflow += raw_node
dq_files += raw_node.output_files

intermediate_exe = PyCBCBinTriggerRatesDQExecutable(workflow.cp,
'bin_trigger_rates_dq',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
intermediate_node = intermediate_exe.create_node(workflow, ifo,
dq_files,
ifo_insp, bank)
workflow += intermediate_node
binned_rate_file = intermediate_node.output_file

new_exe = PyCBCRerankDQExecutable(workflow.cp,
'rerank_dq', ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
new_node = new_exe.create_node(workflow, ifo, dq_label,
dq_files, binned_rate_file)
workflow += new_node
output += new_node.output_files
elif dq_type == 'flag':
flag_str = workflow.cp.get_opt_tags("workflow-data_quality",
'flag-name', dq_tags)
ifo = flag_str[:2]
dq_tags = dq_labels

dq_types = [workflow.cp.get_opt_tags('workflow-data_quality', 'dq-type', [dq_label])
for dq_label in dq_labels]
dq_ifos = [workflow.cp.get_opt_tags('workflow-data_quality', 'ifo', [dq_label])
for dq_label in dq_labels]
ifos = numpy.unique(dq_ifos)

for ifo in ifos:
# get the dq labels and types for this ifo
ifo_dq_labels = [dq_label for dq_label, dq_ifo in zip(dq_labels, dq_ifos)
if dq_ifo == ifo]
ifo_dq_types = [dq_type for dq_type, dq_ifo in zip(dq_types, dq_ifos)
if dq_ifo == ifo]

# get triggers for this ifo
ifo_insp = [insp for insp in insps if (insp.ifo == ifo)]
assert len(ifo_insp)==1
assert len(ifo_insp) == 1
ifo_insp = ifo_insp[0]
flag_name = flag_str
logging.info("Creating job for flag %s", flag_name)
dq_files = FileList()
for seg in segs[ifo]:
raw_exe = PyCBCCalculateDQFlagExecutable(workflow.cp,
'calculate_dqflag', ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
raw_node = raw_exe.create_node(workflow, seg, dq_file,
flag_name)
workflow += raw_node
dq_files += raw_node.output_files
intermediate_exe = PyCBCBinTriggerRatesDQExecutable(workflow.cp,
'bin_trigger_rates_dq',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
intermediate_node = intermediate_exe.create_node(workflow, ifo,
dq_files,
ifo_insp, bank)
workflow += intermediate_node
binned_rate_file = intermediate_node.output_file

new_exe = PyCBCRerankDQExecutable(workflow.cp,
'rerank_dq', ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
new_node = new_exe.create_node(workflow, ifo, dq_label,
dq_files, binned_rate_file)
workflow += new_node
output += new_node.output_files
else:
msg = """Incorrect DQ type specified.
Only valid DQ types are 'flag'
and 'timeseries'.
Consult the documentation for more info."""
raise ValueError(msg)

return output
# calculate template bins for this ifo
bin_templates_exe = PyCBCBinTemplatesDQExecutable(
workflow.cp,
'bin_templates_dq',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
bin_templates_node = bin_templates_exe.create_node(workflow, ifo, bank, ifo_insp)
workflow += bin_templates_node
template_bins_file = bin_templates_node.output_file

trigger_rate_files = FileList()
for dq_label, dq_type in zip(ifo_dq_labels, ifo_dq_types):
flag_file = None

# if dq is a timeseries, need to convert it to a flag
if dq_type == 'timeseries':
if dq_label not in workflow.cp.get_subsections('workflow-datafind'):
msg = """No workflow-datafind section with dq tag.
Tags must be used in workflow-datafind sections "
if more than one source of data is used.
Strain data source must be tagged
workflow-datafind-hoft.
Consult the documentation for more info."""
raise ValueError(msg)

# find timeseries frames
ifo_seg_dict = {ifo: analyzable_segs[ifo]}
dq_segs_for_file = {ifo+':'+dq_label: analyzable_segs[ifo]}
ts_dq_seg_file = SegFile.from_segment_list_dict(
dq_label,
dq_segs_for_file,
extension='.xml',
valid_segment=workflow.analysis_time,
directory=output_dir)
datafind_files, dq_file, dq_segs, dq_name = setup_datafind_workflow(
workflow,
ifo_seg_dict,
"datafind_dq",
seg_file=ts_dq_seg_file,
tags=dq_tags)

# calculate dq flag from timeseries
calculate_flag_exe = PyCBCDQFlagFromTimeseriesExecutable(
workflow.cp,
'calculate_dqflag',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
calculate_flag_node = calculate_flag_exe.create_node(
workflow,
analyzable_segs,
datafind_files)
workflow += calculate_flag_node
flag_file = calculate_flag_node.output_file
else:
assert dq_type == 'flag'
flag_file = dq_seg_file

# calculate trigger rates during dq flags
bin_triggers_exe = PyCBCBinTriggerRatesDQExecutable(
workflow.cp,
'bin_trigger_rates_dq',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
bin_triggers_node = bin_triggers_exe.create_node(
workflow,
ifo,
flag_file,
ifo_insp,
template_bins_file)
workflow += bin_triggers_node
trigger_rate_files += bin_triggers_node.output_files

# combine results from all dq inputs into one file
combine_dq_exe = PyCBCCombineDQExecutable(
workflow.cp,
'combine_dq',
ifos=ifo,
out_dir=output_dir,
tags=dq_tags)
combine_dq_node = combine_dq_exe.create_node(
workflow,
trigger_rate_files)
workflow += combine_dq_node
output_files += combine_dq_node.output_files
output_labels += [f'{ifo}_dq']

return output_files, output_labels

0 comments on commit fa61b73

Please sign in to comment.