Skip to content

Commit

Permalink
Allow dq stat to be used online
Browse files Browse the repository at this point in the history
  • Loading branch information
maxtrevor committed Dec 13, 2023
1 parent 8a5998f commit e061cc8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 68 deletions.
68 changes: 46 additions & 22 deletions bin/pycbc_live
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ launched with mpirun.
See https://arxiv.org/abs/1805.11174 for an overview."""

import sys
import argparse, numpy, pycbc, logging, cProfile, h5py, lal, json
import argparse
import numpy
import pycbc
import logging
import cProfile
import h5py
import lal
import json
import os.path
import itertools
import platform
import subprocess
import multiprocessing
from multiprocessing.dummy import threading
from matplotlib import use
use('agg')
from shutil import which
from mpi4py import MPI as mpi
from astropy.time import Time
Expand All @@ -44,6 +49,8 @@ from pycbc.filter import resample
from pycbc.psd import estimate
from pycbc.psd import variation
from pycbc.live import snr_optimizer
from matplotlib import use
use('agg')

# Use cached class-based FFTs in the resample and estimate module
resample.USE_CACHING_FOR_LFILTER = True
Expand All @@ -62,11 +69,13 @@ def ptof(p, ft):
"""
return numpy.log1p(-p) / -ft


def ftop(f, ft):
    """Convert FAR to p-value via foreground time `ft`.

    Parameters
    ----------
    f : float or numpy.ndarray
        False alarm rate.
    ft : float
        Foreground (live) time over which the rate applies.

    Returns
    -------
    float or numpy.ndarray
        The p-value ``1 - exp(-ft * f)``.
    """
    # -expm1(-x) == 1 - exp(-x) but keeps full precision when ft * f
    # is tiny, mirroring the use of log1p in ptof (the inverse mapping).
    return -numpy.expm1(-ft * f)


def combine_ifar_pvalue(ifar, pvalue, livetime):
"""Convert original IFAR to p-value and combine with followup p-value.
"""
Expand All @@ -82,7 +91,11 @@ class LiveEventManager(object):
def __init__(self, args, bank):
self.low_frequency_cutoff = args.low_frequency_cutoff
self.bank = bank
self.skymap_only_ifos = [] if args.skymap_only_ifos is None else list(set(args.skymap_only_ifos))

if args.skymap_only_ifos is None:
self.skymap_only_ifos = []
else:
self.skymap_only_ifos = list(set(args.skymap_only_ifos))

# Figure out what we are supposed to process within the pool of MPI processes
self.comm = mpi.COMM_WORLD
Expand Down Expand Up @@ -321,8 +334,8 @@ class LiveEventManager(object):
buff_size = \
pycbc.waveform.get_waveform_filter_length_in_time(apr, **p)

tlen = self.bank.round_up((buff_size + min_buffer) \
* self.bank.sample_rate)
tlen = self.bank.round_up((buff_size + min_buffer)
* self.bank.sample_rate)
flen = int(tlen / 2 + 1)
delta_f = self.bank.sample_rate / float(tlen)
cmd = 'timeout {} '.format(args.snr_opt_timeout)
Expand Down Expand Up @@ -419,7 +432,7 @@ class LiveEventManager(object):

# Save the command which would be used:
snroc_fname = os.path.join(out_dir_path, 'snr_optimize_command.txt')
with open(snroc_fname,'w') as snroc_file:
with open(snroc_fname, 'w') as snroc_file:
snroc_file.write(cmd)

return cmd, out_dir_path
Expand Down Expand Up @@ -779,7 +792,7 @@ class LiveEventManager(object):
('zero_half_width', float),
('taper_width', float)]
f['{}/gates'.format(ifo)] = \
numpy.array(gates[ifo], dtype=gate_dtype)
numpy.array(gates[ifo], dtype=gate_dtype)

for ifo in (store_psd or {}):
if store_psd[ifo] is not None:
Expand All @@ -796,7 +809,7 @@ parser.add_argument('--low-frequency-cutoff', help="low frequency cutoff", type=
parser.add_argument('--sample-rate', help="output sample rate", type=int)
parser.add_argument('--chisq-bins', help="Number of chisq bins")
parser.add_argument('--analysis-chunk', type=int, required=True,
help="Amount of data to produce triggers in a block")
help="Amount of data to produce triggers in a block")

parser.add_argument('--snr-threshold', type=float,
help='SNR threshold for generating a trigger')
Expand All @@ -819,6 +832,8 @@ parser.add_argument('--idq-state-channel', action=MultiDetMultiColonOptionAction
parser.add_argument('--idq-threshold', type=float,
help='Threshold used to veto triggers at times of '
'low iDQ False Alarm Probability')
parser.add_argument('--idq-reweighting', action='store_true',default=False,
help='Reweight triggers based on iDQ False Alarm Probability')
parser.add_argument('--data-quality-channel',
action=MultiDetMultiColonOptionAction,
help="Channel containing data quality information. Used "
Expand Down Expand Up @@ -1067,7 +1082,7 @@ if evnt.rank == 0 and args.enable_gracedb_upload:
evnt.create_gdb()
response = evnt.gracedb.ping()
logging.info('GraceDB ping response: %s %s time elapsed: %s',
response.status, response.reason, response.elapsed.total_seconds())
response.status, response.reason, response.elapsed.total_seconds())

# I'm not the root, so do some actual filtering.
with ctx:
Expand Down Expand Up @@ -1139,6 +1154,7 @@ with ctx:
))

my_coinc_id = 999999

def set_coinc_id(i):
    """Set this process's coincidence id to ``i``.

    NOTE(review): mutates the module-level ``my_coinc_id`` global
    (initialized to the 999999 sentinel just above); presumably used
    as a callback so each worker records its assigned id — confirm
    at the call site.
    """
    global my_coinc_id
    my_coinc_id = i
Expand Down Expand Up @@ -1170,7 +1186,7 @@ with ctx:
# main analysis loop
data_end = lambda: data_reader[tuple(data_reader.keys())[0]].end_time
last_bg_dump_time = int(data_end())
psd_count = {ifo:0 for ifo in ifos}
psd_count = {ifo: 0 for ifo in ifos}

# Create dicts to track whether the psd has been recalculated and to hold
# psd variation filters
Expand Down Expand Up @@ -1219,7 +1235,7 @@ with ctx:
evnt.commit_results((results, data_end()))
else:
psds = {ifo: data_reader[ifo].psd for ifo in data_reader
if data_reader[ifo].psd is not None}
if data_reader[ifo].psd is not None}

# Collect together the single detector triggers
if evnt.size > 1:
Expand Down Expand Up @@ -1247,18 +1263,27 @@ with ctx:
for key in results[ifo]:
if len(results[ifo][key]):
results[ifo][key] = results[ifo][key][idx]
if data_reader[ifo].idq is not None:
logging.info("Checking %s's iDQ information", ifo)
if data_reader[ifo].idq:
logging.info("Reading %s's iDQ information", ifo)
start = data_reader[ifo].start_time
times = results[ifo]['end_time']
idx = data_reader[ifo].idq.indices_of_flag(
flag_active = data_reader[ifo].idq.flag_at_times(
start, valid_pad, times,
padding=data_reader[ifo].dq_padding)
logging.info('Keeping %d/%d %s triggers after iDQ',
len(idx), len(times), ifo)
for key in results[ifo]:
if len(results[ifo][key]):
results[ifo][key] = results[ifo][key][idx]

if args.idq_reweighting:
logging.info('iDQ flagged %d/%d %s triggers',
numpy.sum(flag_active), len(times), ifo)
results[ifo]['dq_state'] = flag_active.astype(int)
else:
# use idq as a veto
keep = numpy.logical_not(flag_active)
logging.info('Keeping %d/%d %s triggers after iDQ',
numpy.sum(keep), len(times), ifo)
for key in results[ifo]:
if len(results[ifo][key]):
results[ifo][key] = \
results[ifo][key][keep]

# Calculate and add the psd variation for the results
if args.psd_variation:
Expand Down Expand Up @@ -1344,8 +1369,7 @@ with ctx:
tdiff = pycbc.gps_now() - t1
lag = float(pycbc.gps_now() - data_end())
logging.info('Took %1.2f, duty factor of %.2f, lag %.2f s, %d live detectors',
tdiff, tdiff / valid_pad, lag, len(evnt.live_detectors)
)
tdiff, tdiff / valid_pad, lag, len(evnt.live_detectors))

if args.output_status is not None and evnt.rank == 0:
if lag > 120:
Expand Down
42 changes: 24 additions & 18 deletions pycbc/events/coinc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
coincident triggers.
"""

import numpy, logging, pycbc.pnutils, pycbc.conversions, copy, lal
import numpy
import logging
import copy
import lal
import pycbc.pnutils
import pycbc.conversions
from pycbc.detector import Detector, ppdets
from pycbc.conversions import mchirp_from_mass1_mass2
from .eventmgr_cython import coincbuffer_expireelements
Expand Down Expand Up @@ -68,9 +73,9 @@ def background_bin_from_string(background_bins, data):

for bin_type, boundary in zip(bin_type_list, boundary_list):
if boundary[0:2] == 'lt':
member_func = lambda vals, bd=boundary : vals < float(bd[2:])
member_func = lambda vals, bd=boundary: vals < float(bd[2:])
elif boundary[0:2] == 'gt':
member_func = lambda vals, bd=boundary : vals > float(bd[2:])
member_func = lambda vals, bd=boundary: vals > float(bd[2:])
else:
raise RuntimeError("Can't parse boundary condition! Must begin "
"with 'lt' or 'gt'")
Expand Down Expand Up @@ -415,7 +420,7 @@ def cluster_coincs_multiifo(stat, time_coincs, timeslide_id, slide, window,
return numpy.array([])

time_avg_num = []
#find number of ifos and mean time over participating ifos for each coinc
# find number of ifos and mean time over participating ifos for each coinc
for tc in time_coinc_zip:
time_avg_num.append(mean_if_greater_than_zero(tc))

Expand Down Expand Up @@ -686,7 +691,8 @@ def check_expired_triggers(self, buffer_index):
if (buf_len - val_end) + val_start > invalid_limit:
# If self.resize_invalid_fraction of stored triggers are expired
# or are not set, free up memory
self.buffer_expire[buffer_index] = self.buffer_expire[buffer_index][val_start:val_end].copy()
self.buffer_expire[buffer_index] = \
self.buffer_expire[buffer_index][val_start:val_end].copy()
self.buffer[buffer_index] = self.buffer[buffer_index][val_start:val_end].copy()
self.valid_ends[buffer_index] -= val_start
self.valid_starts[buffer_index] = 0
Expand All @@ -704,8 +710,7 @@ class CoincExpireBuffer(object):
multiple expiration vectors.
"""

def __init__(self, expiration, ifos,
initial_size=2**20, dtype=numpy.float32):
def __init__(self, expiration, ifos, initial_size=2**20, dtype=numpy.float32):
"""
Parameters
----------
Expand Down Expand Up @@ -1065,8 +1070,8 @@ def set_singles_buffer(self, results):
# Create a ring buffer for each template ifo combination
for ifo in self.ifos:
self.singles[ifo] = MultiRingBuffer(self.num_templates,
self.buffer_size,
self.singles_dtype)
self.buffer_size,
self.singles_dtype)

def _add_singles_to_buffer(self, results, ifos):
"""Add single detector triggers to the internal buffer
Expand Down Expand Up @@ -1107,7 +1112,7 @@ def _add_singles_to_buffer(self, results, ifos):
del trigsc['ifo']
else:
single_stat = numpy.array([], ndmin=1,
dtype=self.stat_calculator.single_dtype)
dtype=self.stat_calculator.single_dtype)
trigs['stat'] = single_stat

# add each single detector trigger to the and advance the buffer
Expand Down Expand Up @@ -1147,10 +1152,10 @@ def _find_coincs(self, results, valid_ifos):
# Initialize
cstat = [[]]
offsets = []
ctimes = {self.ifos[0]:[], self.ifos[1]:[]}
single_expire = {self.ifos[0]:[], self.ifos[1]:[]}
ctimes = {self.ifos[0]: [], self.ifos[1]: []}
single_expire = {self.ifos[0]: [], self.ifos[1]: []}
template_ids = [[]]
trigger_ids = {self.ifos[0]:[[]], self.ifos[1]:[[]]}
trigger_ids = {self.ifos[0]: [[]], self.ifos[1]: [[]]}

# Calculate all the permutations of coincident triggers for each
# new single detector trigger collected
Expand Down Expand Up @@ -1191,10 +1196,10 @@ def _find_coincs(self, results, valid_ifos):
# (The second output would just be a list of zeroes as we only
# have one trigger in the fixed_ifo.)
i1, _, slide = time_coincidence(times,
numpy.array(trig_time, ndmin=1,
dtype=numpy.float64),
self.time_window,
self.timeslide_interval)
numpy.array(trig_time, ndmin=1,
dtype=numpy.float64),
self.time_window,
self.timeslide_interval)

# Make a copy of the fixed ifo trig_stat for each coinc.
# NB for some statistics the "stat" entry holds more than just
Expand Down Expand Up @@ -1365,7 +1370,8 @@ def add_singles(self, results):

# If there are no results just return
valid_ifos = [k for k in results.keys() if results[k] and k in self.ifos]
if len(valid_ifos) == 0: return {}
if len(valid_ifos) == 0:
return {}

# Add single triggers to the internal buffer
self._add_singles_to_buffer(results, ifos=valid_ifos)
Expand Down
Loading

0 comments on commit e061cc8

Please sign in to comment.