From 780760efb98bb1d2e697362903852e50e0ca87f2 Mon Sep 17 00:00:00 2001 From: Nicky Hochmuth Date: Mon, 25 Nov 2024 09:58:24 +0100 Subject: [PATCH] daily processing pipeline (#405) * bump all product versions to 2 * initial * daily processing task * add singleStep * move ANC-aspect from pipeline to daily_pipeline * removed requirement for the moment * fix #283 and option to process only latest fits file versions * update ddpd scripts * fix circular imports * test git dep * skio test on win * skip test on win/mac * Update stixcore/io/FlareListManager.py Co-authored-by: Shane Maloney * fix format error --------- Co-authored-by: Shane Maloney --- setup.cfg | 3 +- stixcore/ephemeris/manager.py | 49 ++- stixcore/ephemeris/tests/test_manager.py | 4 +- stixcore/io/FlareListManager.py | 132 ++++++++ stixcore/io/ProcessingHistoryStorage.py | 212 +++++++++++++ stixcore/io/fits/processors.py | 48 +++ stixcore/processing/AspectANC.py | 179 +++++++++++ stixcore/processing/FLtoL3.py | 89 ++++++ stixcore/processing/SingleStep.py | 185 +++++++++++ stixcore/processing/pipeline_daily.py | 265 ++++++++++++++++ stixcore/processing/publish.py | 22 +- stixcore/processing/tests/test_publish.py | 18 +- stixcore/products/ANC/aspect.py | 264 ++++++++++++++++ stixcore/products/__init__.py | 1 + stixcore/products/level2/housekeepingL2.py | 312 +------------------ stixcore/products/level3/flarelistproduct.py | 30 ++ stixcore/products/product.py | 4 + stixcore/products/tests/test_bsd.py | 18 +- stixcore/time/datetime.py | 7 +- stixcore/tmtc/packets.py | 2 +- stixcore/util/scripts/ddpd.py | 31 +- 21 files changed, 1510 insertions(+), 365 deletions(-) create mode 100644 stixcore/io/FlareListManager.py create mode 100644 stixcore/io/ProcessingHistoryStorage.py create mode 100644 stixcore/processing/AspectANC.py create mode 100644 stixcore/processing/FLtoL3.py create mode 100644 stixcore/processing/SingleStep.py create mode 100644 stixcore/processing/pipeline_daily.py create mode 100644 stixcore/products/ANC/aspect.py create mode 100644 stixcore/products/level3/flarelistproduct.py diff --git a/setup.cfg b/setup.cfg index e454f0d2..053cc5ab 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,8 +29,7 @@ install_requires = spiceypy sunpy>=4.0.1 watchdog==6.0.0 - requests - + stixdcpy==3.0 [options.entry_points] diff --git a/stixcore/ephemeris/manager.py b/stixcore/ephemeris/manager.py index 9e78a49e..831faf62 100644 --- a/stixcore/ephemeris/manager.py +++ b/stixcore/ephemeris/manager.py @@ -3,6 +3,7 @@ import logging from enum import Enum from pathlib import Path +from datetime import datetime from textwrap import wrap from collections import defaultdict @@ -70,6 +71,9 @@ class SpiceKernelType(Enum): class SpiceKernelManager(): """A class to manage Spice kernels in the local file system.""" + MK_DATE_PATTERN = re.compile(r"solo_ANC_soc.*_(\d{4})(\d{2})(\d{2})_.*.tm") + OLDEST_MK_DATE = datetime.fromisoformat("2019-01-01T00:00:00.000") + def __init__(self, path): """Creates a new SpiceKernelManager @@ -100,23 +104,36 @@ def _get_latest(self, kerneltype, *, top_n=1): return files[0:top_n] if top_n > 1 else [files[0]] - def get_latest_sclk(self, *, top_n=1): - return self._get_latest(SpiceKernelType.SCLK, top_n=top_n) - - def get_latest_lsk(self, *, top_n=1): - return self._get_latest(SpiceKernelType.LSK, top_n=top_n) - def get_latest_mk(self, *, top_n=1): - return self._get_latest(SpiceKernelType.MK, top_n=top_n) + return [SpiceKernelManager.get_mk_meta(mkp) + for mkp in self._get_latest(SpiceKernelType.MK, top_n=top_n)] def 
get_latest_mk_pred(self, *, top_n=1): - return self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n) + return [SpiceKernelManager.get_mk_meta(mkp) + for mkp in self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)] def get_latest_mk_and_pred(self, *, top_n=1): - mks = self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n) - mks.extend(self._get_latest(SpiceKernelType.MK, top_n=top_n)) + mks = self.get_latest_mk_pred(top_n=top_n) + mks.extend(self.get_latest_mk(top_n=top_n)) return mks + @classmethod + def get_mk_date(cls, path: Path): + try: + ds = SpiceKernelManager.MK_DATE_PATTERN.findall(path.name)[0] + return datetime.fromisoformat(f"{ds[0]}-{ds[1]}-{ds[2]}T00:00:00.000") + except Exception: + # return a date before the start so that comparisons will fail + # TODO check if this is OK + return SpiceKernelManager.OLDEST_MK_DATE + return None + + @classmethod + def get_mk_meta(cls, path: Path): + date = cls.get_mk_date(path) + type = "flown" if "-flown-" in path.name else "pred" + return (path, type, date) + def get_latest(self, kerneltype=SpiceKernelType.MK, *, top_n=1): """Finds the latest version of the spice kernel. @@ -149,7 +166,9 @@ def __init__(self, meta_kernel_pathes): # unload all old kernels spiceypy.kclear() - for meta_kernel_path in np.atleast_1d(meta_kernel_pathes): + self.mk_dates = defaultdict(lambda: SpiceKernelManager.OLDEST_MK_DATE) + + for meta_kernel_path, mk_type, mk_date in np.atleast_1d(meta_kernel_pathes): try: meta_kernel_path = Path(meta_kernel_path) if not meta_kernel_path.exists(): @@ -177,13 +196,17 @@ def __init__(self, meta_kernel_pathes): # load the meta kernel spiceypy.furnsh(str(abs_file)) logger.info(f"LOADING NEW META KERNEL: {meta_kernel_path}") - self.meta_kernel_path.append(meta_kernel_path) + self.meta_kernel_path.append((meta_kernel_path, mk_type, mk_date)) + self.mk_dates[mk_type] = max(self.mk_dates[mk_type], mk_date) except Exception as e: logger.warning(f"Failed LOADING NEW META KERNEL: {meta_kernel_path}\n{e}") if len(self.meta_kernel_path) == 0: raise ValueError(f"Failed to load any NEW META KERNEL: {meta_kernel_pathes}") + def get_mk_date(self, meta_kernel_type="flown"): + return self.mk_dates[meta_kernel_type] + @staticmethod def _wrap_value_field(field): r""" @@ -446,7 +469,7 @@ def get_fits_headers(self, *, start_time, average_time): except (SpiceBADPARTNUMBER, SpiceINVALIDSCLKSTRING): et = spiceypy.utc2et(average_time.isot) - headers = (('SPICE_MK', ', '.join([mk.name for mk in self.meta_kernel_path]), + headers = (('SPICE_MK', ', '.join([mkp.name for mkp, mkt, mkd in self.meta_kernel_path]), 'SPICE meta kernel file'),) header_results = defaultdict(lambda: '') diff --git a/stixcore/ephemeris/tests/test_manager.py b/stixcore/ephemeris/tests/test_manager.py index df7ec19d..5afe6275 100644 --- a/stixcore/ephemeris/tests/test_manager.py +++ b/stixcore/ephemeris/tests/test_manager.py @@ -1,3 +1,5 @@ +from datetime import datetime + import pytest from stixcore.config.config import CONFIG @@ -12,7 +14,7 @@ def spicekernelmanager(): def test_loader_nokernel(): with pytest.raises(ValueError) as e: - Spice(meta_kernel_pathes='notreal.mk') + Spice(meta_kernel_pathes=[('notreal.mk', "flown", datetime.now())]) assert str(e.value).startswith("Failed to load any NEW META KERNEL") diff --git a/stixcore/io/FlareListManager.py b/stixcore/io/FlareListManager.py new file mode 100644 index 00000000..f012cc3d --- /dev/null +++ b/stixcore/io/FlareListManager.py @@ -0,0 +1,132 @@ +import sys +import time +from datetime import datetime, timedelta + +import 
pandas as pd +from stixdcpy.net import Request as stixdcpy_req + +from stixcore.util.logging import get_logger +from stixcore.util.singleton import Singleton + +__all__ = ['FlareListManager', 'SDCFlareListManager'] + +logger = get_logger(__name__) + + +class FlareListManager(): + @property + def flarelist(self): + return self._flarelist + + @flarelist.setter + def flarelist(self, value): + self._flarelist = value + + @property + def flarelistname(self): + return type(self).__name__ + + +class SDCFlareListManager(FlareListManager, metaclass=Singleton): + """Manages a local copy of the operational flarelist provided by stix data datacenter + + TODO + """ + + def __init__(self, file, update=False): + """Creates the manager by pointing to the flarelist file (csv) and setting the update strategy. + + Parameters + ---------- + file : Path + points to the csv file + update : bool, optional + Update strategy: is the flarelist file updated via API?, by default False + """ + self.file = file + self.update = update + self._flarelist = SDCFlareListManager.read_flarelist(self.file, self.update) + + def __str__(self) -> str: + return (f"{self.flarelistname}: file: {self.file} update: " + f"{self.update} size: {len(self.flarelist)}") + + def update_list(self): + """Updates the flarelist file via api request. + + Will create a new file if not available or do a incremental update otherwise, + using the last entry time stamp. + """ + self.flarelist = SDCFlareListManager.read_flarelist(self.file, update=self.update) + + @classmethod + def read_flarelist(cls, file, update=False): + """Reads or creates the LUT of all BSD RIDs and the request reason comment. + + On creation or update an api endpoint from the STIX data center is used + to get the information and persists as a LUT locally. + + Parameters + ---------- + file : Path + path the to LUT file. + update : bool, optional + should the LUT be updated at start up?, by default False + + Returns + ------- + Table + the LUT od RIDs and request reasons. + """ + if update or not file.exists(): + # the api is limited to batch sizes of a month. 
in order to get the full table we have + # to ready each month after the start of STIX + last_date = datetime(2020, 1, 1, 0, 0, 0) + today = datetime.now() # - timedelta(days=60) + flare_df_lists = [] + if file.exists(): + old_list = pd.read_csv(file) + mds = old_list['start_UTC'].max() + try: + last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S') + except ValueError: + last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S.%f') + flare_df_lists = [old_list] + if not file.parent.exists(): + logger.info(f'path not found to flare list file dir: {file.parent} creating dir') + file.parent.mkdir(parents=True, exist_ok=True) + + try: + while (last_date < today): + last_date_1m = last_date + timedelta(days=30) + logger.info(f'download flare list chunk: {last_date.isoformat()}' + f'/{last_date_1m.isoformat()}') + flares = stixdcpy_req.fetch_flare_list(last_date.isoformat(), + last_date_1m.isoformat()) + last_date = last_date_1m + if len(flares) > 0: + flare_df_lists.append(pd.DataFrame(flares)) + logger.info(f'found {len(flares)} flares') + # the stix datacenter API is throttled to 2 calls per second + time.sleep(0.5) + except Exception: + logger.error("FLARELIST API ERROR", exc_info=True) + + full_flare_list = pd.concat(flare_df_lists) + + full_flare_list.drop_duplicates(inplace=True) + full_flare_list.sort_values(by="peak_UTC", inplace=True) + full_flare_list.reset_index(inplace=True, drop=True) + logger.info(f'write total {len(full_flare_list)} flares to local storage') + full_flare_list.to_csv(file, index_label=False) + else: + logger.info(f"read flare list from {file}") + full_flare_list = pd.read_csv(file) + + return full_flare_list + + +if 'pytest' in sys.modules: + # only set the global in test scenario + from stixcore.data.test import test_data + SDCFlareListManager.instance = SDCFlareListManager(test_data.rid_lut.RID_LUT, update=False) diff --git a/stixcore/io/ProcessingHistoryStorage.py b/stixcore/io/ProcessingHistoryStorage.py new file mode 100644 index 00000000..85650b0c --- /dev/null +++ b/stixcore/io/ProcessingHistoryStorage.py @@ -0,0 +1,212 @@ +import sqlite3 +from pathlib import Path +from datetime import datetime + +from stixcore.util.logging import get_logger + +logger = get_logger(__name__) + + +class ProcessingHistoryStorage: + """Persistent handler for meta data on already processed Products""" + + def __init__(self, filename): + """Create a new persistent handler. Will open or create the given sqlite DB file. 
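The daily pipeline further down in this patch drives this storage class with a check-then-record pattern; a minimal sketch of that intended usage (file paths, product metadata, versions and timestamps are purely illustrative):

    from datetime import datetime
    from pathlib import Path

    from stixcore.io.ProcessingHistoryStorage import ProcessingHistoryStorage

    db_file = Path.home() / "processed.sqlite"                  # hypothetical history DB
    hk_in = Path("solo_L1_stix-hk-maxi_20240101_V02.fits")      # hypothetical input file
    hk_created = datetime(2024, 1, 2, 6, 0)                     # its DATE header value

    phs = ProcessingHistoryStorage(db_file)
    if not phs.has_processed_fits_products('ephemeris', 'ANC', 'asp', 2,
                                           hk_in, hk_created):
        # ... run the actual processing step here, producing `anc_out` ...
        anc_out = Path("solo_ANC_stix-asp-ephemeris_20240101_V02.fits")
        phs.add_processed_fits_products('ephemeris', 'ANC', 'asp', 2,
                                        hk_in, anc_out, datetime.now())
    phs.close()
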
+ + Parameters + ---------- + filename : path like object + path to the sqlite database file + """ + self.conn = None + self.cur = None + self.filename = filename + self._connect_database() + + def _connect_database(self): + """Connects to the sqlite file or creates an empty one if not present.""" + try: + self.conn = sqlite3.connect(self.filename) + self.cur = self.conn.cursor() + logger.info('ProcessingHistoryStorage DB loaded from {}'.format(self.filename)) + + # TODO reactivate later + # self.cur.execute('''CREATE TABLE if not exists processed_flare_products ( + # id INTEGER PRIMARY KEY AUTOINCREMENT, + # flareid TEXT NOT NULL, + # flarelist TEXT NOT NULL, + # version INTEGER NOT NULL, + # name TEXT NOT NULL, + # level TEXT NOT NULL, + # type TEXT NOT NULL, + # fitspath TEXT NOT NULL, + # p_date FLOAT NOT NULL + # ) + # ''') + # self.cur.execute('''CREATE INDEX if not exists processed_flare_products_idx ON + # processed_flare_products (flareid, flarelist, version, name, + # level, type)''') + + self.cur.execute('''CREATE TABLE if not exists processed_fits_products ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + level TEXT NOT NULL, + type TEXT NOT NULL, + version INTEGER NOT NULL, + fits_in_path TEXT NOT NULL, + fits_out_path TEXT NOT NULL, + p_date TEXT NOT NULL + ) + ''') + + self.cur.execute('''CREATE INDEX if not exists processed_fits_products_idx ON + processed_fits_products + (name, level, type, version, fits_in_path)''') + + self.conn.commit() + except sqlite3.Error: + logger.error('Failed load DB from {}'.format(self.filename)) + self.close() + raise + + def close(self): + """Close the IDB connection.""" + if self.conn: + self.conn.commit() + self.conn.close() + self.cur = None + else: + logger.warning("DB connection already closed") + + def is_connected(self): + """Is the handler connected to the db file. + + returns + ------- + True | False + """ + if self.cur: + return True + return False + + # TODO reactivate later + # def count_processed_flare_products(self): + # """Counts the number of already processed flares in the DB + + # Returns + # ------- + # int + # number of processed flares + # """ + # return self.cur.execute("select count(1) from processed_flare_products").fetchone()[0] + + def count_processed_fits_products(self) -> int: + """Counts the number of entries in the DB + + Returns + ------- + int + number of tracked files + """ + return self.cur.execute("select count(1) from processed_fits_products").fetchone()[0] + + def add_processed_fits_products(self, name: str, level: str, type: str, version: int, + fits_in_path: Path, fits_out_path: Path, p_date: datetime): + """Adds a new file to the history of processed fits files. 
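Together with the lookup defined just below, the recorded p_date acts as a freshness marker: an input file counts as already processed only if the stored processing time is newer than the file's creation time, so a later re-delivered (for example, completed) input FITS automatically becomes a candidate again. In plain Python terms (timestamps illustrative):

    from datetime import datetime

    p_date = datetime(2024, 11, 25, 9, 0)                # processing time stored in the history DB
    fits_in_create_time = datetime(2024, 11, 26, 3, 0)   # input file was re-created afterwards

    already_processed = p_date > fits_in_create_time     # False -> the file will be processed again
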
+ + Parameters + ---------- + name : str + name of the generated product + level : str + level of the generated product + type : str + type of the generated product + version : int + version of the generated product + fits_in_path : Path + the path to the input file + fits_out_path : Path + the path of the generated product + p_date : datetime + processing time + """ + df = p_date.isoformat() + in_path = str(fits_in_path) + out_path = str(fits_out_path) + + self.cur.execute("""insert into processed_fits_products + (name, level, type, version, fits_in_path, fits_out_path, p_date) + values(?, ?, ?, ?, ?, ?, ?)""", + (name, level, type, version, in_path, out_path, df)) + logger.info(f"""insert into processed_fits_products + (name: '{name}', level: '{level}', type: '{type}', + version: '{version}', fits_in_path: '{in_path}', + fits_out_path: '{out_path}', p_date: '{df}')""") + + def has_processed_fits_products(self, name: str, level: str, type: str, version: int, + fits_in_path: Path, fits_in_create_time: datetime) -> bool: + """Checks if an entry for the given input file and target product exists and if + the processing was before the given datetime. + + Parameters + ---------- + name : str + name of the generated product + level : str + level of the generated product + type : str + type of the generated product + version : int + version of the generated product + fits_in_path : Path + the path to the input file + fits_in_create_time : datetime + the time the input file was created/updated + + Returns + ------- + bool + Was a processing already registered and was it later as the fileupdate time + """ + return self.cur.execute("""select count(1) from processed_fits_products where + fits_in_path = ? and + name = ? and + level = ? and + type = ? and + version = ? and + p_date > ? + """, (str(fits_in_path), name, level, type, version, + fits_in_create_time.isoformat())).fetchone()[0] > 0 + + # TODO reactivate later + # def add_processed_flare_products(self, product, path, flarelistid, flarelistname): + # fitspath = str(path) + # p_date = datetime.now().timestamp() + # ssid = product.ssid + # version = product.version + + # self.cur.execute("""insert into processed_flare_products + # (flareid, flarelist, version, ssid, fitspath, p_date) + # values(?, ?, ?, ?, ?, ?)""", + # (flarelistid, flarelistname, version, ssid, fitspath, p_date)) + + def _execute(self, sql, arguments=None, result_type='list'): + """Execute sql and return results in a list or a dictionary.""" + if not self.cur: + raise Exception('DB is not initialized!') + else: + if arguments: + self.cur.execute(sql, arguments) + else: + self.cur.execute(sql) + if result_type == 'list': + rows = self.cur.fetchall() + else: + rows = [ + dict( + zip([column[0] + for column in self.cur.description], row)) + for row in self.cur.fetchall() + ] + return rows diff --git a/stixcore/io/fits/processors.py b/stixcore/io/fits/processors.py index 5e0ebc58..43abddb2 100644 --- a/stixcore/io/fits/processors.py +++ b/stixcore/io/fits/processors.py @@ -971,3 +971,51 @@ def generate_primary_header(self, filename, product, *, version=0): ) return L1headers, L2headers + + +class FitsL3Processor(FitsL2Processor): + def __init__(self, archive_path): + super().__init__(archive_path) + + def write_fits(self, product, *, version=0): + """ + Write level 3 products into fits files. + + Parameters + ---------- + product : `stixcore.product.level3` + + version : `int` + the version modifier for the filename + default 0 = detect from codebase. 
+ + Returns + ------- + list + of created file as `pathlib.Path` + + """ + if version == 0: + version = product.get_processing_version() + + return super().write_fits(product, version=version) + + def generate_primary_header(self, filename, product, *, version=0): + + if product.fits_header is None: + L2, o = super().generate_primary_header(filename, product, version=version) + L2headers = L2 + o + else: + L2headers = product.fits_header.items() + + # new or override keywords + L3headers = ( + # Name, Value, Comment + ('LEVEL', 'L3', 'Processing level of the data'), + ('VERS_SW', str(stixcore.__version__), 'Version of SW that provided FITS file'), + ('VERS_CFG', str(stixcore.__version_conf__), + 'Version of the common instrument configuration package'), + ('HISTORY', 'Processed by STIXCore L3'), + ) + + return L2headers, L3headers diff --git a/stixcore/processing/AspectANC.py b/stixcore/processing/AspectANC.py new file mode 100644 index 00000000..542fbb5f --- /dev/null +++ b/stixcore/processing/AspectANC.py @@ -0,0 +1,179 @@ +from pathlib import Path +from datetime import datetime, timedelta + +import numpy as np + +from astropy.io import fits +from astropy.table import QTable + +from stixcore.ephemeris.manager import Spice +from stixcore.io.ProcessingHistoryStorage import ProcessingHistoryStorage +from stixcore.processing.SingleStep import SingleProductProcessingStepMixin, TestForProcessingResult +from stixcore.processing.sswidl import SSWIDLProcessor +from stixcore.products.ANC.aspect import AspectIDLProcessing, Ephemeris +from stixcore.products.product import Product +from stixcore.soop.manager import SOOPManager +from stixcore.util.logging import get_logger +from stixcore.util.util import get_complete_file_name_and_path, get_incomplete_file_name_and_path + +__all__ = ['AspectANC'] + +logger = get_logger(__name__) + + +class AspectANC(SingleProductProcessingStepMixin): + """Processing step from a HK L1 fits file to a solo_ANC_stix-asp-ephemeris*.fits file. + """ + INPUT_PATTERN = "solo_L1_stix-hk-maxi_*.fits" + + def __init__(self, source_dir: Path, output_dir: Path): + """Crates a new Processor. 
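A rough sketch of how this step is meant to be driven, mirroring pipeline_daily.py later in this patch (directories and the history DB location are placeholders):

    from pathlib import Path

    from stixcore.io.ProcessingHistoryStorage import ProcessingHistoryStorage
    from stixcore.processing.AspectANC import AspectANC
    from stixcore.processing.SingleStep import TestForProcessingResult

    step = AspectANC(source_dir=Path("/data/fits"), output_dir=Path("/data/fits"))
    phs = ProcessingHistoryStorage(Path.home() / "processed.sqlite")

    candidates = step.find_processing_candidates()             # solo_L1_stix-hk-maxi_*.fits
    latest = step.get_version(candidates, version="latest")    # keep only the highest version per file
    to_process = [c for c in latest
                  if step.test_for_processing(c, phs) == TestForProcessingResult.Suitable]
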
+ + Parameters + ---------- + source_dir : Path or pathlike + where to search for input fits files + output_dir : Path or pathlike + where to write out the generated fits files + """ + self.source_dir = Path(source_dir) + self.output_dir = Path(output_dir) + + def find_processing_candidates(self) -> list[Path]: + """Performs file pattern search in the source directory + + Returns + ------- + list[Path] + a list of fits files candidates + """ + return list(self.source_dir.rglob(self.ProductInputPattern)) + + def test_for_processing(self, candidate: Path, + phm: ProcessingHistoryStorage) -> TestForProcessingResult: + """_summary_ + + Parameters + ---------- + candidate : Path + a fits file candidate + phm : ProcessingHistoryStorage + the processing history persistent handler + + Returns + ------- + TestForProcessingResult + what should happen with the candidate in the next processing step + """ + try: + c_header = fits.getheader(candidate) + f_data_end = datetime.fromisoformat(c_header['DATE-END']) + f_create_date = datetime.fromisoformat(c_header['DATE']) + f_version = int(c_header['VERSION']) + + cfn = get_complete_file_name_and_path(candidate) + + was_processed = phm.has_processed_fits_products('ephemeris', 'ANC', 'asp', + Ephemeris.get_cls_processing_version(), + str(cfn), f_create_date) + + # found already in the processing history + if was_processed: + return TestForProcessingResult.NotSuitable + + # test if the version of the input fits files is older as the target version + # TODO check if this is not a to restrictive check + # search danamic for the latest version + if f_version < Ephemeris.get_cls_processing_version(): + return TestForProcessingResult.NotSuitable + + # safety margin of 1day until we start with processing of HK files into ANC-asp files + # only use flown spice kernels not predicted once as pointing information + # can be "very off" + if (f_data_end > (Spice.instance.get_mk_date(meta_kernel_type="flown") + - timedelta(hours=24))): + return TestForProcessingResult.NotSuitable + + return TestForProcessingResult.Suitable + except Exception as e: + logger.error(e) + return TestForProcessingResult.NotSuitable + + def process_fits_files(self, files: list[Path], *, soopmanager: SOOPManager, + spice_kernel_path: Path, processor, config) -> list[Path]: + """Performs the processing (expected to run in a dedicated python process) from a + list of solo_L1_stix-hk-maxi_*.fits into solo_ANC_stix-asp-ephemeris*.fits files. 
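The suitability test above reduces to two gates: the candidate's VERSION keyword must not be older than the target product version, and its data interval must end at least one day before the coverage date of the latest flown (not predicted) meta kernel. Stated on its own (values illustrative):

    from datetime import datetime, timedelta

    f_version = 2                            # VERSION keyword of the HK L1 candidate
    f_data_end = datetime(2024, 11, 20)      # DATE-END of the candidate
    target_version = 2                       # Ephemeris.get_cls_processing_version()
    flown_mk_date = datetime(2024, 11, 24)   # Spice.instance.get_mk_date(meta_kernel_type="flown")

    suitable = (f_version >= target_version and
                f_data_end <= flown_mk_date - timedelta(hours=24))   # True in this example
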
+ + Parameters + ---------- + files : list[Path] + list of HK level 1 fits files + soopmanager : SOOPManager + the SOOPManager from the main process + spice_kernel_path : Path + the used spice kernel paths from the main process + processor : _type_ + the processor from the main process + config : + the config from the main process + + Returns + ------- + list[Path] + list of all generated fits files + """ + CONFIG = config + max_idlbatch = CONFIG.getint("IDLBridge", "batchsize", fallback=20) + SOOPManager.instance = soopmanager + Spice.instance = Spice(spice_kernel_path) + all_files = list() + + idlprocessor = SSWIDLProcessor(processor) + + for pf in files: + l1hk = Product(pf) + + data = QTable() + data['cha_diode0'] = l1hk.data['hk_asp_photoa0_v'] + data['cha_diode1'] = l1hk.data['hk_asp_photoa1_v'] + data['chb_diode0'] = l1hk.data['hk_asp_photob0_v'] + data['chb_diode1'] = l1hk.data['hk_asp_photob1_v'] + data['time'] = [d.strftime('%Y-%m-%dT%H:%M:%S.%f') + for d in l1hk.data['time'].to_datetime()] + data['scet_time_f'] = l1hk.data['time'].fine + data['scet_time_c'] = l1hk.data['time'].coarse + + # TODO set to seconds + dur = (l1hk.data['time'][1:] - l1hk.data['time'][0:-1]).as_float().value + data['duration'] = dur[0] + data['duration'][0:-1] = dur + data['duration'][:] = dur[-1] + + data['spice_disc_size'] = [Spice.instance.get_sun_disc_size(date=d) + for d in l1hk.data['time']] + + data['y_srf'] = 0.0 + data['z_srf'] = 0.0 + data['calib'] = 0.0 + data['sas_ok'] = np.byte(0) + data['error'] = "" + data['control_index'] = l1hk.data['control_index'] + + dataobj = dict() + for coln in data.colnames: + dataobj[coln] = data[coln].value.tolist() + + f = {'parentfits': str(get_incomplete_file_name_and_path(pf)), + 'data': dataobj} + + idlprocessor[AspectIDLProcessing].params['hk_files'].append(f) + idlprocessor.opentasks += 1 + + if idlprocessor.opentasks >= max_idlbatch: + all_files.extend(idlprocessor.process()) + idlprocessor = SSWIDLProcessor(processor) + + if idlprocessor.opentasks > 0: + all_files.extend(idlprocessor.process()) + + return all_files diff --git a/stixcore/processing/FLtoL3.py b/stixcore/processing/FLtoL3.py new file mode 100644 index 00000000..499f4c33 --- /dev/null +++ b/stixcore/processing/FLtoL3.py @@ -0,0 +1,89 @@ +from pathlib import Path +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor + +from stixcore.config.config import CONFIG +from stixcore.ephemeris.manager import Spice +from stixcore.io.fits.processors import FitsL1Processor, FitsL3Processor +from stixcore.soop.manager import SOOPManager +from stixcore.util.logging import get_logger +from stixcore.util.util import get_complete_file_name + +logger = get_logger(__name__) + + +class FLLevel3: + """Processing step from a flare list entry to L3. 
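The grouping performed in process_fits_files below keys each input file by (priority, service, subservice, ssid, batch) so that the long-running quick-look jobs are submitted first. A worked example of that key for a single science file (path and file name are hypothetical, and the public Path.parts is used in place of the private _parts attribute):

    from pathlib import Path

    file = Path("/archive/L0/21/6/24/solo_L0_stix-sci-xray-cpd_0707875174_V02.fits")
    parts = file.parts
    product_type = tuple(map(int, parts[parts.index('L0') + 1:-1]))   # (21, 6, 24)

    if product_type[0] == 21 and product_type[-1] in {20, 21, 22, 23, 24, 42}:
        prio = 2          # bulk science data, counted toward a batch
    elif product_type[0] == 21:
        prio = 1          # quick-look data, submitted first
    else:
        prio = 3

    batch = 0                                  # first batch of this product type
    key = (prio,) + product_type + (batch,)    # (2, 21, 6, 24, 0)
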
+ """ + def __init__(self, source_dir, output_dir, dbfile): + self.source_dir = Path(source_dir) + self.output_dir = Path(output_dir) + self.dbfile = dbfile + self.processor = FitsL3Processor(self.output_dir) + + def process_fits_files(self, files): + all_files = list() + if files is None: + files = self.level0_files + product_types = defaultdict(list) + product_types_batch = defaultdict(int) + batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L1', fallback=150) + + for file in files: + # group by service,subservice, ssid example: 'L0/21/6/30' as default + # or (prio, service, subservice, [SSID], [BATCH]) if all data is available + batch = 0 + prio = 3 + product_type = str(file.parent) + if 'L0' in file._parts: + product_type = tuple(map(int, file._parts[file._parts.index('L0')+1:-1])) + if (product_type[0] == 21 and + product_type[-1] in {20, 21, 22, 23, 24, 42}): # sci data + product_types_batch[product_type] += 1 + prio = 2 + elif product_type[0] == 21: # ql data + prio = 1 + batch = product_types_batch[product_type] // batch_size + product_types[(prio, ) + product_type + (batch, )].append(file) + + jobs = [] + with ProcessPoolExecutor() as executor: + # simple heuristic that the daily QL data takes longest so we start early + for pt, files in sorted(product_types.items()): + jobs.append(executor.submit(process_type, files, + processor=FitsL1Processor(self.output_dir), + soopmanager=SOOPManager.instance, + spice_kernel_path=Spice.instance.meta_kernel_path, + config=CONFIG)) + + for job in jobs: + try: + new_files = job.result() + all_files.extend(new_files) + except Exception: + logger.error('error', exc_info=True) + + return list(set(all_files)) + + +def process_type(timeranges, productcls, flarelistparent, *, processor, + soopmanager, spice_kernel_path, config): + SOOPManager.instance = soopmanager + all_files = list() + Spice.instance = Spice(spice_kernel_path) + CONFIG = config + file = 1 + prod = productcls() + for tr in timeranges: + logger.info(f"processing timerange: {timeranges}") + try: + # see https://github.com/i4Ds/STIXCore/issues/350 + get_complete_file_name(file.name) + l3 = prod.from_timerange(tr, flarelistparent=flarelistparent) + all_files.extend(processor.write_fits(l3)) + except Exception as e: + logger.error('Error processing timerange %s', tr, exc_info=True) + logger.error('%s', e) + if CONFIG.getboolean('Logging', 'stop_on_error', fallback=False): + raise e + return all_files diff --git a/stixcore/processing/SingleStep.py b/stixcore/processing/SingleStep.py new file mode 100644 index 00000000..7e304e2e --- /dev/null +++ b/stixcore/processing/SingleStep.py @@ -0,0 +1,185 @@ +import re +from enum import Enum +from pathlib import Path +from datetime import datetime +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor + +import stixcore.io.fits.processors as fitsp +from stixcore.config.config import CONFIG +from stixcore.ephemeris.manager import Spice +from stixcore.products.product import GenericProduct +from stixcore.soop.manager import SOOPManager +from stixcore.util.logging import get_logger +from stixcore.util.util import get_complete_file_name + +__all__ = ['TestForProcessingResult', 'SingleProductProcessingStepMixin'] + +logger = get_logger(__name__) + + +class TestForProcessingResult(Enum): + # do nothing with the candidate + NotSuitable = 0 + # add to ignore list so it will not be tested again + ToIgnore = 1 + # go on with the processing + Suitable = 2 + + +class SingleProcessingStepResult(): + def __init__(self, 
name: str, level: str, type: str, version: int, + out_path: Path, in_path: Path, date: datetime): + """Creates a SingleProcessingStepResult + + Parameters + ---------- + name : str + the name of the generated product + level : str + the level of the generated product + type : str + the type of the generated product + version : int + the version of the generated product + out_path : Path + path to the generated file + in_path : Path + the path of the used input file + date : datetime + when was the processing performed + """ + self.name = name + self.level = level + self.type = type + self.version = version + self.out_path = out_path + self.in_path = in_path + self.date = date + + +class SingleProductProcessingStepMixin(): + INPUT_PATTERN = "*.fits" + VERSION_PATTERN = re.compile(r"(.*_V)([0-9]+)U?([\._].*)") + + @property + def ProductInputPattern(cls): + return cls.INPUT_PATTERN + + @property + def test_for_processing(self, path: Path) -> TestForProcessingResult: + pass + + def find_processing_candidates(self) -> list[Path]: + pass + + def get_version(cls, candidates: list[Path], version='latest') -> list[Path]: + + if not version == "latest": + version = int(str(version).lower().replace("v", "")) + + index = defaultdict(dict) + for f in candidates: + f_name = f.name + match = cls.VERSION_PATTERN.match(f_name) + if match: + f_key = f"{match.group(1)}__{match.group(3)}" + f_version = int(match.group(2)) + else: + f_key = f_name + f_version = -1 + + index[f_key][f_version] = f + + version_files = [] + for f_key in index: + versions = index[f_key].keys() + v = max(versions) if version == "latest" else version + if v in versions: + version_files.append(index[f_key][v]) + + return version_files + + def process(self, product: GenericProduct) -> GenericProduct: + pass + + def write_fits(self, product: GenericProduct, folder: Path) -> Path: + pass + + +# NOT NEEDED NOW +class FLLevel3: + """Processing step from a flare list entry to L3. 
+ """ + def __init__(self, source_dir, output_dir, dbfile): + self.source_dir = Path(source_dir) + self.output_dir = Path(output_dir) + self.dbfile = dbfile + self.processor = fitsp.FitsL3Processor(self.output_dir) + + def process_fits_files(self, files): + all_files = list() + if files is None: + files = self.level0_files + product_types = defaultdict(list) + product_types_batch = defaultdict(int) + batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L1', fallback=150) + + for file in files: + # group by service,subservice, ssid example: 'L0/21/6/30' as default + # or (prio, service, subservice, [SSID], [BATCH]) if all data is available + batch = 0 + prio = 3 + product_type = str(file.parent) + if 'L0' in file._parts: + product_type = tuple(map(int, file._parts[file._parts.index('L0')+1:-1])) + if (product_type[0] == 21 and + product_type[-1] in {20, 21, 22, 23, 24, 42}): # sci data + product_types_batch[product_type] += 1 + prio = 2 + elif product_type[0] == 21: # ql data + prio = 1 + batch = product_types_batch[product_type] // batch_size + product_types[(prio, ) + product_type + (batch, )].append(file) + + jobs = [] + with ProcessPoolExecutor() as executor: + # simple heuristic that the daily QL data takes longest so we start early + for pt, files in sorted(product_types.items()): + jobs.append(executor.submit(process_type, files, + processor=fitsp.FitsL1Processor(self.output_dir), + soopmanager=SOOPManager.instance, + spice_kernel_path=Spice.instance.meta_kernel_path, + config=CONFIG)) + + for job in jobs: + try: + new_files = job.result() + all_files.extend(new_files) + except Exception: + logger.error('error', exc_info=True) + + return list(set(all_files)) + + +def process_type(timeranges, productcls, flarelistparent, *, processor, + soopmanager, spice_kernel_path, config): + SOOPManager.instance = soopmanager + all_files = list() + Spice.instance = Spice(spice_kernel_path) + CONFIG = config + file = 1 + prod = productcls() + for tr in timeranges: + logger.info(f"processing timerange: {timeranges}") + try: + # see https://github.com/i4Ds/STIXCore/issues/350 + get_complete_file_name(file.name) + l3 = prod.from_timerange(tr, flarelistparent=flarelistparent) + all_files.extend(processor.write_fits(l3)) + except Exception as e: + logger.error('Error processing timerange %s', tr, exc_info=True) + logger.error('%s', e) + if CONFIG.getboolean('Logging', 'stop_on_error', fallback=False): + raise e + return all_files diff --git a/stixcore/processing/pipeline_daily.py b/stixcore/processing/pipeline_daily.py new file mode 100644 index 00000000..69d8fa9c --- /dev/null +++ b/stixcore/processing/pipeline_daily.py @@ -0,0 +1,265 @@ +import sys +import shutil +import logging +import smtplib +import argparse +from pathlib import Path +from datetime import date +from concurrent.futures import ProcessPoolExecutor + +from stixcore.config.config import CONFIG +from stixcore.ephemeris.manager import Spice, SpiceKernelManager +from stixcore.io.fits.processors import FitsL2Processor +from stixcore.io.ProcessingHistoryStorage import ProcessingHistoryStorage +from stixcore.io.RidLutManager import RidLutManager +from stixcore.processing.AspectANC import AspectANC +from stixcore.processing.pipeline import PipelineStatus +from stixcore.processing.SingleStep import SingleProcessingStepResult, TestForProcessingResult +from stixcore.soop.manager import SOOPManager +from stixcore.util.logging import STX_LOGGER_DATE_FORMAT, STX_LOGGER_FORMAT, get_logger +from stixcore.util.util import 
get_complete_file_name_and_path + +logger = get_logger(__name__) + + +class DailyPipelineErrorReport(logging.StreamHandler): + """Adds file and mail report Handler to a processing step.""" + def __init__(self, log_file: Path, log_level): + """Create a PipelineErrorReport + """ + logging.StreamHandler.__init__(self) + + self.log_file = log_file + self.err_file = log_file.with_suffix(".err") + + self.fh = logging.FileHandler(filename=self.log_file, mode="a+") + self.fh.setFormatter(logging.Formatter(STX_LOGGER_FORMAT, datefmt=STX_LOGGER_DATE_FORMAT)) + self.fh.setLevel(logging.getLevelName(log_level)) + + self.setLevel(logging.ERROR) + self.allright = True + self.error = None + logging.getLogger().addHandler(self) + logging.getLogger().addHandler(self.fh) + + def emit(self, record): + """Called in case of a logging event.""" + self.allright = False + self.error = record + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + logging.getLogger().removeHandler(self) + self.fh.flush() + logging.getLogger().removeHandler(self.fh) + + if not self.allright: + shutil.copyfile(self.log_file, self.err_file) + if CONFIG.getboolean('Pipeline', 'error_mail_send', fallback=False): + try: + sender = CONFIG.get('Pipeline', 'error_mail_sender', fallback='') + receivers = CONFIG.get('Pipeline', 'error_mail_receivers').split(",") + host = CONFIG.get('Pipeline', 'error_mail_smpt_host', fallback='localhost') + port = CONFIG.getint('Pipeline', 'error_mail_smpt_port', fallback=25) + smtp_server = smtplib.SMTP(host=host, port=port) + message = f"""Subject: StixCore Daily Processing Error + +Error while processing + +login to pub099.cs.technik.fhnw.ch and check: + +{self.err_file} + +StixCore +========================== +do not answer to this mail. 
+""" + smtp_server.sendmail(sender, receivers, message) + except Exception as e: + logger.error(f"Error: unable to send error email: {e}") + + +def run_daily_pipeline(args): + """CLI STIX daily processing step to generate fits products + + Parameters + ---------- + args : list + see -h for details + + Returns + ------- + list + list of generated fits files paths + """ + + parser = argparse.ArgumentParser(description='STIX publish to ESA processing step', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + # pathes + parser.add_argument("-d", "--db_file", + help="Path to the history publishing database", type=str, + default=CONFIG.get('Pipeline', 'history_db_file', + fallback=str(Path.home() / "processed.sqlite"))) + + parser.add_argument("-i", "--fits_in_dir", + help="input fits directory", + default=CONFIG.get('Paths', 'fits_archive'), type=str) + + parser.add_argument("-o", "--fits_out_dir", + help="output fits directory", + default=CONFIG.get('Paths', 'fits_archive'), type=str) + + parser.add_argument("-s", "--spice_dir", + help="directory to the spice kernels files", + default=CONFIG.get('Paths', 'spice_kernels'), type=str) + + parser.add_argument("-S", "--spice_file", + help="path to the spice meta kernel", + default=None, type=str) + + parser.add_argument("-p", "--soop_dir", + help="directory to the SOOP files", + default=CONFIG.get('Paths', 'soop_files'), type=str) + + parser.add_argument("-O", "--log_dir", + help="output directory for daily logging ", + default=CONFIG.get('Publish', 'log_dir', fallback=str(Path.home())), + type=str, dest='log_dir') + + parser.add_argument("--continue_on_error", + help="the pipeline reports any error and continues processing", + default=not CONFIG.getboolean('Logging', 'stop_on_error', fallback=False), + action='store_false', dest='stop_on_error') + + parser.add_argument("--log_level", + help="the level of logging", + default="INFO", type=str, + choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"], + dest='log_level') + + parser.add_argument("-r", "--rid_lut_file", + help=("Path to the rid LUT file"), + default=CONFIG.get('Publish', 'rid_lut_file'), type=str) + + parser.add_argument("--update_rid_lut", + help="update rid lut file before publishing", + default=False, + action='store_true', dest='update_rid_lut') + + args = parser.parse_args(args) + + # pathes + CONFIG.set('Paths', 'fits_archive', args.fits_in_dir) + CONFIG.set('Paths', 'spice_kernels', args.spice_dir) + CONFIG.set('Paths', 'soop_files', args.soop_dir) + + # logging + CONFIG.set('Logging', 'stop_on_error', str(args.stop_on_error)) + + # generate a log file for each run and an error file in case of any errors + with DailyPipelineErrorReport(Path(args.log_dir) / + f"dailypipeline_{date.today().strftime('%Y%m%d')}.log", + args.log_level): + + # set up the singletons + if args.spice_file: + spicemeta = [SpiceKernelManager.get_mk_meta(Path(args.spice_file))] + else: + _spm = SpiceKernelManager(Path(CONFIG.get('Paths', 'spice_kernels'))) + spicemeta = _spm.get_latest_mk_and_pred() + + Spice.instance = Spice(spicemeta) + + SOOPManager.instance = SOOPManager(Path(CONFIG.get('Paths', 'soop_files'))) + + RidLutManager.instance = RidLutManager(Path(args.rid_lut_file), update=args.update_rid_lut) + + Path(CONFIG.get('Paths', 'fits_archive')) + + db_file = Path(args.db_file) + fits_in_dir = Path(args.fits_in_dir) + if not fits_in_dir.exists(): + logger.error(f'path not found to input files: {fits_in_dir}') + return + + fits_out_dir = Path(args.fits_out_dir) + if not 
fits_out_dir.exists(): + logger.error(f'path not found to input files: {fits_out_dir}') + return + + PipelineStatus.log_setup() + + logger.info("PARAMETER:") + logger.info(f'db_file: {db_file}') + logger.info(f'fits_in_dir: {fits_in_dir}') + logger.info(f'fits_out_dir: {fits_out_dir}') + logger.info(f"send Mail report: {CONFIG.getboolean('Pipeline', 'error_mail_send')}") + logger.info(f"receivers: {CONFIG.get('Pipeline', 'error_mail_receivers')}") + logger.info(f'log dir: {args.log_dir}') + logger.info(f'log level: {args.log_level}') + + logger.info("\nstart daily pipeline\n") + + phs = ProcessingHistoryStorage(db_file) + + aspect_anc_processor = AspectANC(fits_in_dir, fits_out_dir) + + l2_fits_writer = FitsL2Processor(fits_out_dir) + + # should be done later: internally + hk_in_files = [] + candidates = aspect_anc_processor.find_processing_candidates() + v_candidates = aspect_anc_processor.get_version(candidates, version="latest") + for fc in v_candidates: + tr = aspect_anc_processor.test_for_processing(fc, phs) + if tr == TestForProcessingResult.Suitable: + hk_in_files.append(fc) + + # let each processing "task" run in its own process + jobs = [] + with ProcessPoolExecutor() as executor: + jobs.append(executor.submit(aspect_anc_processor.process_fits_files, hk_in_files, + soopmanager=SOOPManager.instance, + spice_kernel_path=Spice.instance.meta_kernel_path, + processor=l2_fits_writer, + config=CONFIG)) + + # wait for all processes to end + all_files = [] + for job in jobs: + try: + # collect all generated fits files form each process + new_files = job.result() + all_files.extend(new_files) + except Exception: + logger.error('error', exc_info=True) + + # create an entry for each generated file in the ProcessingHistoryStorage + for pr in all_files: + if isinstance(pr, SingleProcessingStepResult): + phs.add_processed_fits_products(pr.name, pr.level, pr.type, pr.version, + get_complete_file_name_and_path(pr.in_path), + pr.out_path, pr.date) + + phs.close() + + all_files = list(set(all_files)) + + # write out all generated fits file in a dedicated log file + out_file = Path(args.log_dir) / f"dailypipeline_{date.today().strftime('%Y%m%d')}.out" + with open(out_file, 'a+') as res_f: + for f in all_files: + res_f.write(f"{str(f)}\n") + + return all_files + + +def main(): + run_daily_pipeline(sys.argv[1:]) + + +if __name__ == '__main__': + main() diff --git a/stixcore/processing/publish.py b/stixcore/processing/publish.py index 21bfa816..c011cdb2 100644 --- a/stixcore/processing/publish.py +++ b/stixcore/processing/publish.py @@ -10,7 +10,7 @@ from enum import Enum from pprint import pformat from pathlib import Path -from datetime import date, datetime +from datetime import date, datetime, timezone, timedelta from collections import defaultdict from paramiko import SSHClient @@ -26,6 +26,7 @@ from stixcore.io.RidLutManager import RidLutManager from stixcore.processing.pipeline_status import pipeline_status from stixcore.products.product import Product +from stixcore.time.datetime import SCETime from stixcore.util.logging import get_logger from stixcore.util.util import get_complete_file_name, is_incomplete_file_name @@ -175,7 +176,7 @@ def add(self, path): dir = str(path.parent) date = datetime.now().timestamp() parts = name[:-5].split('_') - version = int(parts[4].lower().replace("v", "")) + version = int(parts[4].lower().replace("v", "").replace("u", "")) esa_name = '_'.join(parts[0:5]) m_date = path.stat().st_ctime try: @@ -634,6 +635,7 @@ def publish_fits_to_esa(args): candidates = 
fits_dir.rglob("solo_*.fits") to_publish = list() now = datetime.now().timestamp() + next_week = datetime.now(timezone.utc) + timedelta(hours=24*7) hist = PublishHistoryStorage(db_file) n_candidates = 0 @@ -693,6 +695,7 @@ def publish_fits_to_esa(args): logger.info(f'Spice: {spice.meta_kernel_path}') logger.info("\nstart publishing\n") + published = defaultdict(list) for c in candidates: n_candidates += 1 @@ -740,7 +743,6 @@ def publish_fits_to_esa(args): if args.sort_files: to_publish = sorted(to_publish) - published = defaultdict(list) for p in to_publish: try: comment = add_BSD_comment(p) @@ -751,6 +753,20 @@ def publish_fits_to_esa(args): add_res, data = hist.add(p) + try: + # do not publish products from the future + p_header = fits.getheader(p) + if 'DATE-BEG' in p_header and ":" not in p_header['DATE-BEG']: + p_data_beg = datetime.fromisoformat(p_header['DATE-BEG']) + else: + p_data_beg = SCETime.from_float(p_header['OBT_BEG'] * u.s).to_datetime() + if p_data_beg > next_week: + hist.set_result(PublishResult.IGNORED, data[0]["id"]) + published[PublishResult.IGNORED].append(data) + continue + except Exception: + pass + if p.name in blacklist_files: hist.set_result(PublishResult.BLACKLISTED, data[0]["id"]) published[PublishResult.BLACKLISTED].append(data) diff --git a/stixcore/processing/tests/test_publish.py b/stixcore/processing/tests/test_publish.py index 8ebdabce..e0944c78 100644 --- a/stixcore/processing/tests/test_publish.py +++ b/stixcore/processing/tests/test_publish.py @@ -1,5 +1,6 @@ import re +import sys from pathlib import Path from unittest.mock import patch @@ -193,7 +194,7 @@ def test_fits_incomplete_switch_over(out_dir): files_first = [] processor = FitsL1Processor(fits_dir) - for stime in [707875174, 708739179, 709739179]: + for stime in [707875174, 708739179, 709739179, 1999999999]: beg = SCETime(coarse=stime, fine=0) end = SCETime(coarse=stime + 10, fine=2 ** 15) product.scet_timerange = SCETimeRange(start=beg, end=end) @@ -236,7 +237,7 @@ def test_fits_incomplete_switch_over(out_dir): files_first.extend(processor.write_fits(product)) - assert len(files_first) == 3 + assert len(files_first) == 4 assert get_complete_file_name(files_first[0].name) != files_first[0].name assert get_incomplete_file_name(files_first[0].name) == files_first[0].name @@ -250,8 +251,10 @@ def test_fits_incomplete_switch_over(out_dir): assert res published_files = res[PublishResult.PUBLISHED] - # after the publishing all files will have the complete name + # the one file from the future should be ignored + assert len(res[PublishResult.IGNORED]) == 1 + # after the publishing all files will have the complete name assert len(published_files) == 3 # no conflicting esa names expected assert len(published_files[0]) == 1 @@ -263,7 +266,7 @@ def test_fits_incomplete_switch_over(out_dir): published_files[0][0]['name'] files_last = [] - for f in files_first: + for f in files_first[:-1]: # not the last one from future # read all FITS files and modify the data slightly p = Product(get_complete_file_name_and_path(f)) # old data.fcounts = [t1: 1, t2: 2] @@ -286,6 +289,8 @@ def test_fits_incomplete_switch_over(out_dir): assert (p.data['fcounts'] == [3, 2]).all() +@pytest.mark.skipif(sys.platform.startswith('win'), reason="does not run on windows") +@pytest.mark.skipif(sys.platform != "Darwin", reason="does not run on mac") def test_fits_incomplete_switch_over_remove_dup_files(out_dir): test_fits_incomplete_switch_over(out_dir) @@ -298,9 +303,10 @@ def 
test_fits_incomplete_switch_over_remove_dup_files(out_dir): cfiles = [f for f in fits_dir.rglob('*.fits') if p.match(f.name)] assert len(ufiles) == 3 - assert len(cfiles) == 3 + assert len(cfiles) == 4 ufiles[0].unlink() + ufiles[1].unlink() cfiles[1].unlink() res = incomplete_twins(['--fits_dir', str(fits_dir), @@ -311,7 +317,7 @@ def test_fits_incomplete_switch_over_remove_dup_files(out_dir): assert len(res) == 1 moved = list(target_dir.rglob('*.fits')) assert len(moved) == 1 - assert moved[0].name == cfiles[2].name + assert moved[0].name == cfiles[3].name @patch('stixcore.products.level1.scienceL1.Spectrogram') diff --git a/stixcore/products/ANC/aspect.py b/stixcore/products/ANC/aspect.py new file mode 100644 index 00000000..694631e1 --- /dev/null +++ b/stixcore/products/ANC/aspect.py @@ -0,0 +1,264 @@ +import json +from pathlib import Path +from datetime import datetime +from collections import defaultdict + +import numpy as np + +import astropy.units as u +from astropy.table import QTable + +from stixcore.ephemeris.manager import Spice +from stixcore.processing.SingleStep import SingleProcessingStepResult +from stixcore.processing.sswidl import SSWIDLTask +from stixcore.products import Product +from stixcore.products.product import GenericProduct, L2Mixin +from stixcore.time import SCETime, SCETimeDelta, SCETimeRange +from stixcore.util.logging import get_logger +from stixcore.util.util import get_complete_file_name + +__all__ = ['Ephemeris', 'AspectIDLProcessing'] + +logger = get_logger(__name__) + + +class AspectIDLProcessing(SSWIDLTask): + """A IDL Task that will calculate the aspect solution based on HK product data input.""" + def __init__(self): + script = ''' + workdir = '{{ work_dir }}' + print, workdir + cd, workdir + + ; I/O directories: + ; - location of some parameter files + param_dir = workdir + d + 'SAS_param' + d + calib_file = param_dir + 'SAS_calib_20211005.sav' + aperfile = param_dir + 'apcoord_FM_circ.sav' + simu_data_file = param_dir + 'SAS_simu.sav' + + hk_files = JSON_PARSE('{{ hk_files }}', /TOSTRUCT) + + data = [] + processed_files = [] + FOREACH hk_file, hk_files, file_index DO BEGIN + catch, error + if error ne 0 then begin + print, hk_file.parentfits, 'A IDL error occured: ' + !error_state.msg + data_f.error = "FATAL_IDL_ERROR" + data = [data, data_f] + catch, /cancel + continue + endif + + print, hk_file.parentfits + print, "" + flush, -1 + processed_files = [processed_files, hk_file.parentfits] + data_f = [] + for i=0L, n_elements(hk_file.DATA.cha_diode0)-1 do begin + data_e = { stx_aspect_dto, $ + cha_diode0: hk_file.DATA.cha_diode0[i], $ + cha_diode1: hk_file.DATA.cha_diode1[i], $ + chb_diode0: hk_file.DATA.chb_diode0[i], $ + chb_diode1: hk_file.DATA.chb_diode1[i], $ + time: hk_file.DATA.time[i], $ + scet_time_c: hk_file.DATA.scet_time_c[i], $ + scet_time_f: hk_file.DATA.scet_time_f[i], $ + duration : hk_file.DATA.duration[i], $ + spice_disc_size : hk_file.DATA.spice_disc_size[i], $ + y_srf : hk_file.DATA.y_srf[i], $ + z_srf : hk_file.DATA.z_srf[i], $ + calib : hk_file.DATA.calib[i], $ + sas_ok : fix(hk_file.DATA.sas_ok[i]), $ + error : hk_file.DATA.error[i], $ + control_index : hk_file.DATA.control_index[i], $ + parentfits : file_index $ + } + data_f = [data_f, data_e] + endfor + + ;fake error + ;if file_index eq 2 then begin + ; zu = processed_files[1000] + ;endif + ; START ASPECT PROCESSING + + help, data_e, /str + print, n_elements(data_f) + flush, -1 + + print,"Calibrating data..." 
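            ; Overview of the STX-GSW processing chain invoked below:
            ;   stx_calib_sas_data         - subtract dark currents and apply relative gains (in place)
            ;   stx_remove_bad_sas_data    - drop points flagged as bad, applied to the copy data_calib
            ;   stx_auto_scale_sas_data    - derive a global correction factor on that copy
            ;   (the factor data_calib[0].calib is then applied to the four diode channels of data_f)
            ;   stx_derive_aspect_solution - compute the aspect solution (y_srf, z_srf) with interpolation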
+ flush, -1 + ; First, substract dark currents and applies relative gains + stx_calib_sas_data, data_f, calib_file + + ; copy result in a new object + data_calib = data_f + ; Added 2023-09-18: remove data points with some error detected during calibration + stx_remove_bad_sas_data, data_calib + + ; Now automatically compute global calibration correction factor and applies it + ; Note: this takes a bit of time + stx_auto_scale_sas_data, data_calib, simu_data_file, aperfile + + cal_corr_factor = data_calib[0].calib + data_f.CHA_DIODE0 *= cal_corr_factor + data_f.CHA_DIODE1 *= cal_corr_factor + data_f.CHB_DIODE0 *= cal_corr_factor + data_f.CHB_DIODE1 *= cal_corr_factor + + print,"Computing aspect solution..." + stx_derive_aspect_solution, data_f, simu_data_file, interpol_r=1, interpol_xy=1 + + print,"END Computing aspect solution..." + flush, -1 + ; END ASPECT PROCESSING + + data = [data, data_f] + ENDFOREACH + + stx_gsw_version, version = idlgswversion + + undefine, hk_file, hk_files, data_e, i, di, data_f, d + +''' + super().__init__(script=script, work_dir='stix/idl/processing/aspect/', + params={'hk_files': list()}) + + def pack_params(self): + """Preprocessing step applying json formatting to the input data. + + Returns + ------- + dict + the pre processed data + """ + packed = self.params.copy() + logger.info("calling IDL for hk files:") + for f in packed['hk_files']: + logger.info(f" {f['parentfits']}") + packed['hk_files'] = json.dumps(packed['hk_files']) + return packed + + def postprocessing(self, result, fits_processor): + """Postprocessing step after the IDL tasks returned the aspect data. + + The aspect data and additional auxiliary data will be compiled + into `Ephemeris` data product and written out to fits file. + + Parameters + ---------- + result : dict + the result of the IDL aspect solution + fits_processor : FitsProcessor + a fits processor to write out product as fits + + Returns + ------- + list + a list of all written fits files + """ + files = [] + logger.info("returning from IDL") + if 'data' in result and 'processed_files' in result: + for file_idx, resfile in enumerate(result.processed_files): + file_path = Path(resfile.decode()) + logger.info(f"IDL postprocessing HK file: {resfile}") + HK = Product(file_path) + + control = HK.control + data = QTable() + + idldata = result.data[result.data["parentfits"] == file_idx] + n = len(idldata) + + data['time'] = SCETime(coarse=idldata['scet_time_c'], fine=idldata['scet_time_f']) + # data['timedel'] = SCETimeDelta.from_float(idldata["duration"] * u.s) + # we have instantaneous data so the integration time is set to 0 + data['timedel'] = SCETimeDelta(coarse=0, fine=0) + data['time_utc'] = [t.decode() for t in idldata['time']] + # [datetime.strptime(t.decode(), '%Y-%m-%dT%H:%M:%S.%f') for t in idldata['time']] + data['control_index'] = idldata['control_index'] + data['spice_disc_size'] = (idldata['spice_disc_size'] * u.arcsec).astype(np.float32) + data['y_srf'] = (idldata['y_srf'] * u.arcsec).astype(np.float32) + data['z_srf'] = (idldata['z_srf'] * u.arcsec).astype(np.float32) + data['sas_ok'] = (idldata['sas_ok']).astype(np.bool_) + data['sas_ok'].description = "0: not usable, 1: good" + data['sas_error'] = [e.decode() if hasattr(e, 'decode') else e + for e in idldata['error']] + + data['solo_loc_carrington_lonlat'] = np.tile(np.array([0.0, 0.0]), (n, 1)).\ + astype(np.float32) * u.deg + data['solo_loc_carrington_dist'] = np.tile(np.array([0.0]), (n, 1)).\ + astype(np.float32) * u.km + data['solo_loc_heeq_zxy'] = 
np.tile(np.array([0.0, 0.0, 0.0]), (n, 1)).\ + astype(np.float32) * u.km + data['roll_angle_rpy'] = np.tile(np.array([0.0, 0.0, 0.0]), (n, 1)).\ + astype(np.float32) * u.deg + + for idx, d in enumerate(data['time']): + orient, dist, car, heeq = Spice.instance.get_auxiliary_positional_data(date=d) + + data[idx]['solo_loc_carrington_lonlat'] = car.to('deg').astype(np.float32) + data[idx]['solo_loc_carrington_dist'] = dist.to('km').astype(np.float32) + data[idx]['solo_loc_heeq_zxy'] = heeq.to('km').astype(np.float32) + data[idx]['roll_angle_rpy'] = orient.to('deg').astype(np.float32) + + control['parent'] = get_complete_file_name(file_path.name) + + aux = Ephemeris(control=control, data=data, idb_versions=HK.idb_versions) + + aux.add_additional_header_keyword( + ('STX_GSW', result.idlgswversion[0].decode(), + 'Version of STX-GSW that provided data')) + aux.add_additional_header_keyword( + ('HISTORY', 'aspect data processed by STX-GSW', '')) + files.extend([SingleProcessingStepResult(aux.name, aux.level, aux.type, + aux.get_processing_version(), + fop, file_path, datetime.now()) + for fop in fits_processor.write_fits(aux)]) + else: + logger.error("IDL ERROR") + + return files + + +class Ephemeris(GenericProduct, L2Mixin): + """Ephemeris data, including spacecraft attitude and coordinates as well as STIX + pointing with respect to Sun center as derived from the STIX aspect system. + + In ANC product format. + """ + PRODUCT_PROCESSING_VERSION = 2 + + def __init__(self, *, service_type=0, service_subtype=0, ssid=1, control, data, + idb_versions=defaultdict(SCETimeRange), **kwargs): + super().__init__(service_type=service_type, service_subtype=service_subtype, + ssid=ssid, control=control, data=data, + idb_versions=idb_versions, **kwargs) + self.name = 'ephemeris' + self.level = 'ANC' + self.type = 'asp' + self.ssid = 1 + self.service_subtype = 0 + self.service_type = 0 + + @property + def dmin(self): + return np.nanmin([self.data['y_srf'].min().to_value('arcsec'), + self.data['z_srf'].min().to_value('arcsec')]) + + @property + def dmax(self): + return np.nanmax([self.data['y_srf'].max().to_value('arcsec'), + self.data['z_srf'].max().to_value('arcsec')]) + + @property + def bunit(self): + return 'arcsec' + + @classmethod + def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): + return (kwargs['level'] == 'ANC' and service_type == 0 + and service_subtype == 0 and ssid == 1) diff --git a/stixcore/products/__init__.py b/stixcore/products/__init__.py index bbba5398..c4565966 100644 --- a/stixcore/products/__init__.py +++ b/stixcore/products/__init__.py @@ -1,4 +1,5 @@ from stixcore.products.product import Product # noqa +from stixcore.products.ANC.aspect import Ephemeris from stixcore.products.level0.housekeepingL0 import * from stixcore.products.level0.quicklookL0 import * from stixcore.products.level0.scienceL0 import * diff --git a/stixcore/products/level2/housekeepingL2.py b/stixcore/products/level2/housekeepingL2.py index 09224070..3a84b12d 100644 --- a/stixcore/products/level2/housekeepingL2.py +++ b/stixcore/products/level2/housekeepingL2.py @@ -1,25 +1,14 @@ """ House Keeping data products """ -import json -from pathlib import Path from collections import defaultdict -import numpy as np - -import astropy.units as u -from astropy.table import QTable - -from stixcore.ephemeris.manager import Spice -from stixcore.processing.sswidl import SSWIDLProcessor, SSWIDLTask -from stixcore.products import Product from stixcore.products.level0.housekeepingL0 import HKProduct from 
stixcore.products.product import L2Mixin -from stixcore.time import SCETime, SCETimeDelta, SCETimeRange +from stixcore.time import SCETimeRange from stixcore.util.logging import get_logger -from stixcore.util.util import get_complete_file_name, get_incomplete_file_name_and_path -__all__ = ['MiniReport', 'MaxiReport', 'Ephemeris', 'AspectIDLProcessing'] +__all__ = ['MiniReport', 'MaxiReport'] logger = get_logger(__name__) @@ -59,304 +48,7 @@ def __init__(self, *, service_type, service_subtype, ssid, control, data, self.level = 'L2' self.type = 'hk' - @classmethod - def from_level1(cls, l1product, parent='', idlprocessor=None): - - # create a l2 HK product - l2 = cls(service_type=l1product.service_type, - service_subtype=l1product.service_subtype, - ssid=l1product.ssid, - control=l1product.control, - data=l1product.data, - idb_versions=l1product.idb_versions) - - l2.control.replace_column('parent', [Path(parent).name] * len(l2.control)) - l2.fits_header = l1product.fits_header - - # use the HK data to generate aux data product in a seperate task - if isinstance(idlprocessor, SSWIDLProcessor): - - data = QTable() - data['cha_diode0'] = l2.data['hk_asp_photoa0_v'] - data['cha_diode1'] = l2.data['hk_asp_photoa1_v'] - data['chb_diode0'] = l2.data['hk_asp_photob0_v'] - data['chb_diode1'] = l2.data['hk_asp_photob1_v'] - data['time'] = [d.strftime('%Y-%m-%dT%H:%M:%S.%f') - for d in l2.data['time'].to_datetime()] - data['scet_time_f'] = l2.data['time'].fine - data['scet_time_c'] = l2.data['time'].coarse - - # TODO set to seconds - dur = (l2.data['time'][1:] - l2.data['time'][0:-1]).as_float().value - data['duration'] = dur[0] - data['duration'][0:-1] = dur - data['duration'][:] = dur[-1] - - # data['spice_disc_size'] = Spice.instance.get_sun_disc_size(date=l2.data['time']) - data['spice_disc_size'] = [Spice.instance.get_sun_disc_size(date=d) - for d in l2.data['time']] - - data['y_srf'] = 0.0 - data['z_srf'] = 0.0 - data['calib'] = 0.0 - data['sas_ok'] = np.byte(0) - data['error'] = "" - data['control_index'] = l2.data['control_index'] - - dataobj = dict() - for coln in data.colnames: - dataobj[coln] = data[coln].value.tolist() - - f = {'parentfits': str(get_incomplete_file_name_and_path(parent)), - 'data': dataobj} - - idlprocessor[AspectIDLProcessing].params['hk_files'].append(f) - idlprocessor.opentasks += 1 - - # currently the HK L2 product is the same as L1 - # TODO add real calibration supress the writeout - return [l2] - @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): return (kwargs['level'] == 'L2' and service_type == 3 and service_subtype == 25 and ssid == 2) - - -class AspectIDLProcessing(SSWIDLTask): - """A IDL Task that will calculate the aspect solution based on HK product data input.""" - def __init__(self): - script = ''' - workdir = '{{ work_dir }}' - print, workdir - cd, workdir - - ; I/O directories: - ; - location of some parameter files - param_dir = workdir + d + 'SAS_param' + d - calib_file = param_dir + 'SAS_calib_20211005.sav' - aperfile = param_dir + 'apcoord_FM_circ.sav' - simu_data_file = param_dir + 'SAS_simu.sav' - - hk_files = JSON_PARSE('{{ hk_files }}', /TOSTRUCT) - - data = [] - processed_files = [] - FOREACH hk_file, hk_files, file_index DO BEGIN - catch, error - if error ne 0 then begin - print, hk_file.parentfits, 'A IDL error occured: ' + !error_state.msg - data_f.error = "FATAL_IDL_ERROR" - data = [data, data_f] - catch, /cancel - continue - endif - - print, hk_file.parentfits - print, "" - flush, -1 - processed_files = 
[processed_files, hk_file.parentfits] - data_f = [] - for i=0L, n_elements(hk_file.DATA.cha_diode0)-1 do begin - data_e = { stx_aspect_dto, $ - cha_diode0: hk_file.DATA.cha_diode0[i], $ - cha_diode1: hk_file.DATA.cha_diode1[i], $ - chb_diode0: hk_file.DATA.chb_diode0[i], $ - chb_diode1: hk_file.DATA.chb_diode1[i], $ - time: hk_file.DATA.time[i], $ - scet_time_c: hk_file.DATA.scet_time_c[i], $ - scet_time_f: hk_file.DATA.scet_time_f[i], $ - duration : hk_file.DATA.duration[i], $ - spice_disc_size : hk_file.DATA.spice_disc_size[i], $ - y_srf : hk_file.DATA.y_srf[i], $ - z_srf : hk_file.DATA.z_srf[i], $ - calib : hk_file.DATA.calib[i], $ - sas_ok : fix(hk_file.DATA.sas_ok[i]), $ - error : hk_file.DATA.error[i], $ - control_index : hk_file.DATA.control_index[i], $ - parentfits : file_index $ - } - data_f = [data_f, data_e] - endfor - - ;fake error - ;if file_index eq 2 then begin - ; zu = processed_files[1000] - ;endif - ; START ASPECT PROCESSING - - help, data_e, /str - print, n_elements(data_f) - flush, -1 - - print,"Calibrating data..." - flush, -1 - ; First, substract dark currents and applies relative gains - stx_calib_sas_data, data_f, calib_file - - ; copy result in a new object - data_calib = data_f - ; Added 2023-09-18: remove data points with some error detected during calibration - stx_remove_bad_sas_data, data_calib - - ; Now automatically compute global calibration correction factor and applies it - ; Note: this takes a bit of time - stx_auto_scale_sas_data, data_calib, simu_data_file, aperfile - - cal_corr_factor = data_calib[0].calib - data_f.CHA_DIODE0 *= cal_corr_factor - data_f.CHA_DIODE1 *= cal_corr_factor - data_f.CHB_DIODE0 *= cal_corr_factor - data_f.CHB_DIODE1 *= cal_corr_factor - - print,"Computing aspect solution..." - stx_derive_aspect_solution, data_f, simu_data_file, interpol_r=1, interpol_xy=1 - - print,"END Computing aspect solution..." - flush, -1 - ; END ASPECT PROCESSING - - data = [data, data_f] - ENDFOREACH - - stx_gsw_version, version = idlgswversion - - undefine, hk_file, hk_files, data_e, i, di, data_f, d - -''' - super().__init__(script=script, work_dir='stix/idl/processing/aspect/', - params={'hk_files': list()}) - - def pack_params(self): - """Preprocessing step applying json formatting to the input data. - - Returns - ------- - dict - the pre processed data - """ - packed = self.params.copy() - logger.info("calling IDL for hk files:") - for f in packed['hk_files']: - logger.info(f" {f['parentfits']}") - packed['hk_files'] = json.dumps(packed['hk_files']) - return packed - - def postprocessing(self, result, fits_processor): - """Postprocessing step after the IDL tasks returned the aspect data. - - The aspect data and additional auxiliary data will be compiled - into `Ephemeris` data product and written out to fits file. 
- - Parameters - ---------- - result : dict - the result of the IDL aspect solution - fits_processor : FitsProcessor - a fits processor to write out product as fits - - Returns - ------- - list - a list of all written fits files - """ - files = [] - logger.info("returning from IDL") - if 'data' in result and 'processed_files' in result: - for file_idx, resfile in enumerate(result.processed_files): - file_path = Path(resfile.decode()) - logger.info(f"IDL postprocessing HK file: {resfile}") - HK = Product(file_path) - - control = HK.control - data = QTable() - - idldata = result.data[result.data["parentfits"] == file_idx] - n = len(idldata) - - data['time'] = SCETime(coarse=idldata['scet_time_c'], fine=idldata['scet_time_f']) - # data['timedel'] = SCETimeDelta.from_float(idldata["duration"] * u.s) - # we have instantaneous data so the integration time is set to 0 - data['timedel'] = SCETimeDelta(coarse=0, fine=0) - data['time_utc'] = [t.decode() for t in idldata['time']] - # [datetime.strptime(t.decode(), '%Y-%m-%dT%H:%M:%S.%f') for t in idldata['time']] - data['control_index'] = idldata['control_index'] - data['spice_disc_size'] = (idldata['spice_disc_size'] * u.arcsec).astype(np.float32) - data['y_srf'] = (idldata['y_srf'] * u.arcsec).astype(np.float32) - data['z_srf'] = (idldata['z_srf'] * u.arcsec).astype(np.float32) - data['sas_ok'] = (idldata['sas_ok']).astype(np.bool_) - data['sas_ok'].description = "0: not usable, 1: good" - data['sas_error'] = [e.decode() if hasattr(e, 'decode') else e - for e in idldata['error']] - - data['solo_loc_carrington_lonlat'] = np.tile(np.array([0.0, 0.0]), (n, 1)).\ - astype(np.float32) * u.deg - data['solo_loc_carrington_dist'] = np.tile(np.array([0.0]), (n, 1)).\ - astype(np.float32) * u.km - data['solo_loc_heeq_zxy'] = np.tile(np.array([0.0, 0.0, 0.0]), (n, 1)).\ - astype(np.float32) * u.km - data['roll_angle_rpy'] = np.tile(np.array([0.0, 0.0, 0.0]), (n, 1)).\ - astype(np.float32) * u.deg - - for idx, d in enumerate(data['time']): - orient, dist, car, heeq = Spice.instance.get_auxiliary_positional_data(date=d) - - data[idx]['solo_loc_carrington_lonlat'] = car.to('deg').astype(np.float32) - data[idx]['solo_loc_carrington_dist'] = dist.to('km').astype(np.float32) - data[idx]['solo_loc_heeq_zxy'] = heeq.to('km').astype(np.float32) - data[idx]['roll_angle_rpy'] = orient.to('deg').astype(np.float32) - - control['parent'] = get_complete_file_name(file_path.name) - - aux = Ephemeris(control=control, data=data, idb_versions=HK.idb_versions) - - aux.add_additional_header_keyword( - ('STX_GSW', result.idlgswversion[0].decode(), - 'Version of STX-GSW that provided data')) - aux.add_additional_header_keyword( - ('HISTORY', 'aspect data processed by STX-GSW', '')) - files.extend(fits_processor.write_fits(aux)) - else: - logger.error("IDL ERROR") - - return files - - -class Ephemeris(HKProduct, L2Mixin): - """Ephemeris data, including spacecraft attitude and coordinates as well as STIX - pointing with respect to Sun center as derived from the STIX aspect system. - - In level 2 format. 
- """ - PRODUCT_PROCESSING_VERSION = 2 - - def __init__(self, *, service_type=0, service_subtype=0, ssid=1, control, data, - idb_versions=defaultdict(SCETimeRange), **kwargs): - super().__init__(service_type=service_type, service_subtype=service_subtype, - ssid=ssid, control=control, data=data, - idb_versions=idb_versions, **kwargs) - self.name = 'ephemeris' - self.level = 'ANC' - self.type = 'asp' - self.ssid = 1 - self.service_subtype = 0 - self.service_type = 0 - - @property - def dmin(self): - return np.nanmin([self.data['y_srf'].min().to_value('arcsec'), - self.data['z_srf'].min().to_value('arcsec')]) - - @property - def dmax(self): - return np.nanmax([self.data['y_srf'].max().to_value('arcsec'), - self.data['z_srf'].max().to_value('arcsec')]) - - @property - def bunit(self): - return 'arcsec' - - @classmethod - def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): - return (kwargs['level'] == 'ANC' and service_type == 0 - and service_subtype == 0 and ssid == 1) diff --git a/stixcore/products/level3/flarelistproduct.py b/stixcore/products/level3/flarelistproduct.py new file mode 100644 index 00000000..d6280a81 --- /dev/null +++ b/stixcore/products/level3/flarelistproduct.py @@ -0,0 +1,30 @@ +from stixcore.products.product import GenericProduct, L3Mixin +from stixcore.time.datetime import SCETimeRange + + +class FlareListProduct(GenericProduct, L3Mixin): + """Product not based on direct TM data but on time ranges defined in flare lists. + + In level 3 format. + """ + + @classmethod + def from_timerange(cls, timerange: SCETimeRange, *, flarelistparent: str = ''): + pass + + +class FlareOverviewImage(FlareListProduct): + PRODUCT_PROCESSING_VERSION = 1 + + def __init__(self, *, service_type=0, service_subtype=0, ssid=2, **kwargs): + self.name = 'overview' + self.level = 'L3' + self.type = 'img' + self.ssid = ssid + self.service_subtype = service_subtype + self.service_type = service_type + + @classmethod + def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): + return (kwargs['level'] == 'L3' and service_type == 0 + and service_subtype == 0 and ssid == 2) diff --git a/stixcore/products/product.py b/stixcore/products/product.py index b3730457..acf3ef9a 100644 --- a/stixcore/products/product.py +++ b/stixcore/products/product.py @@ -905,6 +905,10 @@ def from_level1(cls, l1product, parent='', idlprocessor=None): return [l2] +class L3Mixin(FitsHeaderMixin): + pass + + class DefaultProduct(GenericProduct, L1Mixin, L2Mixin): """ Default product use when not QL or BSD. 
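For reference, a minimal sketch of how the new ANC and L3 product classes introduced above are expected to be resolved, assuming that, as for the existing products, dispatch goes through their is_datasource_for() classmethods on the patched stixcore tree. The CANDIDATES list and the resolve() helper below are illustrative stand-ins only, not stixcore's actual Product factory code.

from stixcore.products.ANC.aspect import Ephemeris
from stixcore.products.level3.flarelistproduct import FlareOverviewImage

# Illustrative stand-in for the Product factory: pick the first class that
# claims the given (level, service_type, service_subtype, ssid) combination.
CANDIDATES = [Ephemeris, FlareOverviewImage]


def resolve(level, service_type, service_subtype, ssid):
    for cls in CANDIDATES:
        if cls.is_datasource_for(level=level, service_type=service_type,
                                 service_subtype=service_subtype, ssid=ssid):
            return cls
    return None


# The ANC aspect ephemeris is keyed on level 'ANC' and ssid 1 ...
assert resolve('ANC', 0, 0, 1) is Ephemeris
# ... while the L3 flare overview image uses level 'L3' and ssid 2.
assert resolve('L3', 0, 0, 2) is FlareOverviewImage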
diff --git a/stixcore/products/tests/test_bsd.py b/stixcore/products/tests/test_bsd.py index 8db4bce2..1b0e8fcd 100644 --- a/stixcore/products/tests/test_bsd.py +++ b/stixcore/products/tests/test_bsd.py @@ -7,6 +7,7 @@ from stixcore.data.test import test_data from stixcore.products.level0 import scienceL0 as sl0 from stixcore.products.level1 import scienceL1 as sl1 +from stixcore.time import SCETime @pytest.fixture @@ -15,17 +16,17 @@ def out_dir(tmp_path): testpackets = [(test_data.tmtc.TM_21_6_20_complete, sl0.RawPixelData, sl1.RawPixelData, - 'xray-rpd', '0640971848f00000', '0640971950f00000', 6), + 'xray-rpd', '640971848:0', '640971968:0', 6), (test_data.tmtc.TM_21_6_21, sl0.CompressedPixelData, sl1.CompressedPixelData, - 'xray-cpd', '0658880585f52427', '0658880585f58981', 1), + 'xray-cpd', '658880585:52428', '658880586:52428', 1), (test_data.tmtc.TM_21_6_21_complete, sl0.CompressedPixelData, - sl1.CompressedPixelData, 'xray-cpd', '0640274394f06553', '0640274476f06553', 5), + sl1.CompressedPixelData, 'xray-cpd', '640274394:6554', '640274494:6554', 5), (test_data.tmtc.TM_21_6_24, sl0.Spectrogram, sl1.Spectrogram, - 'xray-spec', '0659402043f39320', '0659402958f32767', 54), + 'xray-spec', '659402043:39322', '659402958:32768', 54), (test_data.tmtc.TM_21_6_23_complete, sl0.Visibility, sl1.Visibility, - 'xray-vis', '0642038387f06553', '0642038403f32767', 5), + 'xray-vis', '642038387:6554', '642038407:6554', 5), (test_data.tmtc.TM_21_6_42_complete, sl0.Aspect, sl1.Aspect, - 'aspect-burst', '0645932472f05485', '0645933132f52624', 2105)] + 'aspect-burst', '645932471:62633', '645933124:48429', 2105)] @patch('stixcore.products.levelb.binary.LevelB') @@ -41,9 +42,8 @@ def test_xray(levelb, out_dir, packets): assert xray_L0.level == 'L0' assert xray_L0.name == name - # TODO enable time tests again - # assert str(xray_L0.obs_beg) == beg - # assert str(xray_L0.obs_end) == end + assert xray_L0.scet_timerange.start == SCETime.from_string(beg) + assert xray_L0.scet_timerange.end == SCETime.from_string(end) assert len(xray_L0.data) == size xray_L1 = cl_l1.from_level0(xray_L0, parent='parent.l0.fits') diff --git a/stixcore/time/datetime.py b/stixcore/time/datetime.py index 2bd60dff..22fb92e9 100644 --- a/stixcore/time/datetime.py +++ b/stixcore/time/datetime.py @@ -3,7 +3,6 @@ """ import logging import operator -from datetime import datetime import numpy as np from sunpy.time.timerange import TimeRange @@ -452,12 +451,8 @@ def to_datetime(self, raise_error=False): except TypeError: utc = Spice.instance.scet_to_datetime(self.to_string()) - flown_mks = sorted([k for k in Spice.instance.meta_kernel_path if 'flown' in k.name]) + kernel_date = Spice.instance.get_mk_date(meta_kernel_type="flown") - try: - kernel_date = datetime.strptime(flown_mks[-1].name.split('_')[4], '%Y%m%d') - except IndexError: - kernel_date = datetime.fromtimestamp(flown_mks.lstat().st_ctime) bad = [t.replace(tzinfo=None) > kernel_date for t in (utc if isinstance(utc, list) else [utc])] if any(bad): diff --git a/stixcore/tmtc/packets.py b/stixcore/tmtc/packets.py index ffb6d172..10ca7555 100644 --- a/stixcore/tmtc/packets.py +++ b/stixcore/tmtc/packets.py @@ -679,7 +679,7 @@ def export(self, descr=False): p['header'] = h p['parameters'] = d p['idb_version'] = self.get_idb().version - p['spice_kernel'] = [p.name for p in Spice.instance.meta_kernel_path] + p['spice_kernel'] = [mkp.name for mkp, mkt, mkd in Spice.instance.meta_kernel_path] p['run_id'] = 0 return p diff --git a/stixcore/util/scripts/ddpd.py b/stixcore/util/scripts/ddpd.py 
index cf9b769c..74ce0bf8 100644 --- a/stixcore/util/scripts/ddpd.py +++ b/stixcore/util/scripts/ddpd.py @@ -275,7 +275,7 @@ def product(file_in): typenames = {"sci": "Science Data", "hk": "Housekeeping Data", "ql": "Quicklook Data", - "aux": "Auxilary Data", + "asp": "Ancillary Aspect Data", "cal": "Calibration"} collector = defaultdict(lambda: defaultdict(list)) @@ -294,16 +294,16 @@ def product(file_in): "L0/21/6/42/solo_L0_stix-sci-aspect-burst_0687412111-0687419343_V02_2110130059.fits", # noqa "L0/21/6/24/solo_L0_stix-sci-xray-spec_0689786926-0689801914_V02_2111090002-50819.fits", # noqa # QL - "L0/21/6/31/solo_L0_stix-ql-background_0668822400_V02U.fits", - "L0/21/6/34/solo_L0_stix-ql-flareflag_0684547200_V02U.fits", - "L0/21/6/30/solo_L0_stix-ql-lightcurve_0684892800_V02U.fits", - "L0/21/6/33/solo_L0_stix-ql-variance_0687484800_V02U.fits", - "L0/21/6/32/solo_L0_stix-ql-spectra_0680400000_V02U.fits", + "L0/21/6/31/solo_L0_stix-ql-background_0668822400_V02.fits", + "L0/21/6/34/solo_L0_stix-ql-flareflag_0684547200_V02.fits", + "L0/21/6/30/solo_L0_stix-ql-lightcurve_0684892800_V02.fits", + "L0/21/6/33/solo_L0_stix-ql-variance_0687484800_V02.fits", + "L0/21/6/32/solo_L0_stix-ql-spectra_0680400000_V02.fits", # HK - "L0/3/25/2/solo_L0_stix-hk-maxi_0647913600_V02U.fits", - "L0/3/25/1/solo_L0_stix-hk-mini_0643507200_V02U.fits", + "L0/3/25/2/solo_L0_stix-hk-maxi_0647913600_V02.fits", + "L0/3/25/1/solo_L0_stix-hk-mini_0643507200_V02.fits", # CAL - "L0/21/6/41/solo_L0_stix-cal-energy_0640137600_V02U.fits", + "L0/21/6/41/solo_L0_stix-cal-energy_0640137600_V02.fits", # L1 # science @@ -320,19 +320,22 @@ def product(file_in): "L1/2020/06/16/QL/solo_L1_stix-ql-variance_20200616_V02.fits", "L1/2021/11/16/QL/solo_L1_stix-ql-spectra_20211116_V02.fits", # HK - "L1/2020/06/16/HK/solo_L1_stix-hk-maxi_20200616_V02U.fits", - "L1/2021/09/20/HK/solo_L1_stix-hk-mini_20210920_V02U.fits", + "L1/2020/06/16/HK/solo_L1_stix-hk-maxi_20200616_V02.fits", + "L1/2021/09/20/HK/solo_L1_stix-hk-mini_20210920_V02.fits", # CAL - "L1/2023/02/13/CAL/solo_L1_stix-cal-energy_20230213_V02U.fits", + "L1/2023/02/13/CAL/solo_L1_stix-cal-energy_20230213_V02.fits", - # L2 - "L2/2023/06/06/AUX/solo_L2_stix-aux-ephemeris_20230606_V02U.fits" + # ANC + # "ANC/2023/06/06/AUX/solo_L2_stix-aux-ephemeris_20230606_V02U.fits" ] remote = ["https://pub099.cs.technik.fhnw.ch/data/fits/" + x for x in files] # files = ["/home/shane/fits_test/" + x for x in files] files = [("/data/stix/out/fits_v1.2.0/" + x, remote[i]) for i, x in enumerate(files)] + files.append(("/data/stix/out/test/fits_ANC/ANC/2024/04/11/ASP/solo_ANC_stix-asp-ephemeris_20240411_V02U.fits", # noqa + "https://pub099.cs.technik.fhnw.ch/data/fits/ANC/2024/04/11/ASP/solo_ANC_stix-asp-ephemeris_20240411_V02U.fits")) # noqa + with tempfile.TemporaryDirectory() as tempdir: temppath = Path(tempdir) for f in files: