diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..90f3472c --- /dev/null +++ b/mypy.ini @@ -0,0 +1,32 @@ +[mypy] +warn_unused_configs = True +warn_redundant_casts = True + +[mypy-astropy.*] +ignore_missing_imports = True + +[mypy-lsst.*] +ignore_missing_imports = True +ignore_errors = True + +[mypy-lsst.afw.fits] +ignore_missing_imports = True +ignore_errors = True + +[mypy-lsst.daf.base] +ignore_missing_imports = True +ignore_errors = True + +[mypy-astro_metadata_translator.*] +ignore_missing_imports = False +ignore_errors = False +disallow_untyped_defs = True +disallow_incomplete_defs = True +strict_equality = True +warn_unreachable = True +warn_unused_ignores = True + +# version.py is added by scons and may not exist when we run mypy. + +[mypy-astro_metadata_translator.version] +ignore_missing_imports = True diff --git a/python/astro_metadata_translator/bin/translateheader.py b/python/astro_metadata_translator/bin/translateheader.py index 651121cf..d52f5259 100644 --- a/python/astro_metadata_translator/bin/translateheader.py +++ b/python/astro_metadata_translator/bin/translateheader.py @@ -14,6 +14,8 @@ Read file metadata from the specified files and report the translated content. """ +from __future__ import annotations + __all__ = ("main", "process_files") import argparse @@ -21,6 +23,7 @@ import logging import sys import traceback +from typing import IO, List, Sequence, Tuple import yaml @@ -58,7 +61,7 @@ ) -def build_argparser(): +def build_argparser() -> argparse.ArgumentParser: """Construct an argument parser for the ``translate_header.py`` script. 
Returns @@ -139,14 +142,14 @@ def build_argparser(): def read_file( - file, - hdrnum, - print_trace, - outstream=sys.stdout, - errstream=sys.stderr, - output_mode="verbose", - write_heading=False, -): + file: str, + hdrnum: int, + print_trace: bool, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, + output_mode: str = "verbose", + write_heading: bool = False, +) -> bool: """Read the specified file and process it. Parameters @@ -266,8 +269,14 @@ def read_file( def process_files( - files, regex, hdrnum, print_trace, outstream=sys.stdout, errstream=sys.stderr, output_mode="auto" -): + files: Sequence[str], + regex: str, + hdrnum: int, + print_trace: bool, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, + output_mode: str = "auto", +) -> Tuple[List[str], List[str]]: """Read and translate metadata from the specified files. Parameters @@ -323,7 +332,7 @@ def process_files( return okay, failed -def main(): +def main() -> int: """Read metadata from the supplied files and translate the content to standard form. diff --git a/python/astro_metadata_translator/bin/writeindex.py b/python/astro_metadata_translator/bin/writeindex.py index 6ab329ec..6bdadefb 100644 --- a/python/astro_metadata_translator/bin/writeindex.py +++ b/python/astro_metadata_translator/bin/writeindex.py @@ -9,12 +9,15 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. 
+from __future__ import annotations + __all__ = "write_index_files" import json import logging import os import sys +from typing import IO, List, MutableMapping, Optional, Sequence, Tuple from ..file_helpers import find_files from ..indexing import index_files @@ -23,15 +26,15 @@ def write_index_files( - files, - regex, - hdrnum, - print_trace, - content_mode="translated", - outpath=None, - outstream=sys.stdout, - errstream=sys.stderr, -): + files: Sequence[str], + regex: str, + hdrnum: int, + print_trace: bool, + content_mode: str = "translated", + outpath: Optional[str] = None, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, +) -> Tuple[List[str], List[str]]: """Process each file and create JSON index file. The index file will have common information in the toplevel. @@ -87,7 +90,7 @@ def write_index_files( failed = [] okay = [] - files_per_directory = {} + files_per_directory: MutableMapping[str, List[str]] = {} # Group each file by directory if no explicit output path if outpath is None: diff --git a/python/astro_metadata_translator/bin/writesidecar.py b/python/astro_metadata_translator/bin/writesidecar.py index afe0757f..54e4d004 100644 --- a/python/astro_metadata_translator/bin/writesidecar.py +++ b/python/astro_metadata_translator/bin/writesidecar.py @@ -9,16 +9,19 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. +from __future__ import annotations + __all__ = ("write_sidecar_files", "write_sidecar_file") import os import sys import traceback +from typing import IO, List, Sequence, Tuple from ..file_helpers import find_files, read_file_info -def _split_ext(file): +def _split_ext(file: str) -> Tuple[str, str]: """Split the extension from the file name and return it and the root. Special case handling of .gz and other compression extensions. 
@@ -34,7 +37,14 @@ def _split_ext(file): return root, ext -def write_sidecar_file(file, hdrnum, content_mode, print_trace, outstream=sys.stdout, errstream=sys.stderr): +def write_sidecar_file( + file: str, + hdrnum: int, + content_mode: str, + print_trace: bool, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, +) -> bool: """Write JSON summary to sidecar file. Parameters @@ -103,8 +113,14 @@ def write_sidecar_file(file, hdrnum, content_mode, print_trace, outstream=sys.st def write_sidecar_files( - files, regex, hdrnum, content_mode, print_trace, outstream=sys.stdout, errstream=sys.stderr -): + files: Sequence[str], + regex: str, + hdrnum: int, + content_mode: str, + print_trace: bool, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, +) -> Tuple[List[str], List[str]]: """Process each file and create sidecar file. Parameters diff --git a/python/astro_metadata_translator/cli/astrometadata.py b/python/astro_metadata_translator/cli/astrometadata.py index a6f6ad24..29ff2517 100644 --- a/python/astro_metadata_translator/cli/astrometadata.py +++ b/python/astro_metadata_translator/cli/astrometadata.py @@ -9,11 +9,14 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. 
+from __future__ import annotations + __all__ = ("main",) import importlib import logging import os +from typing import Sequence import click @@ -73,7 +76,7 @@ " python module names).", ) @click.pass_context -def main(ctx, log_level, traceback, packages): +def main(ctx: click.Context, log_level: int, traceback: bool, packages: Sequence[str]) -> None: ctx.ensure_object(dict) logging.basicConfig(level=log_level) @@ -81,13 +84,13 @@ def main(ctx, log_level, traceback, packages): # Traceback needs to be known to subcommands ctx.obj["TRACEBACK"] = traceback - packages = set(packages) + packages_set = set(packages) if PACKAGES_VAR in os.environ: new_packages = os.environ[PACKAGES_VAR].split(":") - packages.update(new_packages) + packages_set.update(new_packages) # Process import requests - for m in packages: + for m in packages_set: try: importlib.import_module(m) except (ImportError, ModuleNotFoundError): @@ -114,7 +117,9 @@ def main(ctx, log_level, traceback, packages): ) @regex_option @click.pass_context -def translate(ctx, files, quiet, hdrnum, mode, regex): +def translate( + ctx: click.Context, files: Sequence[str], quiet: bool, hdrnum: int, mode: str, regex: str +) -> None: # For quiet mode we want to translate everything but report nothing. 
if quiet: @@ -147,7 +152,7 @@ def translate(ctx, files, quiet, hdrnum, mode, regex): ) @regex_option @click.pass_context -def dump(ctx, files, hdrnum, mode, regex): +def dump(ctx: click.Context, files: Sequence[str], hdrnum: int, mode: str, regex: str) -> None: okay, failed = translate_header(files, regex, hdrnum, ctx.obj["TRACEBACK"], output_mode=mode) @@ -167,7 +172,7 @@ def dump(ctx, files, hdrnum, mode, regex): @regex_option @content_option @click.pass_context -def write_sidecar(ctx, files, hdrnum, regex, content): +def write_sidecar(ctx: click.Context, files: Sequence[str], hdrnum: int, regex: str, content: str) -> None: okay, failed = write_sidecar_files(files, regex, hdrnum, content, ctx.obj["TRACEBACK"]) if failed: @@ -194,7 +199,9 @@ def write_sidecar(ctx, files, hdrnum, regex, content): " Default is to write one index per directory where files are located.", ) @click.pass_context -def write_index(ctx, files, hdrnum, regex, content, outpath): +def write_index( + ctx: click.Context, files: Sequence[str], hdrnum: int, regex: str, content: str, outpath: str +) -> None: okay, failed = write_index_files( files, regex, hdrnum, ctx.obj["TRACEBACK"], content_mode=content, outpath=outpath ) diff --git a/python/astro_metadata_translator/file_helpers.py b/python/astro_metadata_translator/file_helpers.py index f15bdb65..1b19241b 100644 --- a/python/astro_metadata_translator/file_helpers.py +++ b/python/astro_metadata_translator/file_helpers.py @@ -11,6 +11,8 @@ """Support functions for script implementations.""" +from __future__ import annotations + __all__ = ("find_files", "read_basic_metadata_from_file", "read_file_info") import json @@ -18,6 +20,7 @@ import re import sys import traceback +from typing import IO, Any, Iterable, List, MutableMapping, Optional, Union from .headers import merge_headers from .observationInfo import ObservationInfo @@ -26,9 +29,11 @@ # Prefer afw over Astropy try: import lsst.daf.base # noqa: F401 need PropertyBase for readMetadata - from 
lsst.afw.fits import readMetadata + from lsst.afw.fits import FitsError, readMetadata - def _read_fits_metadata(file, hdu, can_raise=False): + def _read_fits_metadata( + file: str, hdu: int, can_raise: bool = False + ) -> Optional[MutableMapping[str, Any]]: """Read a FITS header using afw. Parameters @@ -54,7 +59,7 @@ def _read_fits_metadata(file, hdu, can_raise=False): """ try: return readMetadata(file, hdu=hdu) - except lsst.afw.fits.FitsError as e: + except FitsError as e: if can_raise: # Try to convert a basic fits error code if "(104)" in str(e): @@ -65,7 +70,9 @@ def _read_fits_metadata(file, hdu, can_raise=False): except ImportError: from astropy.io import fits - def _read_fits_metadata(file, hdu, can_raise=False): + def _read_fits_metadata( + file: str, hdu: int, can_raise: bool = False + ) -> Optional[MutableMapping[str, Any]]: """Read a FITS header using astropy.""" # For detailed docstrings see the afw implementation above @@ -83,7 +90,7 @@ def _read_fits_metadata(file, hdu, can_raise=False): return header -def find_files(files, regex): +def find_files(files: Iterable[str], regex: str) -> List[str]: """Find files for processing. Parameters @@ -93,6 +100,11 @@ def find_files(files, regex): regex : `str` Regular expression string used to filter files when a directory is scanned. + + Returns + ------- + found_files : `list` of `str` + The files that were found. 
""" file_regex = re.compile(regex) found_files = [] @@ -111,7 +123,9 @@ def find_files(files, regex): return found_files -def read_basic_metadata_from_file(file, hdrnum, errstream=sys.stderr, can_raise=True): +def read_basic_metadata_from_file( + file: str, hdrnum: int, errstream: IO = sys.stderr, can_raise: bool = True +) -> Optional[MutableMapping[str, Any]]: """Read a raw header from a file, merging if necessary Parameters @@ -174,14 +188,14 @@ def read_basic_metadata_from_file(file, hdrnum, errstream=sys.stderr, can_raise= def read_file_info( - file, - hdrnum, - print_trace=None, - content_mode="translated", - content_type="simple", - outstream=sys.stdout, - errstream=sys.stderr, -): + file: str, + hdrnum: int, + print_trace: Optional[bool] = None, + content_mode: str = "translated", + content_type: str = "simple", + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, +) -> Optional[Union[str, MutableMapping[str, Any], ObservationInfo]]: """Read information from file Parameters diff --git a/python/astro_metadata_translator/headers.py b/python/astro_metadata_translator/headers.py index 74589929..35d49e5f 100644 --- a/python/astro_metadata_translator/headers.py +++ b/python/astro_metadata_translator/headers.py @@ -11,6 +11,8 @@ """Code to support header manipulation operations.""" +from __future__ import annotations + __all__ = ("merge_headers", "fix_header") import copy @@ -21,6 +23,7 @@ import posixpath from collections import Counter from collections.abc import Mapping +from typing import IO, Any, List, MutableMapping, Optional, Sequence, Tuple, Type, Union import pkg_resources import yaml @@ -40,7 +43,13 @@ """Keyword to add to header when header has been fixed.""" -def merge_headers(headers, mode="overwrite", sort=False, first=None, last=None): +def merge_headers( + headers: Sequence[MutableMapping[str, Any]], + mode: str = "overwrite", + sort: bool = False, + first: Optional[Sequence[str]] = None, + last: Optional[Sequence[str]] = None, +) -> 
MutableMapping[str, Any]: """Merge multiple headers into a single dict. Given a list of dict-like data headers, combine them following the @@ -118,7 +127,7 @@ def merge_headers(headers, mode="overwrite", sort=False, first=None, last=None): if sort: - def key_func(hdr): + def key_func(hdr: Mapping[str, Any]) -> Any: translator_class = None try: translator_class = MetadataTranslator.determine_translator(hdr) @@ -234,7 +243,11 @@ def key_func(hdr): # if mode != "append": - def retain_value(to_receive, to_retain, sources): + def retain_value( + to_receive: MutableMapping[str, Any], + to_retain: Optional[Sequence[str]], + sources: Tuple[Mapping[str, Any], ...], + ) -> None: if to_retain: for k in to_retain: # Look for values until we find one @@ -250,7 +263,7 @@ def retain_value(to_receive, to_retain, sources): return merged -def _read_yaml(fh, msg): +def _read_yaml(fh: IO[bytes], msg: str) -> Optional[Mapping[str, Any]]: """Read YAML from file descriptor. Parameters @@ -281,7 +294,9 @@ def _read_yaml(fh, msg): return content -def _find_from_file(header, paths, target_file): +def _find_from_file( + header: MutableMapping[str, Any], paths: Sequence[str], target_file: str +) -> Optional[str]: """Search file system for matching correction files. 
Parameters @@ -302,7 +317,7 @@ def _find_from_file(header, paths, target_file): for p in paths: correction_file = os.path.join(p, target_file) if os.path.exists(correction_file): - with open(correction_file) as fh: + with open(correction_file, "rb") as fh: log.debug("Applying header corrections from file %s", correction_file) corrections = _read_yaml(fh, f"file {correction_file}") @@ -316,7 +331,9 @@ def _find_from_file(header, paths, target_file): return None -def _find_from_resource(header, package, resource_root, target_file): +def _find_from_resource( + header: MutableMapping[str, Any], package: Optional[str], resource_root: Optional[str], target_file: str +) -> Optional[str]: """Search package resource for correction information. Parameters @@ -351,7 +368,12 @@ def _find_from_resource(header, package, resource_root, target_file): return None -def fix_header(header, search_path=None, translator_class=None, filename=None): +def fix_header( + header: MutableMapping[str, Any], + search_path: Optional[Union[str, Sequence[str]]] = None, + translator_class: Optional[Type[MetadataTranslator]] = None, + filename: Optional[str] = None, +) -> bool: """Update, in place, the supplied header with known corrections. Parameters @@ -446,7 +468,7 @@ class or else support automatic translation class determination. log.debug("Checking for header correction file named %s", target_file) # Work out the search path - paths = [] + paths: List[str] = [] if search_path is not None: if isinstance(search_path, str): # Allow a single path to be given as a string diff --git a/python/astro_metadata_translator/indexing.py b/python/astro_metadata_translator/indexing.py index a976554a..18b4268b 100644 --- a/python/astro_metadata_translator/indexing.py +++ b/python/astro_metadata_translator/indexing.py @@ -9,6 +9,8 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. 
+from __future__ import annotations + __all__ = ("read_index", "calculate_index", "index_files", "process_index_data") """Functions to support file indexing.""" @@ -19,6 +21,7 @@ import os import sys from copy import deepcopy +from typing import IO, Any, List, MutableMapping, Optional, Sequence, Tuple, Union from .file_helpers import read_file_info from .headers import merge_headers @@ -31,7 +34,15 @@ CONTENT_KEY = "__CONTENT__" -def index_files(files, root, hdrnum, print_trace, content, outstream=sys.stdout, errstream=sys.stderr): +def index_files( + files: Sequence[str], + root: Optional[str], + hdrnum: int, + print_trace: bool, + content: str, + outstream: IO = sys.stdout, + errstream: IO = sys.stderr, +) -> Tuple[MutableMapping[str, Union[str, MutableMapping[str, Any]]], List[str], List[str]]: """Create an index from the supplied files. No file is written. The Python structure returned is suitable @@ -81,10 +92,10 @@ def index_files(files, root, hdrnum, print_trace, content, outstream=sys.stdout, if content not in ("translated", "metadata"): raise ValueError("Unrecognized mode {mode}") - failed = [] - okay = [] + failed: List[str] = [] + okay: List[str] = [] - content_by_file = {} # Mapping of path to file content + content_by_file: MutableMapping[str, MutableMapping[str, Any]] = {} # Mapping of path to file content for file in sorted(files): if root is not None: path = os.path.join(root, file) @@ -98,6 +109,11 @@ def index_files(files, root, hdrnum, print_trace, content, outstream=sys.stdout, okay.append(path) # Store the information indexed by the filename within dir + # We may get a PropertyList here and can therefore not just + # assert Mapping for mypy. We therefore assert that it's not the + # other 2 options, which we were enforcing with the "simple" parameter + # in the call to read_file_info. 
+ assert not isinstance(simple, (str, ObservationInfo)) content_by_file[file] = simple output = calculate_index(content_by_file, content) @@ -105,7 +121,9 @@ def index_files(files, root, hdrnum, print_trace, content, outstream=sys.stdout, return output, okay, failed -def calculate_index(headers, content_mode): +def calculate_index( + headers: MutableMapping[str, MutableMapping[str, Any]], content_mode: str +) -> MutableMapping[str, Union[str, MutableMapping[str, Any]]]: """Calculate an index data structure from the supplied headers. Parameters @@ -126,13 +144,14 @@ def calculate_index(headers, content_mode): raise ValueError(f"Unrecognized mode for index creation: {content_mode}") # Merge all the information into a primary plus diff - merged = merge_headers(headers.values(), mode="diff") + merged = merge_headers([hdr for hdr in headers.values()], mode="diff") # For a single file it is possible that the merged contents # are not a dict but are an LSST-style PropertyList. JSON needs - # dict though. + # dict though. mypy can't know about PropertyList so we must ignore + # the type error. if not isinstance(merged, collections.abc.Mapping): - merged = dict(merged) + merged = dict(merged) # type: ignore # The structure to write to file is intended to look like (in YAML): # __COMMON__: @@ -149,14 +168,19 @@ def calculate_index(headers, content_mode): # Put the common headers first in the output. # Store the mode so that we can work out how to read the file in - output = {CONTENT_KEY: content_mode, COMMON_KEY: merged} + output: MutableMapping[str, Union[str, MutableMapping[str, Any]]] = { + CONTENT_KEY: content_mode, + COMMON_KEY: merged, + } for file, diff in zip(headers, diff_dict): output[file] = diff return output -def read_index(path, force_dict=False): +def read_index( + path: str, force_dict: bool = False +) -> Union[ObservationGroup, MutableMapping[str, Union[str, MutableMapping[str, Any], ObservationInfo]]]: """Read an index file. 
Parameters @@ -181,7 +205,9 @@ def read_index(path, force_dict=False): return process_index_data(content, force_dict=force_dict) -def process_index_data(content, force_metadata=False, force_dict=False): +def process_index_data( + content: MutableMapping[str, Any], force_metadata: bool = False, force_dict: bool = False +) -> Union[ObservationGroup, MutableMapping[str, Union[str, MutableMapping[str, Any], ObservationInfo]]]: """Process the content read from a JSON index file. Parameters @@ -223,7 +249,7 @@ def process_index_data(content, force_metadata=False, force_dict=False): content_mode = unpacked.pop(CONTENT_KEY, None) if force_metadata: content_mode = "metadata" - elif content is None: + elif content_mode is None: log.warning("No '%s' key in data structure, assuming 'metadata'", CONTENT_KEY) content_mode = "metadata" @@ -237,8 +263,10 @@ def process_index_data(content, force_metadata=False, force_dict=False): # nothing more to be done return unpacked - obs_infos = [] - by_file = {} + obs_infos: List[ObservationInfo] = [] + # This type annotation is really MutableMapping[str, ObservationInfo] + # but mypy needs it to look like the function return value. + by_file: MutableMapping[str, Union[str, MutableMapping[str, Any], ObservationInfo]] = {} for file, hdr in unpacked.items(): info = ObservationInfo.from_simple(hdr) info.filename = file @@ -250,7 +278,7 @@ def process_index_data(content, force_metadata=False, force_dict=False): return ObservationGroup(obs_infos) -def read_sidecar(path): +def read_sidecar(path: str) -> Union[ObservationInfo, MutableMapping[str, MutableMapping[str, Any]]]: """Read a metadata sidecar file. 
Parameters @@ -273,7 +301,9 @@ def read_sidecar(path): return process_sidecar_data(content) -def process_sidecar_data(content, force_metadata=False): +def process_sidecar_data( + content: MutableMapping[str, Any], force_metadata: bool = False +) -> Union[ObservationInfo, MutableMapping[str, MutableMapping[str, Any]]]: """Process the content read from a JSON sidecar file. Parameters @@ -304,11 +334,11 @@ def process_sidecar_data(content, force_metadata=False): content_mode = content.pop(CONTENT_KEY, None) if force_metadata: content_mode = "metadata" - elif content is None: + elif content_mode is None: # All ObservationInfo objects will have observation_id and instrument # so if they are there we can guess guessing = True - if "observation_id" in content and "instrument" in content_mode: + if "observation_id" in content and "instrument" in content: content_mode = "translated" else: content_mode = "metadata" diff --git a/python/astro_metadata_translator/observationGroup.py b/python/astro_metadata_translator/observationGroup.py index a5ce74f1..7d263a3a 100644 --- a/python/astro_metadata_translator/observationGroup.py +++ b/python/astro_metadata_translator/observationGroup.py @@ -9,15 +9,35 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. +from __future__ import annotations + """Represent a collection of translated headers""" __all__ = ("ObservationGroup",) import logging from collections.abc import MutableSequence +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + MutableMapping, + Optional, + Set, + Tuple, + Type, + Union, +) from .observationInfo import ObservationInfo +if TYPE_CHECKING: + from .translator import MetadataTranslator + log = logging.getLogger(__name__) @@ -41,31 +61,41 @@ class ObservationGroup(MutableSequence): `ObservationInfo` constructor default should be used. 
""" - def __init__(self, members, translator_class=None, pedantic=None): + def __init__( + self, + members: Iterable[Union[ObservationInfo, MutableMapping[str, Any]]], + translator_class: Optional[Type[MetadataTranslator]] = None, + pedantic: Optional[bool] = None, + ) -> None: self._members = [ self._coerce_value(m, translator_class=translator_class, pedantic=pedantic) for m in members ] # Cache of members in time order - self._sorted = None + self._sorted: Optional[List[ObservationInfo]] = None - def __len__(self): + def __len__(self) -> int: return len(self._members) - def __delitem__(self, index): + def __delitem__(self, index: int) -> None: # type: ignore del self._members[index] self._sorted = None - def __getitem__(self, index): + def __getitem__(self, index: int) -> ObservationInfo: # type: ignore return self._members[index] - def __str__(self): + def __str__(self) -> str: results = [] for obs_info in self._members: results.append(f"({obs_info.instrument}, {obs_info.datetime_begin})") return "[" + ", ".join(results) + "]" - def _coerce_value(self, value, translator_class=None, pedantic=None): + def _coerce_value( + self, + value: Union[ObservationInfo, MutableMapping[str, Any]], + translator_class: Optional[Type[MetadataTranslator]] = None, + pedantic: Optional[bool] = None, + ) -> ObservationInfo: """Given a value, ensure it is an `ObservationInfo`. 
Parameters @@ -94,7 +124,7 @@ def _coerce_value(self, value, translator_class=None, pedantic=None): if not isinstance(value, ObservationInfo): try: - kwargs = {"translator_class": translator_class} + kwargs: Dict[str, Any] = {"translator_class": translator_class} if pedantic is not None: kwargs["pedantic"] = pedantic value = ObservationInfo(value, **kwargs) @@ -103,10 +133,10 @@ def _coerce_value(self, value, translator_class=None, pedantic=None): return value - def __iter__(self): + def __iter__(self) -> Iterator[ObservationInfo]: return iter(self._members) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """Compares equal if all the members are equal in the same order.""" if not isinstance(other, ObservationGroup): return NotImplemented @@ -116,7 +146,9 @@ def __eq__(self, other): return False return True - def __setitem__(self, index, value): + def __setitem__( # type: ignore + self, index: int, value: Union[ObservationInfo, MutableMapping[str, Any]] + ) -> None: """Store item in group. Item must be an `ObservationInfo` or something that can be passed @@ -126,21 +158,21 @@ def __setitem__(self, index, value): self._members[index] = value self._sorted = None - def insert(self, index, value): + def insert(self, index: int, value: Union[ObservationInfo, MutableMapping[str, Any]]) -> None: value = self._coerce_value(value) self._members.insert(index, value) self._sorted = None - def reverse(self): + def reverse(self) -> None: self._members.reverse() - def sort(self, key=None, reverse=False): + def sort(self, key: Optional[Callable] = None, reverse: bool = False) -> None: self._members.sort(key=key, reverse=reverse) if key is None and not reverse and self._sorted is None: # Store sorted order in cache self._sorted = self._members.copy() - def extremes(self): + def extremes(self) -> Tuple[ObservationInfo, ObservationInfo]: """Return the oldest observation in the group and the newest. 
If there is only one member of the group, the newest and oldest @@ -157,7 +189,7 @@ def extremes(self): self._sorted = sorted(self._members) return self._sorted[0], self._sorted[-1] - def newest(self): + def newest(self) -> ObservationInfo: """Return the newest observation in the group. Returns @@ -167,7 +199,7 @@ def newest(self): """ return self.extremes()[1] - def oldest(self): + def oldest(self) -> ObservationInfo: """Return the oldest observation in the group. Returns @@ -177,7 +209,7 @@ def oldest(self): """ return self.extremes()[0] - def property_values(self, property): + def property_values(self, property: str) -> Set[Any]: """Return a set of values associated with the specified property. Parameters @@ -192,7 +224,7 @@ def property_values(self, property): """ return {getattr(obs_info, property) for obs_info in self} - def to_simple(self): + def to_simple(self) -> List[MutableMapping[str, Any]]: """Convert the group to simplified form. Returns @@ -204,7 +236,7 @@ def to_simple(self): return [obsinfo.to_simple() for obsinfo in self] @classmethod - def from_simple(cls, simple): + def from_simple(cls, simple: List[Dict[str, Any]]) -> ObservationGroup: """Convert simplified form back to `ObservationGroup` Parameters diff --git a/python/astro_metadata_translator/observationInfo.py b/python/astro_metadata_translator/observationInfo.py index b770bd5b..653468f1 100644 --- a/python/astro_metadata_translator/observationInfo.py +++ b/python/astro_metadata_translator/observationInfo.py @@ -11,6 +11,8 @@ """Represent standard metadata from instrument headers""" +from __future__ import annotations + __all__ = ("ObservationInfo", "makeObservationInfo") import copy @@ -18,12 +20,13 @@ import json import logging import math +from typing import Any, Callable, Dict, FrozenSet, MutableMapping, Optional, Sequence, Set, Tuple, Type import astropy.time from astropy.coordinates import AltAz, SkyCoord from .headers import fix_header -from .properties import PROPERTIES +from 
.properties import PROPERTIES, PropertyDefinition from .translator import MetadataTranslator log = logging.getLogger(__name__) @@ -96,17 +99,17 @@ class ObservationInfo: def __init__( self, - header, - filename=None, - translator_class=None, - pedantic=False, - search_path=None, - required=None, - subset=None, - ): + header: Optional[MutableMapping[str, Any]], + filename: Optional[str] = None, + translator_class: Optional[Type[MetadataTranslator]] = None, + pedantic: bool = False, + search_path: Optional[Sequence[str]] = None, + required: Optional[Set[str]] = None, + subset: Optional[Set[str]] = None, + ) -> None: # Initialize the empty object - self._header = {} + self._header: MutableMapping[str, Any] = {} self.filename = filename self._translator = None self.translator_class_name = "" @@ -203,7 +206,9 @@ def __init__( super().__setattr__(property, value) # allows setting even write-protected extensions @staticmethod - def _get_all_properties(extensions=None): + def _get_all_properties( + extensions: Optional[Dict[str, PropertyDefinition]] = None + ) -> Dict[str, PropertyDefinition]: """Return the definitions of all properties Parameters @@ -223,7 +228,7 @@ def _get_all_properties(extensions=None): properties.update({"ext_" + pp: dd for pp, dd in extensions.items()}) return properties - def _declare_extensions(self, extensions): + def _declare_extensions(self, extensions: Optional[Dict[str, PropertyDefinition]]) -> None: """Declare and set up extension properties This should always be called internally as part of the creation of a @@ -256,7 +261,7 @@ class (and python ``property`` doesn't work on instances; only on self.extensions = extensions self.all_properties = self._get_all_properties(extensions) - def __setattr__(self, name, value): + def __setattr__(self, name: str, value: Any) -> Any: """Set attribute This provides read-only protection for the extension properties. 
The @@ -268,7 +273,7 @@ def __setattr__(self, name, value): return super().__setattr__(name, value) @classmethod - def _is_property_ok(cls, definition, value): + def _is_property_ok(cls, definition: PropertyDefinition, value: Any) -> bool: """Compare the supplied value against the expected type as defined for the corresponding property. @@ -305,7 +310,7 @@ def _is_property_ok(cls, definition, value): return True @property - def cards_used(self): + def cards_used(self) -> FrozenSet[str]: """Header cards used for the translation. Returns @@ -317,7 +322,7 @@ def cards_used(self): return frozenset() return self._translator.cards_used() - def stripped_header(self): + def stripped_header(self) -> MutableMapping[str, Any]: """Return a copy of the supplied header with used keywords removed. Returns @@ -332,7 +337,7 @@ def stripped_header(self): del hdr[c] return hdr - def __str__(self): + def __str__(self) -> str: # Put more interesting answers at front of list # and then do remainder priority = ("instrument", "telescope", "datetime_begin") @@ -348,7 +353,7 @@ def __str__(self): return result - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """Compares equal if standard properties are equal""" if not isinstance(other, ObservationInfo): return NotImplemented @@ -372,13 +377,17 @@ def __eq__(self, other): return False return True - def __lt__(self, other): + def __lt__(self, other: Any) -> bool: + if not isinstance(other, ObservationInfo): + return NotImplemented return self.datetime_begin < other.datetime_begin - def __gt__(self, other): + def __gt__(self, other: Any) -> bool: + if not isinstance(other, ObservationInfo): + return NotImplemented return self.datetime_begin > other.datetime_begin - def __getstate__(self): + def __getstate__(self) -> Tuple[Any, ...]: """Get pickleable state Returns the properties. 
Deliberately does not preserve the full @@ -396,7 +405,7 @@ def __getstate__(self): return state, self.extensions - def __setstate__(self, state): + def __setstate__(self, state: Tuple[Any, ...]) -> None: """Set object state from pickle Parameters @@ -412,12 +421,13 @@ def __setstate__(self, state): self._declare_extensions(extensions) for p in self.all_properties: if p.startswith("ext_"): - super().__setattr__(p, state[p]) # allows setting even write-protected extensions + # allows setting even write-protected extensions + super().__setattr__(p, state[p]) # type: ignore else: property = f"_{p}" - setattr(self, property, state[p]) + setattr(self, property, state[p]) # type: ignore - def to_simple(self): + def to_simple(self) -> MutableMapping[str, Any]: """Convert the contents of this object to simple dict form. The keys of the dict are the standard properties but the values @@ -462,7 +472,7 @@ def to_simple(self): return simple - def to_json(self): + def to_json(self) -> str: """Serialize the object to JSON string. Returns @@ -480,7 +490,7 @@ def to_json(self): return json.dumps(self.to_simple()) @classmethod - def from_simple(cls, simple): + def from_simple(cls, simple: MutableMapping[str, Any]) -> ObservationInfo: """Convert the entity returned by `to_simple` back into an `ObservationInfo`. @@ -510,7 +520,7 @@ def from_simple(cls, simple): properties = cls._get_all_properties(extensions) - processed = {} + processed: Dict[str, Any] = {} for k, v in simple.items(): if v is None: @@ -527,7 +537,7 @@ def from_simple(cls, simple): return cls.makeObservationInfo(extensions=extensions, **processed) @classmethod - def from_json(cls, json_str): + def from_json(cls, json_str: str) -> ObservationInfo: """Create `ObservationInfo` from JSON string. 
Parameters @@ -551,7 +561,9 @@ def from_json(cls, json_str): return cls.from_simple(simple) @classmethod - def makeObservationInfo(cls, *, extensions=None, **kwargs): # noqa: N802 + def makeObservationInfo( # noqa: N802 + cls, *, extensions: Optional[Dict[str, PropertyDefinition]] = None, **kwargs: Any + ) -> ObservationInfo: """Construct an `ObservationInfo` from the supplied parameters. Parameters @@ -604,7 +616,7 @@ def makeObservationInfo(cls, *, extensions=None, **kwargs): # noqa: N802 # Method to add the standard properties -def _make_property(property, doc, return_typedoc, return_type): +def _make_property(property: str, doc: str, return_typedoc: str, return_type: Type) -> Callable: """Create a getter method with associated docstring. Parameters @@ -624,7 +636,7 @@ def _make_property(property, doc, return_typedoc, return_type): Getter method for this property. """ - def getter(self): + def getter(self: ObservationInfo) -> Any: return getattr(self, f"_{property}") getter.__doc__ = f"""{doc} @@ -649,7 +661,9 @@ def getter(self): ) -def makeObservationInfo(*, extensions=None, **kwargs): # noqa: N802 +def makeObservationInfo( # noqa: N802 + *, extensions: Optional[Dict[str, PropertyDefinition]] = None, **kwargs: Any +) -> ObservationInfo: """Construct an `ObservationInfo` from the supplied parameters. Parameters diff --git a/python/astro_metadata_translator/properties.py b/python/astro_metadata_translator/properties.py index 7cab572d..db9d4e21 100644 --- a/python/astro_metadata_translator/properties.py +++ b/python/astro_metadata_translator/properties.py @@ -18,6 +18,7 @@ define the getter methods. """ +from __future__ import annotations __all__ = ( "PropertyDefinition", @@ -25,7 +26,7 @@ ) from dataclasses import dataclass -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Tuple import astropy.coordinates import astropy.time @@ -38,7 +39,7 @@ # All assume the supplied parameter is not None. 
-def earthlocation_to_simple(location): +def earthlocation_to_simple(location: astropy.coordinates.EarthLocation) -> Tuple[float, ...]: """Convert EarthLocation to tuple. Parameters @@ -55,12 +56,12 @@ def earthlocation_to_simple(location): return tuple(c.to_value(astropy.units.m) for c in geocentric) -def simple_to_earthlocation(simple, **kwargs): +def simple_to_earthlocation(simple: Tuple[float, ...], **kwargs: Any) -> astropy.coordinates.EarthLocation: """Convert simple form back to EarthLocation.""" return astropy.coordinates.EarthLocation.from_geocentric(*simple, unit=astropy.units.m) -def datetime_to_simple(datetime): +def datetime_to_simple(datetime: astropy.time.Time) -> Tuple[float, float]: """Convert Time to tuple. Parameters @@ -77,63 +78,63 @@ def datetime_to_simple(datetime): return (tai.jd1, tai.jd2) -def simple_to_datetime(simple, **kwargs): +def simple_to_datetime(simple: Tuple[float, float], **kwargs: Any) -> astropy.time.Time: """Convert simple form back to astropy.time.Time""" return astropy.time.Time(*simple, format="jd", scale="tai") -def exptime_to_simple(exptime): +def exptime_to_simple(exptime: astropy.units.Quantity) -> float: """Convert exposure time Quantity to seconds.""" return exptime.to_value(astropy.units.s) -def simple_to_exptime(simple, **kwargs): +def simple_to_exptime(simple: float, **kwargs: Any) -> astropy.units.Quantity: """Convert simple form back to Quantity.""" return simple * astropy.units.s -def angle_to_simple(angle): +def angle_to_simple(angle: astropy.coordinates.Angle) -> float: """Convert Angle to degrees.""" return angle.to_value(astropy.units.deg) -def simple_to_angle(simple, **kwargs): +def simple_to_angle(simple: float, **kwargs: Any) -> astropy.coordinates.Angle: """Convert degrees to Angle.""" return astropy.coordinates.Angle(simple * astropy.units.deg) -def temperature_to_simple(temp): +def temperature_to_simple(temp: astropy.units.Quantity) -> float: """Convert temperature to kelvin.""" return 
temp.to(astropy.units.K, equivalencies=astropy.units.temperature()).to_value() -def simple_to_temperature(simple, **kwargs): +def simple_to_temperature(simple: float, **kwargs: Any) -> astropy.units.Quantity: """Convert scalar kelvin value back to quantity.""" return simple * astropy.units.K -def pressure_to_simple(press): +def pressure_to_simple(press: astropy.units.Quantity) -> float: """Convert pressure Quantity to hPa.""" return press.to_value(astropy.units.hPa) -def simple_to_pressure(simple, **kwargs): +def simple_to_pressure(simple: float, **kwargs: Any) -> astropy.units.Quantity: """Convert the pressure scalar back to Quantity.""" return simple * astropy.units.hPa -def skycoord_to_simple(skycoord): +def skycoord_to_simple(skycoord: astropy.coordinates.SkyCoord) -> Tuple[float, float]: """Convert SkyCoord to ICRS RA/Dec tuple""" icrs = skycoord.icrs return (icrs.ra.to_value(astropy.units.deg), icrs.dec.to_value(astropy.units.deg)) -def simple_to_skycoord(simple, **kwargs): +def simple_to_skycoord(simple: Tuple[float, float], **kwargs: Any) -> astropy.coordinates.SkyCoord: """Convert ICRS tuple to SkyCoord.""" return astropy.coordinates.SkyCoord(*simple, unit=astropy.units.deg) -def altaz_to_simple(altaz): +def altaz_to_simple(altaz: astropy.coordinates.AltAz) -> Tuple[float, float]: """Convert AltAz to Alt/Az tuple. Do not include obstime or location in simplification. It is assumed @@ -142,7 +143,7 @@ def altaz_to_simple(altaz): return (altaz.az.to_value(astropy.units.deg), altaz.alt.to_value(astropy.units.deg)) -def simple_to_altaz(simple, **kwargs): +def simple_to_altaz(simple: Tuple[float, float], **kwargs: Any) -> astropy.coordinates.AltAz: """Convert simple altaz tuple to AltAz. Will look for location and datetime_begin in kwargs. 
diff --git a/python/astro_metadata_translator/serialize/fits.py b/python/astro_metadata_translator/serialize/fits.py index 15c198f3..09ff7b87 100644 --- a/python/astro_metadata_translator/serialize/fits.py +++ b/python/astro_metadata_translator/serialize/fits.py @@ -10,11 +10,20 @@ # license that can be found in the LICENSE file. """Transform ObservationInfo into "standard" FITS headers.""" +from __future__ import annotations __all__ = ("info_to_fits", "dates_to_fits", "group_to_fits") +from typing import TYPE_CHECKING, Any, Dict, Tuple -def dates_to_fits(date_begin, date_end): +if TYPE_CHECKING: + import astropy.time + + from ..observationGroup import ObservationGroup + from ..observationInfo import ObservationInfo + + +def dates_to_fits(date_begin: astropy.time.Time, date_end: astropy.time.Time) -> Dict[str, Any]: """Convert two dates into FITS form. Parameters @@ -30,7 +39,7 @@ def dates_to_fits(date_begin, date_end): Header card keys and values following the FITS standard. If neither date is defined this may be empty. """ - cards = {} + cards: Dict[str, Any] = {} if date_begin is None and date_end is None: # no date headers can be written return cards @@ -50,7 +59,7 @@ def dates_to_fits(date_begin, date_end): return cards -def info_to_fits(obs_info): +def info_to_fits(obs_info: ObservationInfo) -> Tuple[Dict[str, Any], Dict[str, str]]: """Convert an `ObservationInfo` to something suitable for writing to a FITS file. @@ -80,7 +89,7 @@ def info_to_fits(obs_info): return cards, comments -def group_to_fits(obs_group): +def group_to_fits(obs_group: ObservationGroup) -> Tuple[Dict[str, Any], Dict[str, str]]: """Convert an `ObservationGroup` to something suitable for writing to a FITS file. 
diff --git a/python/astro_metadata_translator/tests.py b/python/astro_metadata_translator/tests.py index 7490193b..8084d9fb 100644 --- a/python/astro_metadata_translator/tests.py +++ b/python/astro_metadata_translator/tests.py @@ -9,12 +9,14 @@ # Use of this source code is governed by a 3-clause BSD-style # license that can be found in the LICENSE file. +from __future__ import annotations + __all__ = ("read_test_file", "MetadataAssertHelper") import os import pickle import warnings -from collections import OrderedDict +from typing import Any, Dict, MutableMapping, Optional, Type import astropy.units as u import astropy.utils.exceptions @@ -33,16 +35,16 @@ # For YAML >= 5.1 need a different Loader for the constructor try: - Loader = yaml.FullLoader + Loader: Optional[Type] = yaml.FullLoader except AttributeError: Loader = yaml.Loader # Define a YAML loader for lsst.daf.base.PropertySet serializations that # we can use if daf_base is not available. -def pl_constructor(loader, node): +def pl_constructor(loader: yaml.Loader, node: yaml.Node) -> Any: """Construct an OrderedDict from a YAML file containing a PropertyList.""" - pl = OrderedDict() + pl: Dict[str, Any] = {} yield pl state = loader.construct_sequence(node, deep=True) for key, dtype, value, comment in state: @@ -57,10 +59,10 @@ def pl_constructor(loader, node): if daf_base is None: - yaml.add_constructor("lsst.daf.base.PropertyList", pl_constructor, Loader=Loader) + yaml.add_constructor("lsst.daf.base.PropertyList", pl_constructor, Loader=Loader) # type: ignore -def read_test_file(filename, dir=None): +def read_test_file(filename: str, dir: Optional[str] = None) -> MutableMapping[str, Any]: """Read the named test file relative to the location of this helper Parameters @@ -91,7 +93,9 @@ class MetadataAssertHelper: translations. 
""" - def assertCoordinatesConsistent(self, obsinfo, max_sep=1.0, amdelta=0.01): # noqa: N802 + def assertCoordinatesConsistent( # noqa: N802 + self, obsinfo: ObservationInfo, max_sep: float = 1.0, amdelta: float = 0.01 + ) -> None: """Check that SkyCoord, AltAz, and airmass are self consistent. Parameters @@ -127,8 +131,13 @@ def assertCoordinatesConsistent(self, obsinfo, max_sep=1.0, amdelta=0.01): # no self.assertLess(sep.to_value(unit="arcmin"), max_sep, msg="AltAz inconsistent with RA/Dec") def assertObservationInfoFromYaml( # noqa: N802 - self, file, dir=None, check_wcs=True, wcs_params=None, **kwargs - ): + self, + file: str, + dir: Optional[str] = None, + check_wcs: bool = True, + wcs_params: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: """Check contents of an ObservationInfo. Parameters @@ -170,8 +179,13 @@ def assertObservationInfoFromYaml( # noqa: N802 raise AssertionError(f"ObservationInfo derived from {type(hdr)} type is inconsistent.") from e def assertObservationInfo( # noqa: N802 - self, header, filename=None, check_wcs=True, wcs_params=None, **kwargs - ): + self, + header: MutableMapping[str, Any], + filename: Optional[str] = None, + check_wcs: bool = True, + wcs_params: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: """Check contents of an ObservationInfo. Parameters @@ -226,7 +240,7 @@ def assertObservationInfo( # noqa: N802 # to work around the fact that (as of astropy 3.1) adding 0.0 seconds # to a Time results in a new Time object that is a few picoseconds in # the past. 
- def _format_date_for_testing(date): + def _format_date_for_testing(date: Optional[Time]) -> Optional[Time]: if date is not None: date.format = "isot" date.precision = 9 diff --git a/python/astro_metadata_translator/translator.py b/python/astro_metadata_translator/translator.py index 1af70440..6d7d995c 100644 --- a/python/astro_metadata_translator/translator.py +++ b/python/astro_metadata_translator/translator.py @@ -11,6 +11,8 @@ """Classes and support code for metadata translation""" +from __future__ import annotations + __all__ = ("MetadataTranslator", "StubTranslator", "cache_translation") import importlib @@ -19,12 +21,29 @@ import math import warnings from abc import abstractmethod +from typing import ( + Any, + Callable, + Dict, + FrozenSet, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Optional, + Sequence, + Set, + Tuple, + Type, + Union, +) import astropy.io.fits.card import astropy.units as u from astropy.coordinates import Angle -from .properties import PROPERTIES +from .properties import PROPERTIES, PropertyDefinition log = logging.getLogger(__name__) @@ -32,10 +51,10 @@ CORRECTIONS_RESOURCE_ROOT = "corrections" """Cache of version strings indexed by class.""" -_VERSION_CACHE = dict() +_VERSION_CACHE: Dict[Type, str] = dict() -def cache_translation(func, method=None): +def cache_translation(func: Callable, method: Optional[str] = None) -> Callable: """Decorator to cache the result of a translation method. Especially useful when a translation uses many other translation @@ -57,7 +76,7 @@ def cache_translation(func, method=None): """ name = func.__name__ if method is None else method - def func_wrapper(self): + def func_wrapper(self: MetadataTranslator) -> Any: if name not in self._translation_cache: self._translation_cache[name] = func(self) return self._translation_cache[name] @@ -82,30 +101,36 @@ class MetadataTranslator: """ # These are all deliberately empty in the base class. 
- default_search_path = None + name: Optional[str] = None + """The declared name of the translator.""" + + default_search_path: Optional[Sequence[str]] = None """Default search path to use to locate header correction files.""" default_resource_package = __name__.split(".")[0] """Module name to use to locate the correction resources.""" - default_resource_root = None + default_resource_root: Optional[str] = None """Default package resource path root to use to locate header correction files within the ``default_resource_package`` package.""" - _trivial_map = {} + _trivial_map: Dict[str, Union[str, List[str], Tuple[Any, ...]]] = {} """Dict of one-to-one mappings for header translation from standard property to corresponding keyword.""" - _const_map = {} + _const_map: Dict[str, Any] = {} """Dict defining a constant for specified standard properties.""" - translators = dict() + translators: Dict[str, Type] = dict() """All registered metadata translation classes.""" - supported_instrument = None + supported_instrument: Optional[str] = None """Name of instrument understood by this translation class.""" - extensions = {} + all_properties: Dict[str, PropertyDefinition] = {} + """All the valid properties for this translator including extensions.""" + + extensions: Dict[str, PropertyDefinition] = {} """Extension properties (`str`: `PropertyDefinition`) Some instruments have important properties beyond the standard set; this is @@ -118,7 +143,7 @@ class MetadataTranslator: """ @classmethod - def defined_in_this_class(cls, name): + def defined_in_this_class(cls, name: str) -> Optional[bool]: """Report if the specified class attribute is defined specifically in this class. @@ -166,7 +191,7 @@ def defined_in_this_class(cls, name): return True @classmethod - def _make_const_mapping(cls, property_key, constant): + def _make_const_mapping(cls, property_key: str, constant: Any) -> Callable: """Make a translator method that returns a constant value. 
Parameters @@ -182,14 +207,14 @@ Function returning the constant. """ - def constant_translator(self): + def constant_translator(self: MetadataTranslator) -> Any: return constant if property_key in cls.all_properties: property_doc = cls.all_properties[property_key].doc return_type = cls.all_properties[property_key].py_type else: - return_type = type(constant).__name__ + return_type = type(constant) property_doc = f"Returns constant value for '{property_key}' property" constant_translator.__doc__ = f"""{property_doc} @@ -203,8 +228,15 @@ def constant_translator(self): @classmethod def _make_trivial_mapping( - cls, property_key, header_key, default=None, minimum=None, maximum=None, unit=None, checker=None - ): + cls, + property_key: str, + header_key: Union[str, Sequence[str]], + default: Optional[Any] = None, + minimum: Optional[Any] = None, + maximum: Optional[Any] = None, + unit: Optional[astropy.units.Unit] = None, + checker: Optional[Callable] = None, + ) -> Callable: """Make a translator method returning a header value. 
The header value can be converted to a `~astropy.units.Quantity` @@ -253,7 +285,7 @@ def _make_trivial_mapping( return_type = "str` or `numbers.Number" property_doc = f"Map '{header_key}' header keyword to '{property_key}' property" - def trivial_translator(self): + def trivial_translator(self: MetadataTranslator) -> Any: if unit is not None: q = self.quantity_from_card( header_key, unit, default=default, minimum=minimum, maximum=maximum, checker=checker @@ -277,10 +309,9 @@ def trivial_translator(self): if checker is not None: try: checker(self) - return default except Exception: raise KeyError(f"Could not find {keywords} in header") - value = default + return default elif default is not None: value = default else: @@ -308,7 +339,7 @@ def trivial_translator(self): return trivial_translator @classmethod - def __init_subclass__(cls, **kwargs): + def __init_subclass__(cls, **kwargs: Any) -> None: """Register all subclasses with the base class and create dynamic translator methods. @@ -399,20 +430,20 @@ def __init_subclass__(cls, **kwargs): if property_key not in properties: log.warning(f"Unexpected constant translator for '{property_key}' defined in {cls}") - def __init__(self, header, filename=None): + def __init__(self, header: Mapping[str, Any], filename: Optional[str] = None) -> None: self._header = header self.filename = filename - self._used_cards = set() + self._used_cards: Set[str] = set() # Prefix to use for warnings about failed translations - self._log_prefix_cache = None + self._log_prefix_cache: Optional[str] = None # Cache assumes header is read-only once stored in object - self._translation_cache = {} + self._translation_cache: Dict[str, Any] = {} @classmethod @abstractmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. 
@@ -432,7 +463,9 @@ def can_translate(cls, header, filename=None): raise NotImplementedError() @classmethod - def can_translate_with_options(cls, header, options, filename=None): + def can_translate_with_options( + cls, header: Mapping[str, Any], options: Dict[str, Any], filename: Optional[str] = None + ) -> bool: """Helper method for `can_translate` allowing options. Parameters @@ -466,7 +499,9 @@ def can_translate_with_options(cls, header, options, filename=None): return False @classmethod - def determine_translator(cls, header, filename=None): + def determine_translator( + cls, header: Mapping[str, Any], filename: Optional[str] = None + ) -> Type[MetadataTranslator]: """Determine a translation class by examining the header Parameters @@ -502,7 +537,7 @@ def determine_translator(cls, header, filename=None): ) @classmethod - def translator_version(cls): + def translator_version(cls) -> str: """Return the version string for this translator class. Returns @@ -541,7 +576,9 @@ def translator_version(cls): return version @classmethod - def fix_header(cls, header, instrument, obsid, filename=None): + def fix_header( + cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: Optional[str] = None + ) -> bool: """Apply global fixes to a supplied header. Parameters @@ -587,7 +624,7 @@ def fix_header(cls, header, instrument, obsid, filename=None): return False @staticmethod - def _construct_log_prefix(obsid, filename=None): + def _construct_log_prefix(obsid: str, filename: Optional[str] = None) -> str: """Construct a log prefix string from the obsid and filename. Parameters @@ -603,7 +640,7 @@ def _construct_log_prefix(obsid, filename=None): return obsid @property - def _log_prefix(self): + def _log_prefix(self) -> str: """Standard prefix that can be used for log messages to report useful context. 
@@ -626,7 +663,7 @@ def _log_prefix(self): self._log_prefix_cache = self._construct_log_prefix(obsid, self.filename) return self._log_prefix_cache - def _used_these_cards(self, *args): + def _used_these_cards(self, *args: str) -> None: """Indicate that the supplied cards have been used for translation. Parameters @@ -636,7 +673,7 @@ def _used_these_cards(self, *args): """ self._used_cards.update(set(args)) - def cards_used(self): + def cards_used(self) -> FrozenSet[str]: """Cards used during metadata extraction. Returns @@ -647,7 +684,9 @@ def cards_used(self): return frozenset(self._used_cards) @staticmethod - def validate_value(value, default, minimum=None, maximum=None): + def validate_value( + value: float, default: float, minimum: Optional[float] = None, maximum: Optional[float] = None + ) -> float: """Validate the supplied value, returning a new value if out of range Parameters @@ -680,7 +719,7 @@ def validate_value(value, default, minimum=None, maximum=None): return value @staticmethod - def is_keyword_defined(header, keyword): + def is_keyword_defined(header: Mapping[str, Any], keyword: Optional[str]) -> bool: """Return `True` if the value associated with the named keyword is present in the supplied header and defined. @@ -708,7 +747,7 @@ def is_keyword_defined(header, keyword): return True - def resource_root(self): + def resource_root(self) -> Tuple[Optional[str], Optional[str]]: """Package resource to use to locate correction resources within an installed package. @@ -723,7 +762,7 @@ def resource_root(self): """ return (self.default_resource_package, self.default_resource_root) - def search_paths(self): + def search_paths(self) -> List[str]: """Search paths to use when searching for header fix up correction files. @@ -738,10 +777,10 @@ def search_paths(self): Uses the classes ``default_search_path`` property if defined. 
""" if self.default_search_path is not None: - return [self.default_search_path] + return [p for p in self.default_search_path] return [] - def is_key_ok(self, keyword): + def is_key_ok(self, keyword: Optional[str]) -> bool: """Return `True` if the value associated with the named keyword is present in this header and defined. @@ -757,7 +796,7 @@ def is_key_ok(self, keyword): """ return self.is_keyword_defined(self._header, keyword) - def are_keys_ok(self, keywords): + def are_keys_ok(self, keywords: Iterable[str]) -> bool: """Are the supplied keys all present and defined? Parameters @@ -775,7 +814,15 @@ def are_keys_ok(self, keywords): return False return True - def quantity_from_card(self, keywords, unit, default=None, minimum=None, maximum=None, checker=None): + def quantity_from_card( + self, + keywords: Union[str, Sequence[str]], + unit: u.Unit, + default: Optional[float] = None, + minimum: Optional[float] = None, + maximum: Optional[float] = None, + checker: Optional[Callable] = None, + ) -> u.Quantity: """Calculate a Astropy Quantity from a header card and a unit. Parameters @@ -812,8 +859,8 @@ def quantity_from_card(self, keywords, unit, default=None, minimum=None, maximum KeyError The supplied header key is not present. """ - keywords = keywords if isinstance(keywords, list) else [keywords] - for k in keywords: + keyword_list = [keywords] if isinstance(keywords, str) else list(keywords) + for k in keyword_list: if self.is_key_ok(k): value = self._header[k] keyword = k @@ -838,7 +885,7 @@ def quantity_from_card(self, keywords, unit, default=None, minimum=None, maximum value = self.validate_value(value, default, maximum=maximum, minimum=minimum) return u.Quantity(value, unit=unit) - def _join_keyword_values(self, keywords, delim="+"): + def _join_keyword_values(self, keywords: Iterable[str], delim: str = "+") -> str: """Join values of all defined keywords with the specified delimiter. 
Parameters @@ -869,7 +916,7 @@ def _join_keyword_values(self, keywords, delim="+"): return joined @cache_translation - def to_detector_unique_name(self): + def to_detector_unique_name(self) -> str: """Return a unique name for the detector. Base class implementation attempts to combine ``detector_name`` with @@ -901,7 +948,7 @@ def to_detector_unique_name(self): return name @cache_translation - def to_exposure_group(self): + def to_exposure_group(self) -> Optional[str]: """Return the group label associated with this exposure. Base class implementation returns the ``exposure_id`` in string @@ -914,12 +961,14 @@ def to_exposure_group(self): """ exposure_id = self.to_exposure_id() if exposure_id is None: - return None + # mypy does not think this can ever happen but play it safe + # with subclasses. + return None # type: ignore else: return str(exposure_id) @cache_translation - def to_observation_reason(self): + def to_observation_reason(self) -> str: """Return the reason this observation was taken. Base class implementation returns the ``science`` if the @@ -937,7 +986,7 @@ def to_observation_reason(self): return "unknown" @cache_translation - def to_observing_day(self): + def to_observing_day(self) -> int: """Return the YYYYMMDD integer corresponding to the observing day. Base class implementation uses the TAI date of the start of the @@ -957,7 +1006,7 @@ def to_observing_day(self): return int(datetime_begin.tai.strftime("%Y%m%d")) @cache_translation - def to_observation_counter(self): + def to_observation_counter(self) -> int: """Return an integer corresponding to how this observation relates to other observations. 
@@ -975,7 +1024,9 @@ def to_observation_counter(self): return 0 @classmethod - def determine_translatable_headers(cls, filename, primary=None): + def determine_translatable_headers( + cls, filename: str, primary: Optional[MutableMapping[str, Any]] = None + ) -> Iterator[MutableMapping[str, Any]]: """Given a file return all the headers usable for metadata translation. This method can optionally be given a header from the file. This @@ -1036,10 +1087,14 @@ class to then call to obtain the real headers to be used for from .file_helpers import read_basic_metadata_from_file # Merge primary and secondary header if they exist. - yield read_basic_metadata_from_file(filename, -1) + header = read_basic_metadata_from_file(filename, -1) + assert header is not None # for mypy since can_raise=True + yield header -def _make_abstract_translator_method(property, doc, return_typedoc, return_type): +def _make_abstract_translator_method( + property: str, doc: str, return_typedoc: str, return_type: Type +) -> Callable: """Create a an abstract translation method for this property. Parameters @@ -1059,7 +1114,7 @@ def _make_abstract_translator_method(property, doc, return_typedoc, return_type) Translator method for this property. """ - def to_property(self): + def to_property(self: MetadataTranslator) -> None: raise NotImplementedError(f"Translator for '{property}' undefined.") to_property.__doc__ = f"""Return value of {property} from headers. @@ -1117,7 +1172,9 @@ class StubTranslator(MetadataTranslator): pass -def _make_forwarded_stub_translator_method(cls, property, doc, return_typedoc, return_type): +def _make_forwarded_stub_translator_method( + cls: Type[MetadataTranslator], property: str, doc: str, return_typedoc: str, return_type: Type +) -> Callable: """Create a stub translation method for this property that calls the base method and catches `NotImplementedError`. 
@@ -1142,7 +1199,7 @@ def _make_forwarded_stub_translator_method(cls, property, doc, return_typedoc, r """ method = f"to_{property}" - def to_stub(self): + def to_stub(self: MetadataTranslator) -> Any: parent = getattr(super(cls, self), method, None) try: if parent is not None: @@ -1178,6 +1235,6 @@ def to_stub(self): StubTranslator, f"to_{name}", _make_forwarded_stub_translator_method( - StubTranslator, name, definition.doc, definition.str_type, definition.py_type + StubTranslator, name, definition.doc, definition.str_type, definition.py_type # type: ignore ), ) diff --git a/python/astro_metadata_translator/translators/decam.py b/python/astro_metadata_translator/translators/decam.py index 3876ba7a..2a6387aa 100644 --- a/python/astro_metadata_translator/translators/decam.py +++ b/python/astro_metadata_translator/translators/decam.py @@ -11,11 +11,14 @@ """Metadata translation code for DECam FITS headers""" +from __future__ import annotations + __all__ = ("DecamTranslator",) import logging import posixpath import re +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, MutableMapping, Optional, Tuple, Union import astropy.units as u from astropy.coordinates import Angle, EarthLocation @@ -25,6 +28,10 @@ from .fits import FitsTranslator from .helpers import altaz_from_degree_headers, is_non_science, tracking_from_degree_headers +if TYPE_CHECKING: + import astropy.coordinates + import astropy.time + log = logging.getLogger(__name__) @@ -47,7 +54,7 @@ class DecamTranslator(FitsTranslator): "boresight_rotation_coord": "sky", } - _trivial_map = { + _trivial_map: Dict[str, Union[str, List[str], Tuple[Any, ...]]] = { "exposure_time": ("EXPTIME", dict(unit=u.s)), "dark_time": ("DARKTIME", dict(unit=u.s)), "boresight_airmass": ("AIRMASS", dict(checker=is_non_science)), @@ -136,7 +143,7 @@ class DecamTranslator(FitsTranslator): } @classmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: 
Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. @@ -166,7 +173,7 @@ def can_translate(cls, header, filename=None): return False @cache_translation - def to_exposure_id(self): + def to_exposure_id(self) -> int: """Calculate exposure ID. Returns @@ -179,7 +186,7 @@ def to_exposure_id(self): return value @cache_translation - def to_observation_counter(self): + def to_observation_counter(self) -> int: """Return the lifetime exposure number. Returns @@ -190,12 +197,12 @@ def to_observation_counter(self): return self.to_exposure_id() @cache_translation - def to_visit_id(self): + def to_visit_id(self) -> int: # Docstring will be inherited. Property defined in properties.py return self.to_exposure_id() @cache_translation - def to_datetime_end(self): + def to_datetime_end(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py # Instcals have no DATE-END or DTUTC datetime_end = self._from_fits_date("DTUTC", scale="utc") @@ -203,7 +210,7 @@ def to_datetime_end(self): datetime_end = self.to_datetime_begin() + self.to_exposure_time() return datetime_end - def _translate_from_calib_id(self, field): + def _translate_from_calib_id(self, field: str) -> str: """Fetch the ID from the CALIB_ID header. Calibration products made with constructCalibs have some metadata @@ -211,11 +218,13 @@ def _translate_from_calib_id(self, field): """ data = self._header["CALIB_ID"] match = re.search(r".*%s=(\S+)" % field, data) + if not match: + raise RuntimeError(f"Header CALIB_ID with value '{data}' has no field '{field}'") self._used_these_cards("CALIB_ID") return match.groups()[0] @cache_translation - def to_physical_filter(self): + def to_physical_filter(self) -> Optional[str]: """Calculate physical filter. 
Return `None` if the keyword FILTER does not exist in the header, @@ -236,7 +245,7 @@ def to_physical_filter(self): return None @cache_translation - def to_location(self): + def to_location(self) -> astropy.coordinates.EarthLocation: """Calculate the observatory location. Returns @@ -257,7 +266,7 @@ def to_location(self): return value @cache_translation - def to_observation_type(self): + def to_observation_type(self) -> str: """Calculate the observation type. Returns @@ -274,19 +283,19 @@ def to_observation_type(self): return obstype @cache_translation - def to_tracking_radec(self): + def to_tracking_radec(self) -> astropy.coordinates.SkyCoord: # Docstring will be inherited. Property defined in properties.py radecsys = ("RADESYS",) radecpairs = (("TELRA", "TELDEC"),) return tracking_from_degree_headers(self, radecsys, radecpairs, unit=(u.hourangle, u.deg)) @cache_translation - def to_altaz_begin(self): + def to_altaz_begin(self) -> astropy.coordinates.AltAz: # Docstring will be inherited. Property defined in properties.py return altaz_from_degree_headers(self, (("ZD", "AZ"),), self.to_datetime_begin(), is_zd=set(["ZD"])) @cache_translation - def to_detector_exposure_id(self): + def to_detector_exposure_id(self) -> Optional[int]: # Docstring will be inherited. Property defined in properties.py exposure_id = self.to_exposure_id() if exposure_id is None: @@ -294,19 +303,21 @@ def to_detector_exposure_id(self): return int("{:07d}{:02d}".format(exposure_id, self.to_detector_num())) @cache_translation - def to_detector_group(self): + def to_detector_group(self) -> str: # Docstring will be inherited. Property defined in properties.py name = self.to_detector_unique_name() return name[0] @cache_translation - def to_detector_name(self): + def to_detector_name(self) -> str: # Docstring will be inherited. 
Property defined in properties.py name = self.to_detector_unique_name() return name[1:] @classmethod - def fix_header(cls, header, instrument, obsid, filename=None): + def fix_header( + cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: Optional[str] = None + ) -> bool: """Fix DECam headers. Parameters @@ -351,7 +362,9 @@ def fix_header(cls, header, instrument, obsid, filename=None): return modified @classmethod - def determine_translatable_headers(cls, filename, primary=None): + def determine_translatable_headers( + cls, filename: str, primary: Optional[MutableMapping[str, Any]] = None + ) -> Iterator[MutableMapping[str, Any]]: """Given a file return all the headers usable for metadata translation. DECam files are multi-extension FITS with a primary header and @@ -390,6 +403,10 @@ class to then call to obtain the real headers to be used for # Circular dependency so must defer import. from ..headers import merge_headers + # This is convoluted because we need to turn an Optional variable + # to a Dict so that mypy is happy. + primary_hdr = primary if primary else {} + # Since we want to scan many HDUs we use astropy directly to keep # the file open rather than continually opening and closing it # as we go to each HDU. 
@@ -400,8 +417,8 @@ class to then call to obtain the real headers to be used for for hdu in fits_file: if first_pass: - if not primary: - primary = hdu.header + if not primary_hdr: + primary_hdr = hdu.header first_pass = False continue @@ -410,4 +427,4 @@ class to then call to obtain the real headers to be used for continue if header["CCDNUM"] > 62: # ignore guide CCDs continue - yield merge_headers([primary, header], mode="overwrite") + yield merge_headers([primary_hdr, header], mode="overwrite") diff --git a/python/astro_metadata_translator/translators/fits.py b/python/astro_metadata_translator/translators/fits.py index c89298c7..c47a7462 100644 --- a/python/astro_metadata_translator/translators/fits.py +++ b/python/astro_metadata_translator/translators/fits.py @@ -11,8 +11,12 @@ """Metadata translation code for standard FITS headers""" +from __future__ import annotations + __all__ = ("FitsTranslator",) +from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union + import astropy.units as u from astropy.coordinates import EarthLocation from astropy.time import Time @@ -33,10 +37,12 @@ class FitsTranslator(MetadataTranslator): """ # Direct translation from header key to standard form - _trivial_map = dict(instrument="INSTRUME", telescope="TELESCOP") + _trivial_map: Dict[str, Union[str, List[str], Tuple[Any, ...]]] = dict( + instrument="INSTRUME", telescope="TELESCOP" + ) @classmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. 
@@ -70,7 +76,9 @@ def can_translate(cls, header, filename=None): return instrument == cls.supported_instrument @classmethod - def _from_fits_date_string(cls, date_str, scale="utc", time_str=None): + def _from_fits_date_string( + cls, date_str: str, scale: str = "utc", time_str: Optional[str] = None + ) -> Time: """Parse standard FITS ISO-style date string and return time object Parameters @@ -96,7 +104,9 @@ def _from_fits_date_string(cls, date_str, scale="utc", time_str=None): return Time(date_str, format="isot", scale=scale) - def _from_fits_date(self, date_key, mjd_key=None, scale=None): + def _from_fits_date( + self, date_key: str, mjd_key: Optional[str] = None, scale: Optional[str] = None + ) -> Time: """Calculate a date object from the named FITS header Uses the TIMESYS header if present to determine the time scale, @@ -136,6 +146,7 @@ def _from_fits_date(self, date_key, mjd_key=None, scale=None): value = self._from_fits_date_string(date_str, scale=scale) used.append(date_key) elif self.is_key_ok(mjd_key): + assert mjd_key is not None # for mypy (is_key_ok checks this) value = Time(self._header[mjd_key], scale=scale, format="mjd") used.append(mjd_key) else: @@ -144,7 +155,7 @@ def _from_fits_date(self, date_key, mjd_key=None, scale=None): return value @cache_translation - def to_datetime_begin(self): + def to_datetime_begin(self) -> Time: """Calculate start time of observation. Uses FITS standard ``MJD-OBS`` or ``DATE-OBS``, in conjunction @@ -158,7 +169,7 @@ def to_datetime_begin(self): return self._from_fits_date("DATE-OBS", mjd_key="MJD-OBS") @cache_translation - def to_datetime_end(self): + def to_datetime_end(self) -> Time: """Calculate end time of observation. Uses FITS standard ``MJD-END`` or ``DATE-END``, in conjunction @@ -172,7 +183,7 @@ def to_datetime_end(self): return self._from_fits_date("DATE-END", mjd_key="MJD-END") @cache_translation - def to_location(self): + def to_location(self) -> EarthLocation: """Calculate the observatory location. 
Uses FITS standard ``OBSGEO-`` headers. diff --git a/python/astro_metadata_translator/translators/helpers.py b/python/astro_metadata_translator/translators/helpers.py index e62dc29c..11486e82 100644 --- a/python/astro_metadata_translator/translators/helpers.py +++ b/python/astro_metadata_translator/translators/helpers.py @@ -22,6 +22,8 @@ """ +from __future__ import annotations + __all__ = ( "to_location_via_telescope_name", "is_non_science", @@ -30,14 +32,20 @@ ) import logging +from typing import TYPE_CHECKING, Optional, Sequence, Set, Tuple import astropy.units as u from astropy.coordinates import AltAz, EarthLocation, SkyCoord +if TYPE_CHECKING: + import astropy.units + + from ..translator import MetadataTranslator + log = logging.getLogger(__name__) -def to_location_via_telescope_name(self): +def to_location_via_telescope_name(self: MetadataTranslator) -> EarthLocation: """Calculate the observatory location via the telescope name. Returns @@ -48,7 +56,7 @@ def to_location_via_telescope_name(self): return EarthLocation.of_site(self.to_telescope()) -def is_non_science(self): +def is_non_science(self: MetadataTranslator) -> None: """Raise an exception if this is a science observation. Raises @@ -61,7 +69,7 @@ def is_non_science(self): return -def altitude_from_zenith_distance(zd): +def altitude_from_zenith_distance(zd: astropy.units.Quantity) -> astropy.units.Quantity: """Convert zenith distance to altitude Parameters @@ -77,7 +85,12 @@ def altitude_from_zenith_distance(zd): return 90.0 * u.deg - zd -def tracking_from_degree_headers(self, radecsys, radecpairs, unit=u.deg): +def tracking_from_degree_headers( + self: MetadataTranslator, + radecsys: Sequence[str], + radecpairs: Tuple[Tuple[str, str], ...], + unit: astropy.units.Unit = u.deg, +) -> SkyCoord: """Calculate the tracking coordinates from lists of headers. 
Parameters @@ -132,7 +145,12 @@ def tracking_from_degree_headers(self, radecsys, radecpairs, unit=u.deg): return None -def altaz_from_degree_headers(self, altazpairs, obstime, is_zd=None): +def altaz_from_degree_headers( + self: MetadataTranslator, + altazpairs: Tuple[Tuple[str, str], ...], + obstime: astropy.time.Time, + is_zd: Optional[Set[str]] = None, +) -> AltAz: """Calculate the altitude/azimuth coordinates from lists of headers. If the altitude is found but is greater than 90 deg, it will be returned diff --git a/python/astro_metadata_translator/translators/hsc.py b/python/astro_metadata_translator/translators/hsc.py index 4f7707c6..024b7c7b 100644 --- a/python/astro_metadata_translator/translators/hsc.py +++ b/python/astro_metadata_translator/translators/hsc.py @@ -11,11 +11,14 @@ """Metadata translation code for HSC FITS headers""" +from __future__ import annotations + __all__ = ("HscTranslator",) import logging import posixpath import re +from typing import Any, MutableMapping, Optional import astropy.units as u from astropy.coordinates import Angle @@ -175,7 +178,7 @@ class HscTranslator(SuprimeCamTranslator): ] @classmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. @@ -206,7 +209,7 @@ def can_translate(cls, header, filename=None): return False @cache_translation - def to_exposure_id(self): + def to_exposure_id(self) -> int: """Calculate unique exposure integer for this observation Returns @@ -240,7 +243,7 @@ def to_exposure_id(self): return visit + 1000000 * (ord(letter) - ord("A")) @cache_translation - def to_boresight_rotation_angle(self): + def to_boresight_rotation_angle(self) -> Angle: # Docstring will be inherited. Property defined in properties.py # Rotation angle formula determined empirically from visual inspection # of HSC images. See DM-9111. 
@@ -249,7 +252,7 @@ def to_boresight_rotation_angle(self): return angle @cache_translation - def to_detector_num(self): + def to_detector_num(self) -> int: """Calculate the detector number. Focus CCDs were numbered incorrectly in the readout software during @@ -273,18 +276,18 @@ def to_detector_num(self): return ccd @cache_translation - def to_detector_exposure_id(self): + def to_detector_exposure_id(self) -> int: # Docstring will be inherited. Property defined in properties.py return self.to_exposure_id() * 200 + self.to_detector_num() @cache_translation - def to_detector_group(self): + def to_detector_group(self) -> str: # Docstring will be inherited. Property defined in properties.py unique = self.to_detector_unique_name() return unique.split("_")[0] @cache_translation - def to_detector_unique_name(self): + def to_detector_unique_name(self) -> str: # Docstring will be inherited. Property defined in properties.py # Mapping from number to unique name is defined solely in camera # geom files. @@ -293,7 +296,7 @@ def to_detector_unique_name(self): return self._DETECTOR_NUM_TO_UNIQUE_NAME[num] @cache_translation - def to_detector_name(self): + def to_detector_name(self) -> str: # Docstring will be inherited. 
Property defined in properties.py # Name is defined from unique name unique = self.to_detector_unique_name() diff --git a/python/astro_metadata_translator/translators/megaprime.py b/python/astro_metadata_translator/translators/megaprime.py index cbacc7df..c190f978 100644 --- a/python/astro_metadata_translator/translators/megaprime.py +++ b/python/astro_metadata_translator/translators/megaprime.py @@ -11,10 +11,13 @@ """Metadata translation code for CFHT MegaPrime FITS headers""" +from __future__ import annotations + __all__ = ("MegaPrimeTranslator",) import posixpath import re +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, MutableMapping, Optional, Tuple, Union import astropy.units as u from astropy.coordinates import Angle, EarthLocation @@ -24,6 +27,11 @@ from .fits import FitsTranslator from .helpers import altaz_from_degree_headers, tracking_from_degree_headers +if TYPE_CHECKING: + import astropy.coordinates + import astropy.time + import astropy.units + class MegaPrimeTranslator(FitsTranslator): """Metadata translator for CFHT MegaPrime standard headers.""" @@ -45,7 +53,7 @@ class MegaPrimeTranslator(FitsTranslator): "detector_group": None, } - _trivial_map = { + _trivial_map: Dict[str, Union[str, List[str], Tuple[Any, ...]]] = { "physical_filter": "FILTER", "dark_time": ("DARKTIME", dict(unit=u.s)), "exposure_time": ("EXPTIME", dict(unit=u.s)), @@ -61,7 +69,7 @@ class MegaPrimeTranslator(FitsTranslator): } @cache_translation - def to_datetime_begin(self): + def to_datetime_begin(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py # We know it is UTC value = self._from_fits_date_string( @@ -71,7 +79,7 @@ def to_datetime_begin(self): return value @cache_translation - def to_datetime_end(self): + def to_datetime_end(self) -> astropy.time.Time: # Docstring will be inherited. 
Property defined in properties.py # Older files are missing UTCEND if self.is_key_ok("UTCEND"): @@ -86,7 +94,7 @@ def to_datetime_end(self): return value @cache_translation - def to_location(self): + def to_location(self) -> EarthLocation: """Calculate the observatory location. Returns @@ -108,7 +116,7 @@ def to_location(self): return value @cache_translation - def to_detector_name(self): + def to_detector_name(self) -> str: # Docstring will be inherited. Property defined in properties.py if self.is_key_ok("EXTNAME"): name = self._header["EXTNAME"] @@ -121,12 +129,12 @@ def to_detector_name(self): return "ccd99" @cache_translation - def to_detector_num(self): + def to_detector_num(self) -> int: name = self.to_detector_name() return int(name[3:]) @cache_translation - def to_observation_type(self): + def to_observation_type(self) -> str: """Calculate the observation type. Returns @@ -141,7 +149,7 @@ def to_observation_type(self): return obstype @cache_translation - def to_tracking_radec(self): + def to_tracking_radec(self) -> astropy.coordinates.SkyCoord: """Calculate the tracking RA/Dec for this observation. Currently will be `None` for geocentric apparent coordinates. @@ -160,19 +168,19 @@ def to_tracking_radec(self): return tracking_from_degree_headers(self, radecsys, radecpairs) @cache_translation - def to_altaz_begin(self): + def to_altaz_begin(self) -> astropy.coordinates.AltAz: # Docstring will be inherited. Property defined in properties.py return altaz_from_degree_headers( self, (("TELALT", "TELAZ"), ("BORE-ALT", "BORE-AZ")), self.to_datetime_begin() ) @cache_translation - def to_detector_exposure_id(self): + def to_detector_exposure_id(self) -> int: # Docstring will be inherited. Property defined in properties.py return self.to_exposure_id() * 36 + self.to_detector_num() @cache_translation - def to_pressure(self): + def to_pressure(self) -> astropy.units.Quantity: # Docstring will be inherited. 
Property defined in properties.py # Can be either AIRPRESS in Pa or PRESSURE in mbar for key, unit in (("PRESSURE", u.hPa), ("AIRPRESS", u.Pa)): @@ -182,7 +190,7 @@ def to_pressure(self): raise KeyError(f"{self._log_prefix}: Could not find pressure keywords in header") @cache_translation - def to_observation_counter(self): + def to_observation_counter(self) -> int: """Return the lifetime exposure number. Returns @@ -193,7 +201,9 @@ def to_observation_counter(self): return self.to_exposure_id() @classmethod - def determine_translatable_headers(cls, filename, primary=None): + def determine_translatable_headers( + cls, filename: str, primary: Optional[MutableMapping[str, Any]] = None + ) -> Iterator[MutableMapping[str, Any]]: """Given a file return all the headers usable for metadata translation. MegaPrime files are multi-extension FITS with a primary header and diff --git a/python/astro_metadata_translator/translators/sdss.py b/python/astro_metadata_translator/translators/sdss.py index c5f9def2..5ad49af4 100644 --- a/python/astro_metadata_translator/translators/sdss.py +++ b/python/astro_metadata_translator/translators/sdss.py @@ -11,9 +11,12 @@ """Metadata translation code for SDSS FITS headers""" +from __future__ import annotations + __all__ = ("SdssTranslator",) import posixpath +from typing import TYPE_CHECKING, Any, MutableMapping, Optional import astropy.units as u from astropy.coordinates import AltAz, Angle, EarthLocation @@ -22,6 +25,10 @@ from .fits import FitsTranslator from .helpers import tracking_from_degree_headers +if TYPE_CHECKING: + import astropy.coordinates + import astropy.time + class SdssTranslator(FitsTranslator): """Metadata translator for SDSS standard headers. 
@@ -98,7 +105,7 @@ class SdssTranslator(FitsTranslator): } @classmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. @@ -127,7 +134,7 @@ def can_translate(cls, header, filename=None): return False @cache_translation - def to_detector_unique_name(self): + def to_detector_unique_name(self) -> str: # Docstring will be inherited. Property defined in properties.py if self.is_key_ok("CAMCOL"): return self.to_physical_filter() + str(self._header["CAMCOL"]) @@ -135,12 +142,12 @@ def to_detector_unique_name(self): raise ValueError(f"{self._log_prefix}: CAMCOL key is not definded") @cache_translation - def to_detector_num(self): + def to_detector_num(self) -> int: # Docstring will be inherited. Property defined in properties.py return self.detector_name_id_map[self.to_detector_unique_name()] @cache_translation - def to_observation_id(self): + def to_observation_id(self) -> str: """Calculate the observation ID. Returns @@ -152,7 +159,7 @@ def to_observation_id(self): return " ".join([str(self._header[el]) for el in ["RUN", "CAMCOL", "FILTER", "FRAME"]]) @cache_translation - def to_datetime_begin(self): + def to_datetime_begin(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py # We know it is UTC value = self._from_fits_date_string( @@ -162,12 +169,12 @@ def to_datetime_begin(self): return value @cache_translation - def to_datetime_end(self): + def to_datetime_end(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py return self.to_datetime_begin() + self.to_exposure_time() @cache_translation - def to_location(self): + def to_location(self) -> EarthLocation: """Calculate the observatory location. 
Returns @@ -182,7 +189,7 @@ def to_location(self): return value @cache_translation - def to_observation_type(self): + def to_observation_type(self) -> str: """Calculate the observation type. Returns @@ -198,14 +205,14 @@ def to_observation_type(self): return obstype @cache_translation - def to_tracking_radec(self): + def to_tracking_radec(self) -> astropy.coordinates.SkyCoord: # Docstring will be inherited. Property defined in properties.py radecsys = ("RADECSYS",) radecpairs = (("RA", "DEC"),) return tracking_from_degree_headers(self, radecsys, radecpairs, unit=u.deg) @cache_translation - def to_altaz_begin(self): + def to_altaz_begin(self) -> AltAz: # Docstring will be inherited. Property defined in properties.py try: az = self._header["AZ"] @@ -225,14 +232,15 @@ def to_altaz_begin(self): raise (e) @cache_translation - def to_boresight_airmass(self): + def to_boresight_airmass(self) -> Optional[float]: # Docstring will be inherited. Property defined in properties.py altaz = self.to_altaz_begin() if altaz is not None: return altaz.secz.value # This is an estimate + return None @cache_translation - def to_detector_exposure_id(self): + def to_detector_exposure_id(self) -> Optional[int]: # Docstring will be inherited. Property defined in properties.py try: frame_field_map = dict(r=0, i=2, u=4, z=6, g=8) @@ -249,7 +257,7 @@ def to_detector_exposure_id(self): return ((int(run) * 10 + filter_id_map[filt]) * 10 + int(camcol)) * 10000 + int(field) @cache_translation - def to_detector_group(self): + def to_detector_group(self) -> str: # Docstring will be inherited. 
Property defined in properties.py if self.is_key_ok("CAMCOL"): return str(self._header["CAMCOL"]) diff --git a/python/astro_metadata_translator/translators/subaru.py b/python/astro_metadata_translator/translators/subaru.py index 88456136..d95b6092 100644 --- a/python/astro_metadata_translator/translators/subaru.py +++ b/python/astro_metadata_translator/translators/subaru.py @@ -11,6 +11,8 @@ """Metadata translation code for Subaru telescope""" +from __future__ import annotations + __all__ = ("SubaruTranslator",) from astropy.coordinates import EarthLocation @@ -23,7 +25,7 @@ class SubaruTranslator(FitsTranslator): """Metadata translator for Subaru telescope headers.""" @cache_translation - def to_location(self): + def to_location(self) -> EarthLocation: """Returns the location of the Subaru telescope on Mauna Kea. Hardcodes the location and does not look at any headers. @@ -36,7 +38,7 @@ def to_location(self): return EarthLocation.from_geodetic(-155.476667, 19.825556, 4139.0) @cache_translation - def to_observation_counter(self): + def to_observation_counter(self) -> int: """Return the lifetime exposure number. 
Returns diff --git a/python/astro_metadata_translator/translators/suprimecam.py b/python/astro_metadata_translator/translators/suprimecam.py index 9b252a5c..b498dd6c 100644 --- a/python/astro_metadata_translator/translators/suprimecam.py +++ b/python/astro_metadata_translator/translators/suprimecam.py @@ -11,11 +11,14 @@ """Metadata translation code for SuprimeCam FITS headers""" +from __future__ import annotations + __all__ = ("SuprimeCamTranslator",) import logging import posixpath import re +from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Tuple, Union import astropy.units as u from astropy.coordinates import Angle, SkyCoord @@ -24,6 +27,10 @@ from .helpers import altaz_from_degree_headers from .subaru import SubaruTranslator +if TYPE_CHECKING: + import astropy.coordinates + import astropy.time + log = logging.getLogger(__name__) @@ -42,7 +49,7 @@ class SuprimeCamTranslator(SubaruTranslator): _const_map = {"boresight_rotation_coord": "unknown", "detector_group": None} """Constant mappings""" - _trivial_map = { + _trivial_map: Dict[str, Union[str, List[str], Tuple[Any, ...]]] = { "observation_id": "EXP-ID", "object": "OBJECT", "science_program": "PROP-ID", @@ -61,7 +68,7 @@ class SuprimeCamTranslator(SubaruTranslator): _DAY0 = 53005 @classmethod - def can_translate(cls, header, filename=None): + def can_translate(cls, header: MutableMapping[str, Any], filename: Optional[str] = None) -> bool: """Indicate whether this translation class can translate the supplied header. @@ -87,7 +94,7 @@ def can_translate(cls, header, filename=None): return True return False - def _get_adjusted_mjd(self): + def _get_adjusted_mjd(self) -> int: """Calculate the modified julian date offset from reference day Returns @@ -100,7 +107,7 @@ def _get_adjusted_mjd(self): return int(mjd) - self._DAY0 @cache_translation - def to_physical_filter(self): + def to_physical_filter(self) -> str: # Docstring will be inherited. 
Property defined in properties.py value = self._header["FILTER01"].strip().upper() self._used_these_cards("FILTER01") @@ -112,7 +119,7 @@ def to_physical_filter(self): return value @cache_translation - def to_datetime_begin(self): + def to_datetime_begin(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py # We know it is UTC value = self._from_fits_date_string( @@ -122,7 +129,7 @@ def to_datetime_begin(self): return value @cache_translation - def to_datetime_end(self): + def to_datetime_end(self) -> astropy.time.Time: # Docstring will be inherited. Property defined in properties.py # We know it is UTC value = self._from_fits_date_string( @@ -141,7 +148,7 @@ def to_datetime_end(self): return value @cache_translation - def to_exposure_id(self): + def to_exposure_id(self) -> int: """Calculate unique exposure integer for this observation Returns @@ -165,7 +172,7 @@ def to_exposure_id(self): return exposure @cache_translation - def to_visit_id(self): + def to_visit_id(self) -> int: """Calculate the unique integer ID for this visit. Assumed to be identical to the exposure ID in this implementation. @@ -178,7 +185,7 @@ def to_visit_id(self): return self.to_exposure_id() @cache_translation - def to_observation_type(self): + def to_observation_type(self) -> str: """Calculate the observation type. Returns @@ -193,7 +200,7 @@ def to_observation_type(self): return obstype @cache_translation - def to_tracking_radec(self): + def to_tracking_radec(self) -> SkyCoord: # Docstring will be inherited. Property defined in properties.py radec = SkyCoord( self._header["RA2000"], @@ -207,24 +214,24 @@ def to_tracking_radec(self): return radec @cache_translation - def to_altaz_begin(self): + def to_altaz_begin(self) -> astropy.coordinates.AltAz: # Docstring will be inherited. 
Property defined in properties.py return altaz_from_degree_headers(self, (("ALTITUDE", "AZIMUTH"),), self.to_datetime_begin()) @cache_translation - def to_boresight_rotation_angle(self): + def to_boresight_rotation_angle(self) -> Angle: # Docstring will be inherited. Property defined in properties.py angle = Angle(self.quantity_from_card("INR-STR", u.deg)) angle = angle.wrap_at("360d") return angle @cache_translation - def to_detector_exposure_id(self): + def to_detector_exposure_id(self) -> int: # Docstring will be inherited. Property defined in properties.py return self.to_exposure_id() * 10 + self.to_detector_num() @cache_translation - def to_detector_name(self): + def to_detector_name(self) -> str: # Docstring will be inherited. Property defined in properties.py # See https://subarutelescope.org/Observing/Instruments/SCam/ccd.html num = self.to_detector_num()