From 5af0cb883b188be81c2577599c096ca2eb1f9b55 Mon Sep 17 00:00:00 2001
From: Steven Murray
Date: Tue, 31 Jan 2023 16:18:54 -0700
Subject: [PATCH 1/4] feat: add partial time-reading and conjugation to
 HERADataFastReader

Adds:
- `keep_times` argument to `read_hera_hdf5`, which keeps only certain times
  at the end of the read (not really partial I/O, but still better than
  `.select()` on a DC).
- ability to get conjugate baselines in `read_hera_hdf5`.
- more tests to ensure partial bls/pols/times are done right.
---
 hera_cal/io.py            | 122 +++++++++++++++++++++--------
 hera_cal/tests/test_io.py | 161 ++++++++++++++++++++++++++------------
 2 files changed, 198 insertions(+), 85 deletions(-)

diff --git a/hera_cal/io.py b/hera_cal/io.py
index 5b41ab0f7..140eb51fc 100644
--- a/hera_cal/io.py
+++ b/hera_cal/io.py
@@ -1109,19 +1109,23 @@ def empty_arrays(self):
         self.nsample_array = (np.zeros_like(self.nsample_array) if self.nsample_array is not None else None)
 
 
-def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
-                   read_data=True, read_flags=False, read_nsamples=False,
-                   check=False, dtype=np.complex128, verbose=False):
+def read_hera_hdf5(
+    filenames, bls=None, pols=None, keep_times: list[float] | None=None, full_read_thresh=0.002,
+    read_data=True, read_flags=False, read_nsamples=False,
+    check=False, dtype=np.complex128, verbose=False
+):
     '''A potentially faster interface for reading HERA HDF5 files. Only concatenates
     along time axis. Puts times in ascending order, but does not check that
     files are contiguous. Currently not BDA compatible.
-
     Arguments:
         filenames: list of files to read
         bls: list of (ant_1, ant_2, [polstr]) tuples to read out of files.
             Default: all bls common to all files.
         pols: list of pol strings to read out of files. Default: all, but is
            superseded by any polstrs listed in bls.
+        keep_times: list of times to read out of files. Default: all. Note: all times
+            are always read from the files; setting this only down-filters times
+            after reading.
         full_read_thresh (0.002): fractional threshold for reading whole file
             instead of baseline by baseline.
         read_data (bool, True): read data
@@ -1130,7 +1134,6 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
         check (bool, False): run sanity checks to make sure files match.
         dtype (np.complex128): numpy datatype for output complex-valued arrays
         verbose: print some progress messages.
-
     Returns:
         rv: dict with keys 'info' and optionally 'data', 'flags', and 'nsamples',
             based on whether read_data, read_flags, and read_nsamples are true.
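Note: for orientation, a minimal usage sketch of the two new behaviors added by this patch. The file path, baselines, and times below are hypothetical, not taken from the repo's test data:

```python
from hera_cal.io import read_hera_hdf5

# (1, 0, 'ee') is not stored on disk; it is served by conjugating the
# stored (0, 1, 'ee') baseline.
rv = read_hera_hdf5(
    ['zen.2458042.12552.sum.uvh5'],              # hypothetical file
    bls=[(0, 1, 'ee'), (1, 0, 'ee')],
    keep_times=[2458042.12552, 2458042.12563],   # down-filtered after the read
    read_data=True,
)
info, data = rv['info'], rv['data']              # data keyed by (ant1, ant2, pol)
```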
@@ -1148,6 +1151,8 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
     inds = {}
     # Read file metadata to size up arrays and sort times
     filenames = _parse_input_files(filenames, name='input_data')
+
+
     for filename in filenames:
         if verbose:
             print(f'Reading header of {filename}')
@@ -1163,12 +1168,20 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
             info['freqs'] = h['freq_array'][()]  # make 1D instead of 2D
             nfreqs = info['freqs'].size
             pol_array = h['polarization_array'][()]
+            npols = pol_array.size
             # the following errors if x_orientation not set in this hdf5
             x_orient = str(h['x_orientation'][()], encoding='utf-8')
-            pol_indices = {uvutils.parse_polstr(POL_NUM2STR_DICT[n], x_orientation=x_orient): cnt
-                           for cnt, n in enumerate(pol_array)}
+
+            pol_indices = {
+                uvutils.parse_polstr(POL_NUM2STR_DICT[n], x_orientation=x_orient): cnt
+                for cnt, n in enumerate(pol_array)
+            }
+            if pols is not None:
+                pols = [uvutils.parse_polstr(p, x_orientation=x_orient) for p in pols]
+
             info['pols'] = list(pol_indices.keys())
+            info['x_orientation'] = x_orient
             info['ants'] = antenna_numbers = h['antenna_numbers'][()]
             info['antpos'] = dict(zip(antenna_numbers, h['antenna_positions'][()]))
             for coord in ['latitude', 'longitude', 'altitude']:
@@ -1196,26 +1209,34 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
                 data_ants.update(set(ant2_array))
                 _hash = hash((ant1_array.tobytes(), ant2_array.tobytes(), time_first, ntimes))
                 # map baselines to array indices for each unique antenna order
+                existing_bls = list(zip(ant1_array, ant2_array))
                 if _hash not in inds:
+                    inds[_hash] = {}
+
                     if time_first:
-                        inds[_hash] = {(i, j): slice(n * ntimes, (n + 1) * ntimes)
-                                       for n, (i, j) in enumerate(zip(ant1_array,
-                                                                      ant2_array))}
+                        for n, (i, j) in enumerate(existing_bls):
+                            slc = slice(n * ntimes, (n + 1) * ntimes)
+                            inds[_hash][(i, j)] = slc
+                            inds[_hash][(j, i)] = slc  # Allow for conjugation
                     else:
-                        inds[_hash] = {(i, j): slice(n, None, nbls)
-                                       for n, (i, j) in enumerate(zip(ant1_array,
-                                                                      ant2_array))}
+                        for n, (i, j) in enumerate(existing_bls):
+                            slc = slice(n, None, nbls)
+                            inds[_hash][(i, j)] = slc
+                            inds[_hash][(j, i)] = slc  # Allow for conjugation
+
+
                 if bls is not None:
                     # Make sure our baselines of interest are in this file
-                    if not all([bl[:2] in inds[_hash] for bl in bls]):
+                    if not all(bl[:2] in inds[_hash] for bl in bls):
                         missing_bls = [bl for bl in bls if bl[:2] not in inds[_hash]]
                         raise ValueError(f'File {filename} missing:' + str(missing_bls))
-                    assert bl[:2] in inds[_hash]
+
                 if 'bls' not in info:
-                    info['bls'] = set(inds[_hash].keys())
+                    info['bls'] = set(existing_bls)
                     info['data_ants'] = data_ants
                 else:
-                    info['bls'].intersection_update(set(inds[_hash].keys()))
+                    info['bls'].intersection_update(set(existing_bls))
                     info['data_ants'].intersection_update(data_ants)
                 bl2ind[filename] = inds[_hash]
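Note: the two index-mapping branches above differ only in the slice they store. A toy illustration (not HERA data, just a stand-in array) of how `slice(n * ntimes, (n + 1) * ntimes)` and `slice(n, None, nbls)` each pull one baseline's waterfall out of a flattened (Nblts, Nfreqs) array under the two orderings:

```python
import numpy as np

ntimes, nbls, nfreqs = 3, 2, 4
n = 1  # position of a baseline in the file's baseline order
blts = np.zeros((ntimes * nbls, nfreqs))  # stand-in for visdata on disk

# time_first: rows are (bl0 t0, bl0 t1, ..., bl1 t0, bl1 t1, ...),
# so baseline n occupies a contiguous block of ntimes rows.
wf_time_first = blts[slice(n * ntimes, (n + 1) * ntimes)]

# baseline-first: rows are (bl0 t0, bl1 t0, bl0 t1, bl1 t1, ...),
# so baseline n occupies rows n, n + nbls, n + 2 * nbls, ...
wf_bl_first = blts[slice(n, None, nbls)]

assert wf_time_first.shape == wf_bl_first.shape == (ntimes, nfreqs)
```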
@@ -1251,7 +1272,7 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
     # bail here if all we wanted was the info
     if len(rv) == 0:
         return {'info': info}
-
+
     t = 0
     for _times, filename, _info in times:
         inds = bl2ind[filename]
@@ -1302,27 +1323,50 @@ def index_exp(i, j, p):
                 for i, j, p in bls:
                     _d = d[index_exp(i, j, p)]
                     data[i, j, p][t:t + ntimes].real = _d['r']
-                    data[i, j, p][t:t + ntimes].imag = _d['i']
+                    if (i, j) in info['bls']:
+                        data[i, j, p][t:t + ntimes].imag = _d['i']
+                    else:  # conjugate
+                        data[i, j, p][t:t + ntimes].imag = -_d['i']
             else:
                 for i, j, p in bls:
-                    data[i, j, p][t:t + ntimes] = d[index_exp(i, j, p)]
-
+                    if (i, j) in info['bls']:
+                        data[i, j, p][t:t + ntimes] = d[index_exp(i, j, p)]
+                    else:
+                        data[i, j, p][t:t + ntimes] = np.conj(d[index_exp(i, j, p)])
         t += ntimes
+
+    # Now subselect the times.
+    if keep_times is not None:
+        time_mask = np.array([t in keep_times for t in info['times']])
+        info['times'] = info['times'][time_mask]
+        for key in ['visdata', 'flags', 'nsamples']:
+            if key in rv:
+                for bl in rv[key]:
+                    rv[key][bl] = rv[key][bl][time_mask]
+
     # Quick renaming of data key for niceness
     if 'visdata' in rv:
         rv['data'] = rv.pop('visdata', [])
-    info['data_ants'] = np.array(sorted(info['data_ants']))
+
+    # Ensure info['bls'] matches input bls.
+    antpairs = set(bl[:2] for bl in bls)
+    info['bls'] = [bl for bl in info['bls'] if bl in antpairs]
+
+    info['data_ants'] = set()
+    for bl in info['bls']:
+        info['data_ants'].add(bl[0])
+        info['data_ants'].add(bl[1])
+
     rv['info'] = info
     return rv
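Note: the `keep_times` branch above builds a boolean mask over the already-read time axis. In isolation it behaves like this sketch (values hypothetical); note the exact float comparison, which PATCH 4/4 below replaces with a tolerance-based match:

```python
import numpy as np

times = np.array([2458042.1, 2458042.2, 2458042.3])  # hypothetical JDs
keep_times = [2458042.3, 2458042.1]

time_mask = np.array([t in keep_times for t in times])  # exact equality
assert time_mask.tolist() == [True, False, True]
assert list(times[time_mask]) == [2458042.1, 2458042.3]  # file order is kept
```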
 
 
-class HERADataFastReader():
+class HERADataFastReader:
     '''Wrapper class around read_hera_hdf5 meant to mimic the functionality of HERAData
     for drop-in replacement.'''
 
     def __init__(self, input_data, read_metadata=True, check=False, skip_lsts=False):
         '''Instantiates a HERADataFastReader object. Only supports reading uvh5 files, not writing them.
         Does not support BDA and only supports partial i/o along baselines and polarization axes.
-
         Arguments:
             input_data: path or list of paths to uvh5 files.
             read_metadata (bool, True): reads metadata from file and stores it internally to try to match HERAData
@@ -1346,6 +1390,8 @@ def __init__(self, input_data, read_metadata=True, check=False, skip_lsts=False)
             else:
                 setattr(self, meta, None)
 
+        self.x_orientation = rv['info'].get('x_orientation', None)
+
         # create functions that error informatively when trying to use standard HERAData/UVData methods
         for funcname in list(dir(HERAData)):
             if funcname.startswith('__') and funcname.endswith('__'):
@@ -1366,7 +1412,7 @@ def _adapt_metadata(self, info_dict, skip_lsts=False):
         info_dict['data_antpos'] = {ant: info_dict['antpos'][ant] for ant in info_dict['data_ants']}
         info_dict['times'] = np.unique(info_dict['times'])
         info_dict['times_by_bl'] = {ap: info_dict['times'] for ap in info_dict['antpairs']}
-        info_dict['times_by_bl'].update({(a2, a1): info_dict['times'] for (a1, a2) in info_dict['antpairs']})
+        # info_dict['times_by_bl'].update({(a2, a1): info_dict['times'] for (a1, a2) in info_dict['antpairs']})
         if not skip_lsts:
             info_dict['lsts'] = JD2LST(info_dict['times'], info_dict['latitude'], info_dict['longitude'], info_dict['altitude'])
             info_dict['lsts_by_bl'] = {ap: info_dict['lsts'] for ap in info_dict['antpairs']}
@@ -1374,14 +1420,17 @@ def _adapt_metadata(self, info_dict, skip_lsts=False):
     def _HERAData_error(self, *args, **kwargs):
         raise NotImplementedError('HERADataFastReader does not support this method. Try HERAData instead.')
 
-    def read(self, bls=None, pols=None, full_read_thresh=0.002, read_data=True, read_flags=True,
+    def read(self, bls=None, pols=None, keep_times: list[float] | None = None,
+             full_read_thresh=0.002, read_data=True, read_flags=True,
              read_nsamples=True, check=False, dtype=np.complex128, verbose=False, skip_lsts=False):
         '''A faster read that only concatenates along the time axis. Puts times in ascending order,
         but does not check that files are contiguous. Currently not BDA compatible.
-
         Arguments:
             bls: list of (ant_1, ant_2, [polstr]) tuples to read out of files.
                 Default: all bls common to all files.
             pols: list of pol strings to read out of files.
                 Default: all, but is superseded by any polstrs listed in bls.
+            keep_times: list of times to read out of files. Default: all. Note: all times
+                are always read from the files; setting this only down-filters times
+                after reading.
             full_read_thresh (0.002): fractional threshold for reading whole file
                 instead of baseline by baseline.
             read_data (bool, True): read data
             read_flags (bool, True): read flags
@@ -1390,25 +1439,32 @@ def read(self, bls=None, pols=None, full_read_thresh=0.002, read_data=True, read
             dtype (np.complex128): numpy datatype for output complex-valued arrays
             verbose: print some progress messages.
             skip_lsts (bool, False): save time by not computing LSTs from JDs
-
         Returns:
             data: DataContainer mapping baseline keys to complex visibility waterfalls (if read_data is True, else None)
             flags: DataContainer mapping baseline keys to boolean flag waterfalls (if read_flags is True, else None)
             nsamples: DataContainer mapping baseline keys to integer Nsamples waterfalls (if read_nsamples is True, else None)
         '''
-        rv = read_hera_hdf5(self.filepaths, bls=bls, pols=pols, full_read_thresh=full_read_thresh,
-                            read_data=read_data, read_flags=read_flags, read_nsamples=read_nsamples,
-                            check=check, dtype=dtype, verbose=verbose)
+        rv = read_hera_hdf5(
+            self.filepaths, bls=bls, pols=pols, keep_times=keep_times,
+            full_read_thresh=full_read_thresh,
+            read_data=read_data, read_flags=read_flags, read_nsamples=read_nsamples,
+            check=check, dtype=dtype, verbose=verbose
+        )
         self._adapt_metadata(rv['info'], skip_lsts=skip_lsts)
 
-        # make autocorrleations real by taking the abs, matches UVData._fix_autos()
+        # make autocorrelations real by taking the abs, matches UVData._fix_autos()
         if 'data' in rv:
             for bl in rv['data']:
                 if split_bl(bl)[0] == split_bl(bl)[1]:
                     rv['data'][bl] = np.abs(rv['data'][bl])
 
         # construct datacontainers from result
-        return self._make_datacontainer(rv, 'data'), self._make_datacontainer(rv, 'flags'), self._make_datacontainer(rv, 'nsamples')
+        return (
+            self._make_datacontainer(rv, 'data'),
+            self._make_datacontainer(rv, 'flags'),
+            self._make_datacontainer(rv, 'nsamples')
+        )
 
     def _make_datacontainer(self, rv, key='data'):
         '''Converts outputs from read_hera_hdf5 to a more standard HERAData output.'''
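Note: a minimal sketch of the corresponding drop-in usage (hypothetical path; assumes metadata was read at construction, which is the default):

```python
from hera_cal.io import HERADataFastReader

hd = HERADataFastReader('zen.2458042.12552.sum.uvh5')  # hypothetical file
data, flags, nsamples = hd.read(
    bls=[(0, 1, 'ee')],
    keep_times=hd.times[:10],  # times attribute populated at construction
    skip_lsts=True,            # skip the JD -> LST conversion for speed
)
```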
diff --git a/hera_cal/tests/test_io.py b/hera_cal/tests/test_io.py
index a2a9d9503..8dbb336d0 100644
--- a/hera_cal/tests/test_io.py
+++ b/hera_cal/tests/test_io.py
@@ -846,7 +846,7 @@ def test_read_bl_then_time_poltranpose(self):
         assert len(rv['info']['times']) == 2
 
 
-class Test_HERADataFastReader(object):
+class Test_HERADataFastReader:
     def setup_method(self):
         self.uvh5_1 = os.path.join(DATA_PATH, "test_input", "zen.2458042.60288.HH.uvRXLS.uvh5_downselected")
         self.uvh5_2 = os.path.join(DATA_PATH, "test_input", "zen.2458042.61034.HH.uvRXLS.uvh5_downselected")
@@ -874,59 +874,116 @@ def test_read_data(self):
             else:
                 np.testing.assert_array_equal(d[bl], np.abs(rv['data'][bl]))
 
-    def test_comp_to_HERAData(self):
-        for infile in ([self.uvh5_1], [self.uvh5_1, self.uvh5_2], self.uvh5_h4c):
-            hd = io.HERADataFastReader(infile, read_metadata=False)
-            d, f, n = hd.read(check=True)
-            hd2 = io.HERAData(infile)
-            d2, f2, n2 = hd2.read()
-            # compare all data and metadata
-            for dc1, dc2 in zip([d, f, n], [d2, f2, n2]):
-                for bl in dc1:
-                    if (split_bl(bl)[0] == split_bl(bl)[1]) and (infile != self.uvh5_h4c):
-                        # somehow there are numerical issues at play for H1C data
-                        np.testing.assert_allclose(dc1[bl], dc2[bl], rtol=1e-6)
-                    else:
-                        np.testing.assert_array_equal(dc1[bl], dc2[bl])
-                np.testing.assert_array_equal(dc1.freqs, dc2.freqs)
-                np.testing.assert_array_equal(dc1.times, dc2.times)
-                np.testing.assert_allclose(dc1.lsts, dc2.lsts)
-                np.testing.assert_array_equal(dc1.ants, dc2.ants)
-                np.testing.assert_array_equal(dc1.data_ants, dc2.data_ants)
-                np.testing.assert_array_equal(sorted(dc1.pols()), sorted(dc2.pols()))
-                np.testing.assert_array_equal(sorted(dc1.antpairs()), sorted(dc2.antpairs()))
-                np.testing.assert_array_equal(sorted(dc1.bls()), sorted(dc2.bls()))
-                for ant in dc1.antpos:
-                    np.testing.assert_array_almost_equal(dc1.antpos[ant] - dc2.antpos[ant], 0)
-                for ant in dc1.data_antpos:
-                    np.testing.assert_array_almost_equal(dc1.antpos[ant] - dc2.antpos[ant], 0)
-                for ap in dc1.times_by_bl:
-                    np.testing.assert_array_equal(dc1.times_by_bl[ap], dc2.times_by_bl[ap])
-                for ap in dc1.lsts_by_bl:
-                    np.testing.assert_allclose(dc1.lsts_by_bl[ap], dc2.lsts_by_bl[ap])
+    def compare_datacontainers(self, dc1, dc2, allow_close: bool = False):
+        for bl in dc1:
+            if (split_bl(bl)[0] == split_bl(bl)[1]) and allow_close:
+                # somehow there are numerical issues at play for H1C data
+                np.testing.assert_allclose(dc1[bl], dc2[bl], rtol=1e-6)
+            else:
+                np.testing.assert_array_equal(dc1[bl], dc2[bl])
+        np.testing.assert_array_equal(dc1.freqs, dc2.freqs)
+        np.testing.assert_array_equal(dc1.times, dc2.times)
+        np.testing.assert_allclose(dc1.lsts, dc2.lsts)
+        np.testing.assert_array_equal(dc1.ants, dc2.ants)
+        np.testing.assert_array_equal(dc1.data_ants, dc2.data_ants)
+        np.testing.assert_array_equal(sorted(dc1.pols()), sorted(dc2.pols()))
+        np.testing.assert_array_equal(sorted(dc1.antpairs()), sorted(dc2.antpairs()))
+        np.testing.assert_array_equal(sorted(dc1.bls()), sorted(dc2.bls()))
+        for ant in dc1.antpos:
+            np.testing.assert_array_almost_equal(dc1.antpos[ant] - dc2.antpos[ant], 0)
+        for ant in dc1.data_antpos:
+            np.testing.assert_array_almost_equal(dc1.antpos[ant] - dc2.antpos[ant], 0)
+        for ap in dc1.times_by_bl:
+            np.testing.assert_array_equal(dc1.times_by_bl[ap], dc2.times_by_bl[ap])
+        for ap in dc1.lsts_by_bl:
+            np.testing.assert_allclose(dc1.lsts_by_bl[ap], dc2.lsts_by_bl[ap])
+
+    @pytest.mark.parametrize(
+        'infile', (['uvh5_1'], ['uvh5_1', 'uvh5_2'], 'uvh5_h4c')
+    )
+    @pytest.mark.parametrize(
+        'bls', (None, 'first10')
+    )
+    @pytest.mark.parametrize(
+        'pols', (None, ['ee'], ['yy'])
+    )
+    @pytest.mark.parametrize(
+        'times', (None, 'first3', 'reverse')
+    )
+    def test_comp_to_HERAData_dc(self, infile, bls, pols, times):
+        if (isinstance(infile, list) and len(infile) > 1 and times is not None):
+            pytest.skip("HERAData doesn't support partial times with multiple files that have different times.")
+
+        # This just turns the strings from the parametrize into actual objects
+        if isinstance(infile, list):
+            indata = [getattr(self, f) for f in infile]
+        else:
+            indata = getattr(self, infile)
+
+        hd = io.HERADataFastReader(indata, read_metadata=False)
+        hd2 = io.HERAData(indata)
+        if bls == "first10":
+            bls = hd2.get_antpairs()[:10]
+        if times == "first3":
+            if isinstance(hd2.times, np.ndarray):
+                times = hd2.times[:3]
+            else:
+                # multiple files mean that "times" is a dict, one per file.
+                times = np.concatenate([t[:3] for t in hd2.times.values()])
+        elif times == 'reverse':
+            times = hd2.times[::-1]
+
+        d, f, n = hd.read(check=True, bls=bls, pols=pols, keep_times=times)
+        d2, f2, n2 = hd2.read(bls=bls, polarizations=pols, times=times)
+
+        # compare all data and metadata
+        for dc1, dc2 in zip([d, f, n], [d2, f2, n2]):
+            self.compare_datacontainers(dc1, dc2, allow_close=infile != 'uvh5_h4c')
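Note: the stacked `@pytest.mark.parametrize` decorators above take the cross-product of their argument lists, so one test function expands into a case for every infile/bls/pols/times combination (minus the skips). A self-contained reminder of that pytest behavior:

```python
import pytest

@pytest.mark.parametrize('bls', (None, 'first10'))
@pytest.mark.parametrize('times', (None, 'first3', 'reverse'))
def test_cross_product(bls, times):
    # Stacked decorators multiply: this function is collected 2 x 3 = 6 times.
    assert bls in (None, 'first10')
    assert times in (None, 'first3', 'reverse')
```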
+
+    def test_partial_times_multiple_files(self):
+        """Test that partial times works with multiple files that have different times."""
+        hd = io.HERADataFastReader([self.uvh5_1, self.uvh5_2])
+        times = hd.times[:-1]
+
+        d, f, n = hd.read(check=True, keep_times=times)
+        d2, f2, n2 = hd.read(check=True)
+
+        for key, val in d.items():
+            np.testing.assert_array_equal(val, d2[key][:-1])
+
+    @pytest.mark.parametrize(
+        'infile', (['uvh5_1'], 'uvh5_h4c')
+    )
+    def test_comp_to_HERAData_hd(self, infile):
+        # This just turns the strings from the parametrize into actual objects
+        if isinstance(infile, list):
+            infile = [getattr(self, f) for f in infile]
+        else:
+            infile = getattr(self, infile)
 
         # compare metadata stored in hd object
-        for infile in ([self.uvh5_1], self.uvh5_h4c):
-            hd1 = io.HERADataFastReader(infile)
-            d, f, n = hd.read(check=True)
-            hd2 = io.HERAData(infile)
-            d2, f2, n2 = hd2.read()
-            np.testing.assert_array_equal(hd1.freqs, hd2.freqs)
-            np.testing.assert_array_equal(hd1.times, hd2.times)
-            np.testing.assert_allclose(hd1.lsts, hd2.lsts)
-            np.testing.assert_array_equal(hd1.ants, hd2.ants)
-            np.testing.assert_array_equal(hd1.data_ants, hd2.data_ants)
-            np.testing.assert_array_equal(hd1.pols, hd2.pols)
-            np.testing.assert_array_equal(hd1.antpairs, hd2.antpairs)
-            np.testing.assert_array_equal(hd1.bls, hd2.bls)
-            for ant in hd1.antpos:
-                np.testing.assert_array_almost_equal(hd1.antpos[ant] - hd2.antpos[ant], 0)
-            for ant in hd1.data_antpos:
-                np.testing.assert_array_almost_equal(hd1.antpos[ant] - hd2.antpos[ant], 0)
-            for ap in hd1.times_by_bl:
-                np.testing.assert_array_equal(hd1.times_by_bl[ap], hd2.times_by_bl[ap])
-            for ap in hd1.lsts_by_bl:
-                np.testing.assert_allclose(hd1.lsts_by_bl[ap], hd2.lsts_by_bl[ap])
+        hd1 = io.HERADataFastReader(infile)
+        hd1.read(check=True)
+        hd2 = io.HERAData(infile)
+        np.testing.assert_array_equal(hd1.freqs, hd2.freqs)
+        np.testing.assert_array_equal(hd1.times, hd2.times)
+        np.testing.assert_allclose(hd1.lsts, hd2.lsts)
+        np.testing.assert_array_equal(hd1.ants, hd2.ants)
+        np.testing.assert_array_equal(hd1.data_ants, hd2.data_ants)
+        np.testing.assert_array_equal(hd1.pols, hd2.pols)
+        np.testing.assert_array_equal(hd1.antpairs, hd2.antpairs)
+        np.testing.assert_array_equal(hd1.bls, hd2.bls)
+        for ant in hd1.antpos:
+            np.testing.assert_array_almost_equal(hd1.antpos[ant] - hd2.antpos[ant], 0)
+        for ant in hd1.data_antpos:
+            np.testing.assert_array_almost_equal(hd1.antpos[ant] - hd2.antpos[ant], 0)
+        for ap in hd1.times_by_bl:
+            np.testing.assert_array_equal(hd1.times_by_bl[ap], hd2.times_by_bl[ap])
+        for ap in hd1.lsts_by_bl:
+            np.testing.assert_allclose(hd1.lsts_by_bl[ap], hd2.lsts_by_bl[ap])
 
     def test_errors(self):
         hd = io.HERADataFastReader([self.uvh5_1, self.uvh5_2])
From ba2293f36abffa2026ec53322a33518fc1e9914e Mon Sep 17 00:00:00 2001
From: Steven Murray
Date: Wed, 1 Feb 2023 13:24:06 -0700
Subject: [PATCH 2/4] test: add test of conjugated baselines

---
 hera_cal/io.py            | 48 ++++++++++++++++++++-------------------
 hera_cal/tests/test_io.py |  5 +++-
 2 files changed, 29 insertions(+), 24 deletions(-)

diff --git a/hera_cal/io.py b/hera_cal/io.py
index 140eb51fc..0579e2998 100644
--- a/hera_cal/io.py
+++ b/hera_cal/io.py
@@ -1152,6 +1152,10 @@ def read_hera_hdf5(
     # Read file metadata to size up arrays and sort times
     filenames = _parse_input_files(filenames, name='input_data')
 
+    if bls is not None:
+        antpairs = [bl[:2] for bl in bls]
+    else:
+        antpairs = None
 
     for filename in filenames:
         if verbose:
@@ -1209,35 +1213,34 @@ def read_hera_hdf5(
                 data_ants.update(set(ant2_array))
                 _hash = hash((ant1_array.tobytes(), ant2_array.tobytes(), time_first, ntimes))
                 # map baselines to array indices for each unique antenna order
-                existing_bls = list(zip(ant1_array, ant2_array))
+                data_bls = list(zip(ant1_array, ant2_array))
+
                 if _hash not in inds:
                     inds[_hash] = {}
 
                     if time_first:
-                        for n, (i, j) in enumerate(existing_bls):
+                        for n, (i, j) in enumerate(data_bls):
                             slc = slice(n * ntimes, (n + 1) * ntimes)
                             inds[_hash][(i, j)] = slc
-                            inds[_hash][(j, i)] = slc  # Allow for conjugation
                     else:
-                        for n, (i, j) in enumerate(existing_bls):
+                        for n, (i, j) in enumerate(data_bls):
                             slc = slice(n, None, nbls)
                             inds[_hash][(i, j)] = slc
-                            inds[_hash][(j, i)] = slc  # Allow for conjugation
-
-
-                if bls is not None:
+                if antpairs is not None:
                     # Make sure our baselines of interest are in this file
-                    if not all(bl[:2] in inds[_hash] for bl in bls):
-                        missing_bls = [bl for bl in bls if bl[:2] not in inds[_hash]]
+                    missing_bls = [(i, j) for (i, j) in antpairs if not ((i, j) in inds[_hash] or (j, i) in inds[_hash])]
+                    if missing_bls:
                         raise ValueError(f'File {filename} missing:' + str(missing_bls))
-
+                # Use intersection of bls across files.
                 if 'bls' not in info:
-                    info['bls'] = set(existing_bls)
+                    info['bls'] = set(inds[_hash].keys())
                     info['data_ants'] = data_ants
                 else:
-                    info['bls'].intersection_update(set(existing_bls))
-                    info['data_ants'].intersection_update(data_ants)
+                    info['bls'] &= set(inds[_hash].keys())
+                    info['data_ants'] &= data_ants
+
                 bl2ind[filename] = inds[_hash]
 
     if bls is None:
@@ -1254,8 +1257,13 @@ def read_hera_hdf5(
             pols = list(pol_indices.keys())
         bls = set(bl for bl in bls if len(bl) == 3)
         bls = bls.union([bl + (p,) for bl in bls_len2 for p in pols])
+
+        # now, conjugate them if they're not in the data
+        bls = {(i, j, p) if (i, j) in info['bls'] else (j, i, utils.conj_pol(p)) for (i, j, p) in bls}
+
     # record polarizations as total of ones indexed in bls
     pols = set(bl[2] for bl in bls)
+
    # sort files by time of first integration
    times.sort(key=lambda x: x[0][0])
    info['times'] = np.concatenate([t[0] for t in times], axis=0)
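Note: the set comprehension in the hunk above flips any requested baseline that is not stored onto its stored conjugate, conjugating the polarization string as well. The same transformation in isolation, with toy antpairs (using `hera_cal.utils.conj_pol`):

```python
from hera_cal.utils import conj_pol

stored_antpairs = {(0, 1)}                # pairs actually present in the file
requested = {(0, 1, 'ee'), (1, 0, 'en')}  # note the flipped (1, 0) pair

resolved = {
    (i, j, p) if (i, j) in stored_antpairs else (j, i, conj_pol(p))
    for (i, j, p) in requested
}
# (1, 0, 'en') is read from the stored (0, 1) rows with conjugated pol 'ne'
assert resolved == {(0, 1, 'ee'), (0, 1, 'ne')}
```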
@@ -1323,16 +1331,10 @@ def index_exp(i, j, p):
                 for i, j, p in bls:
                     _d = d[index_exp(i, j, p)]
                     data[i, j, p][t:t + ntimes].real = _d['r']
-                    if (i, j) in info['bls']:
-                        data[i, j, p][t:t + ntimes].imag = _d['i']
-                    else:  # conjugate
-                        data[i, j, p][t:t + ntimes].imag = -_d['i']
+                    data[i, j, p][t:t + ntimes].imag = _d['i']
             else:
                 for i, j, p in bls:
-                    if (i, j) in info['bls']:
-                        data[i, j, p][t:t + ntimes] = d[index_exp(i, j, p)]
-                    else:
-                        data[i, j, p][t:t + ntimes] = np.conj(d[index_exp(i, j, p)])
+                    data[i, j, p][t:t + ntimes] = d[index_exp(i, j, p)]
         t += ntimes
 
     # Now subselect the times.
@@ -1349,8 +1351,8 @@ def index_exp(i, j, p):
         rv['data'] = rv.pop('visdata', [])
 
     # Ensure info['bls'] matches input bls.
-    antpairs = set(bl[:2] for bl in bls)
-    info['bls'] = [bl for bl in info['bls'] if bl in antpairs]
+    if antpairs is not None:
+        info['bls'] = {bl[:2] for bl in bls}
 
     info['data_ants'] = set()
     for bl in info['bls']:

diff --git a/hera_cal/tests/test_io.py b/hera_cal/tests/test_io.py
index 8dbb336d0..84a161a76 100644
--- a/hera_cal/tests/test_io.py
+++ b/hera_cal/tests/test_io.py
@@ -902,7 +902,7 @@ def compare_datacontainers(self, dc1, dc2, allow_close: bool = False):
         'infile', (['uvh5_1'], ['uvh5_1', 'uvh5_2'], 'uvh5_h4c')
     )
     @pytest.mark.parametrize(
-        'bls', (None, 'first10')
+        'bls', (None, 'first10', 'conjugated')
     )
     @pytest.mark.parametrize(
         'pols', (None, ['ee'], ['yy'])
@@ -925,6 +925,9 @@ def test_comp_to_HERAData_dc(self, infile, bls, pols, times):
         if bls == "first10":
             bls = hd2.get_antpairs()[:10]
+        elif bls == "conjugated":
+            bls = hd2.get_antpairs()[:15]
+            bls = [(b, a) for a, b in bls]
         if times == "first3":
             if isinstance(hd2.times, np.ndarray):
                 times = hd2.times[:3]

From 15d15ba6ebf9180c4e94670d8bae75d29e485fa7 Mon Sep 17 00:00:00 2001
From: Steven Murray
Date: Wed, 1 Feb 2023 14:04:27 -0700
Subject: [PATCH 3/4] add future annotations

---
 hera_cal/io.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hera_cal/io.py b/hera_cal/io.py
index 0579e2998..3d782ecda 100644
--- a/hera_cal/io.py
+++ b/hera_cal/io.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2019 the HERA Project
 # Licensed under the MIT License
+from __future__ import annotations
 
 import numpy as np
 from collections import OrderedDict as odict
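Note: PATCH 3/4 matters because `list[float] | None` is only valid at runtime on Python >= 3.10. With `from __future__ import annotations` (PEP 563), annotations are stored as unevaluated strings, so the annotated signatures also import on older interpreters. A toy stand-in demonstrating the effect:

```python
from __future__ import annotations


def read(keep_times: list[float] | None = None) -> None:
    """Toy stand-in for the annotated signatures in io.py."""


# With the future import, the annotation is kept as an unevaluated string,
# so nothing here requires Python 3.10's `X | Y` or 3.9's `list[float]`.
assert read.__annotations__['keep_times'] == 'list[float] | None'
```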
From 7d9cf5317ec937f3ffd0665aa65436de66bd20a5 Mon Sep 17 00:00:00 2001
From: Steven Murray
Date: Wed, 1 Feb 2023 16:59:42 -0700
Subject: [PATCH 4/4] fix Josh's comments

---
 hera_cal/io.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/hera_cal/io.py b/hera_cal/io.py
index 3d782ecda..2cdf4f9f6 100644
--- a/hera_cal/io.py
+++ b/hera_cal/io.py
@@ -1111,7 +1111,7 @@ def empty_arrays(self):
 
 
 def read_hera_hdf5(
-    filenames, bls=None, pols=None, keep_times: list[float] | None=None, full_read_thresh=0.002,
+    filenames, bls=None, pols=None, keep_times: list[float] | None = None, full_read_thresh=0.002,
     read_data=True, read_flags=False, read_nsamples=False,
     check=False, dtype=np.complex128, verbose=False
 ):
@@ -1260,7 +1260,7 @@ def read_hera_hdf5(
             bls = bls.union([bl + (p,) for bl in bls_len2 for p in pols])
 
         # now, conjugate them if they're not in the data
-        bls = {(i, j, p) if (i, j) in info['bls'] else (j, i, utils.conj_pol(p)) for (i, j, p) in bls}
+        bls = {bl if bl[:2] in info['bls'] else utils.reverse_bl(bl) for bl in bls}
 
     # record polarizations as total of ones indexed in bls
     pols = set(bl[2] for bl in bls)
@@ -1340,7 +1340,14 @@ def index_exp(i, j, p):
 
     # Now subselect the times.
     if keep_times is not None:
-        time_mask = np.array([t in keep_times for t in info['times']])
+        time_mask = np.array([
+            np.isclose(
+                keep_times,
+                t,
+                rtol=UVData._time_array.tols[0],
+                atol=UVData._time_array.tols[1],
+            ).any() for t in info['times']
+        ])
         info['times'] = info['times'][time_mask]
         for key in ['visdata', 'flags', 'nsamples']:
             if key in rv:
@@ -1415,7 +1422,6 @@ def _adapt_metadata(self, info_dict, skip_lsts=False):
         info_dict['data_antpos'] = {ant: info_dict['antpos'][ant] for ant in info_dict['data_ants']}
         info_dict['times'] = np.unique(info_dict['times'])
         info_dict['times_by_bl'] = {ap: info_dict['times'] for ap in info_dict['antpairs']}
-        # info_dict['times_by_bl'].update({(a2, a1): info_dict['times'] for (a1, a2) in info_dict['antpairs']})
         if not skip_lsts:
             info_dict['lsts'] = JD2LST(info_dict['times'], info_dict['latitude'], info_dict['longitude'], info_dict['altitude'])
             info_dict['lsts_by_bl'] = {ap: info_dict['lsts'] for ap in info_dict['antpairs']}
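Note: the reworked mask above matches each read time against `keep_times` to within pyuvdata's time tolerances rather than by exact float equality (pyuvdata's `UVData._time_array.tols` is a `(rtol, atol)` pair). Sketched in isolation, with illustrative stand-in tolerances of roughly 1 ms expressed in days:

```python
import numpy as np

rtol, atol = 0.0, 1e-3 / 86400.0  # illustrative (rtol, atol) stand-ins
keep_times = np.array([2458042.1230000001, 2458042.456])  # hypothetical JDs
times = np.array([2458042.123, 2458042.456, 2458042.789])

time_mask = np.array([
    np.isclose(keep_times, t, rtol=rtol, atol=atol).any() for t in times
])
assert time_mask.tolist() == [True, True, False]
```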