Skip to content

Commit

Permalink
daily processing pipeline (#405)
Browse files Browse the repository at this point in the history
* bump all product versions to 2

* initial

* daily processing task

* add singleStep

* move ANC-aspect from pipeline to daily_pipeline

* removed requirement for the moment

* fix #283 and option to process only latest fits file versions

* update ddpd scripts

* fix circular imports

* test git dep

* skio test on win

* skip test on win/mac

* Update stixcore/io/FlareListManager.py

Co-authored-by: Shane Maloney <[email protected]>

* fix format error

---------

Co-authored-by: Shane Maloney <[email protected]>
  • Loading branch information
nicHoch and samaloney authored Nov 25, 2024
1 parent 5262249 commit 780760e
Show file tree
Hide file tree
Showing 21 changed files with 1,510 additions and 365 deletions.
3 changes: 1 addition & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ install_requires =
spiceypy
sunpy>=4.0.1
watchdog==6.0.0
requests

stixdcpy==3.0


[options.entry_points]
Expand Down
49 changes: 36 additions & 13 deletions stixcore/ephemeris/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from enum import Enum
from pathlib import Path
from datetime import datetime
from textwrap import wrap
from collections import defaultdict

Expand Down Expand Up @@ -70,6 +71,9 @@ class SpiceKernelType(Enum):
class SpiceKernelManager():
"""A class to manage Spice kernels in the local file system."""

MK_DATE_PATTERN = re.compile(r"solo_ANC_soc.*_(\d{4})(\d{2})(\d{2})_.*.tm")
OLDEST_MK_DATE = datetime.fromisoformat("2019-01-01T00:00:00.000")

def __init__(self, path):
"""Creates a new SpiceKernelManager
Expand Down Expand Up @@ -100,23 +104,36 @@ def _get_latest(self, kerneltype, *, top_n=1):

return files[0:top_n] if top_n > 1 else [files[0]]

def get_latest_sclk(self, *, top_n=1):
return self._get_latest(SpiceKernelType.SCLK, top_n=top_n)

def get_latest_lsk(self, *, top_n=1):
return self._get_latest(SpiceKernelType.LSK, top_n=top_n)

def get_latest_mk(self, *, top_n=1):
return self._get_latest(SpiceKernelType.MK, top_n=top_n)
return [SpiceKernelManager.get_mk_meta(mkp)
for mkp in self._get_latest(SpiceKernelType.MK, top_n=top_n)]

def get_latest_mk_pred(self, *, top_n=1):
return self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)
return [SpiceKernelManager.get_mk_meta(mkp)
for mkp in self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)]

def get_latest_mk_and_pred(self, *, top_n=1):
mks = self._get_latest(SpiceKernelType.MK_PRED, top_n=top_n)
mks.extend(self._get_latest(SpiceKernelType.MK, top_n=top_n))
mks = self.get_latest_mk_pred(top_n=top_n)
mks.extend(self.get_latest_mk(top_n=top_n))
return mks

@classmethod
def get_mk_date(cls, path: Path):
try:
ds = SpiceKernelManager.MK_DATE_PATTERN.findall(path.name)[0]
return datetime.fromisoformat(f"{ds[0]}-{ds[1]}-{ds[2]}T00:00:00.000")
except Exception:
# return a date before the start so that comparisons will fail
# TODO check if this is OK
return SpiceKernelManager.OLDEST_MK_DATE
return None

@classmethod
def get_mk_meta(cls, path: Path):
date = cls.get_mk_date(path)
type = "flown" if "-flown-" in path.name else "pred"
return (path, type, date)

def get_latest(self, kerneltype=SpiceKernelType.MK, *, top_n=1):
"""Finds the latest version of the spice kernel.
Expand Down Expand Up @@ -149,7 +166,9 @@ def __init__(self, meta_kernel_pathes):
# unload all old kernels
spiceypy.kclear()

for meta_kernel_path in np.atleast_1d(meta_kernel_pathes):
self.mk_dates = defaultdict(lambda: SpiceKernelManager.OLDEST_MK_DATE)

for meta_kernel_path, mk_type, mk_date in np.atleast_1d(meta_kernel_pathes):
try:
meta_kernel_path = Path(meta_kernel_path)
if not meta_kernel_path.exists():
Expand Down Expand Up @@ -177,13 +196,17 @@ def __init__(self, meta_kernel_pathes):
# load the meta kernel
spiceypy.furnsh(str(abs_file))
logger.info(f"LOADING NEW META KERNEL: {meta_kernel_path}")
self.meta_kernel_path.append(meta_kernel_path)
self.meta_kernel_path.append((meta_kernel_path, mk_type, mk_date))
self.mk_dates[mk_type] = max(self.mk_dates[mk_type], mk_date)
except Exception as e:
logger.warning(f"Failed LOADING NEW META KERNEL: {meta_kernel_path}\n{e}")

if len(self.meta_kernel_path) == 0:
raise ValueError(f"Failed to load any NEW META KERNEL: {meta_kernel_pathes}")

def get_mk_date(self, meta_kernel_type="flown"):
return self.mk_dates[meta_kernel_type]

@staticmethod
def _wrap_value_field(field):
r"""
Expand Down Expand Up @@ -446,7 +469,7 @@ def get_fits_headers(self, *, start_time, average_time):
except (SpiceBADPARTNUMBER, SpiceINVALIDSCLKSTRING):
et = spiceypy.utc2et(average_time.isot)

headers = (('SPICE_MK', ', '.join([mk.name for mk in self.meta_kernel_path]),
headers = (('SPICE_MK', ', '.join([mkp.name for mkp, mkt, mkd in self.meta_kernel_path]),
'SPICE meta kernel file'),)

header_results = defaultdict(lambda: '')
Expand Down
4 changes: 3 additions & 1 deletion stixcore/ephemeris/tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

import pytest

from stixcore.config.config import CONFIG
Expand All @@ -12,7 +14,7 @@ def spicekernelmanager():

def test_loader_nokernel():
with pytest.raises(ValueError) as e:
Spice(meta_kernel_pathes='notreal.mk')
Spice(meta_kernel_pathes=[('notreal.mk', "flown", datetime.now())])
assert str(e.value).startswith("Failed to load any NEW META KERNEL")


Expand Down
132 changes: 132 additions & 0 deletions stixcore/io/FlareListManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import sys
import time
from datetime import datetime, timedelta

import pandas as pd
from stixdcpy.net import Request as stixdcpy_req

from stixcore.util.logging import get_logger
from stixcore.util.singleton import Singleton

__all__ = ['FlareListManager', 'SDCFlareListManager']

logger = get_logger(__name__)


class FlareListManager():
@property
def flarelist(self):
return self._flarelist

@flarelist.setter
def flarelist(self, value):
self._flarelist = value

@property
def flarelistname(self):
return type(self).__name__


class SDCFlareListManager(FlareListManager, metaclass=Singleton):
"""Manages a local copy of the operational flarelist provided by stix data datacenter
TODO
"""

def __init__(self, file, update=False):
"""Creates the manager by pointing to the flarelist file (csv) and setting the update strategy.
Parameters
----------
file : Path
points to the csv file
update : bool, optional
Update strategy: is the flarelist file updated via API?, by default False
"""
self.file = file
self.update = update
self._flarelist = SDCFlareListManager.read_flarelist(self.file, self.update)

def __str__(self) -> str:
return (f"{self.flarelistname}: file: {self.file} update: "
f"{self.update} size: {len(self.flarelist)}")

def update_list(self):
"""Updates the flarelist file via api request.
Will create a new file if not available or do a incremental update otherwise,
using the last entry time stamp.
"""
self.flarelist = SDCFlareListManager.read_flarelist(self.file, update=self.update)

@classmethod
def read_flarelist(cls, file, update=False):
"""Reads or creates the LUT of all BSD RIDs and the request reason comment.
On creation or update an api endpoint from the STIX data center is used
to get the information and persists as a LUT locally.
Parameters
----------
file : Path
path the to LUT file.
update : bool, optional
should the LUT be updated at start up?, by default False
Returns
-------
Table
the LUT od RIDs and request reasons.
"""
if update or not file.exists():
# the api is limited to batch sizes of a month. in order to get the full table we have
# to ready each month after the start of STIX
last_date = datetime(2020, 1, 1, 0, 0, 0)
today = datetime.now() # - timedelta(days=60)
flare_df_lists = []
if file.exists():
old_list = pd.read_csv(file)
mds = old_list['start_UTC'].max()
try:
last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S')
except ValueError:
last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S.%f')
flare_df_lists = [old_list]
if not file.parent.exists():
logger.info(f'path not found to flare list file dir: {file.parent} creating dir')
file.parent.mkdir(parents=True, exist_ok=True)

try:
while (last_date < today):
last_date_1m = last_date + timedelta(days=30)
logger.info(f'download flare list chunk: {last_date.isoformat()}'
f'/{last_date_1m.isoformat()}')
flares = stixdcpy_req.fetch_flare_list(last_date.isoformat(),
last_date_1m.isoformat())
last_date = last_date_1m
if len(flares) > 0:
flare_df_lists.append(pd.DataFrame(flares))
logger.info(f'found {len(flares)} flares')
# the stix datacenter API is throttled to 2 calls per second
time.sleep(0.5)
except Exception:
logger.error("FLARELIST API ERROR", exc_info=True)

full_flare_list = pd.concat(flare_df_lists)

full_flare_list.drop_duplicates(inplace=True)
full_flare_list.sort_values(by="peak_UTC", inplace=True)
full_flare_list.reset_index(inplace=True, drop=True)
logger.info(f'write total {len(full_flare_list)} flares to local storage')
full_flare_list.to_csv(file, index_label=False)
else:
logger.info(f"read flare list from {file}")
full_flare_list = pd.read_csv(file)

return full_flare_list


if 'pytest' in sys.modules:
# only set the global in test scenario
from stixcore.data.test import test_data
SDCFlareListManager.instance = SDCFlareListManager(test_data.rid_lut.RID_LUT, update=False)
Loading

0 comments on commit 780760e

Please sign in to comment.