Skip to content

Commit

Permalink
Merge branch 'NOAA-EMC:develop' into ctests_extended
Browse files Browse the repository at this point in the history
  • Loading branch information
TerrenceMcGuinness-NOAA authored Jan 28, 2025
2 parents 9bfe5e0 + ee54eac commit e58bf3b
Show file tree
Hide file tree
Showing 20 changed files with 329 additions and 11 deletions.
4 changes: 3 additions & 1 deletion ci/cases/yamls/gfs_defaults_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ defaults:
!INC {{ HOMEgfs }}/parm/config/gfs/yaml/defaults.yaml
base:
ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }}
DO_TEST_MODE: "NO"
DO_TEST_MODE: "YES"
FETCHDIR: "/NCEPDEV/emc-global/1year/David.Grumm/test_data"
DO_METP: "NO"

23 changes: 23 additions & 0 deletions jobs/JGLOBAL_FETCH
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#! /usr/bin/env bash

# J-job for the global fetch task: pulls initial-condition tarball(s)
# (from HPSS or a local archive) and extracts them for staging.
source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "fetch" -c "base fetch"

# Execute fetching
"${SCRgfs}/exglobal_fetch.py"
err=$?

###############################################################
# Check for errors and exit if any of the above failed
if [[ "${err}" -ne 0 ]]; then
    echo "FATAL ERROR: Unable to fetch ICs to ${ROTDIR}; ABORT!"
    exit "${err}"
fi

##########################################
# Remove the Temporary working directory
##########################################
# NOTE: use a brace group, not a subshell — an 'exit' inside '( ... )'
# only terminates the subshell, and the script would keep running after
# a failed cd.
cd "${DATAROOT}" || { echo "${DATAROOT} does not exist. ABORT!"; exit 1; }
[[ "${KEEPDATA}" = "NO" ]] && rm -rf "${DATA}"

exit 0
18 changes: 18 additions & 0 deletions jobs/rocoto/fetch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#! /usr/bin/env bash

# Rocoto wrapper for the fetch job: load the workflow modules, then hand
# off to the JGLOBAL_FETCH J-job and propagate its exit status.
source "${HOMEgfs}/ush/preamble.sh"

# Source FV3GFS workflow modules; abort with the same status on failure
. "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
rc=$?
if (( rc != 0 )); then
  exit "${rc}"
fi

export job="fetch"
export jobid="${job}.$$"

# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_FETCH"
rc=$?

exit "${rc}"
1 change: 1 addition & 0 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export ROTDIR="@COMROOT@/${PSLOT}"

export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
export ATARDIR="@ATARDIR@"
export FETCHDIR="@FETCHDIR@" # HPSS or local directory where IC tarball(s) can be found.

# Commonly defined parameters in JJOBS
export envir=${envir:-"prod"}
Expand Down
1 change: 1 addition & 0 deletions parm/config/gefs/config.fetch
5 changes: 5 additions & 0 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ if [[ "${PDY}${cyc}" -ge "2019092100" && "${PDY}${cyc}" -le "2019110700" ]]; the
fi
export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
export ATARDIR="@ATARDIR@"
export FETCHDIR="@FETCHDIR@"

# Commonly defined parameters in JJOBS
export envir=${envir:-"prod"}
Expand Down Expand Up @@ -474,6 +475,10 @@ export DO_VRFY_OCEANDA="@DO_VRFY_OCEANDA@" # Run SOCA Ocean and Seaice DA verif
export FHMAX_FITS=132
[[ "${FHMAX_FITS}" -gt "${FHMAX_GFS}" ]] && export FHMAX_FITS=${FHMAX_GFS}

# User may choose to reset these at experiment setup time
export DO_FETCH_HPSS="NO" # Copy from HPSS (on HPSS-accessible machines) onto COM
export DO_FETCH_LOCAL="NO" # Copy from local disk onto COM

# Archiving options
export HPSSARCH="@HPSSARCH@" # save data to HPSS archive
export LOCALARCH="@LOCALARCH@" # save data to local archive
Expand Down
19 changes: 19 additions & 0 deletions parm/config/gfs/config.fetch
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#! /usr/bin/env bash

########## config.fetch ##########
# Configuration for the fetch task: selects the fetch YAML template that
# matches the experiment's start type, app, and mode.

echo "BEGIN: config.fetch"

# Get task specific resources
source "${EXPDIR}/config.resources" fetch

# Determine start type: warm unless the experiment is explicitly cold
ic_type="warm"
if [[ "${EXP_WARM_START}" == ".false." ]]; then
    ic_type="cold"
fi

export FETCH_YAML_TMPL="${PARMgfs}/fetch/${NET}_${APP}_${ic_type}_${MODE}.yaml.j2"

echo "END: config.fetch"
4 changes: 2 additions & 2 deletions parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ if (( $# != 1 )); then

echo "Must specify an input task argument to set resource variables!"
echo "argument can be any one of the following:"
echo "stage_ic aerosol_init"
echo "stage_ic aerosol_init fetch"
echo "prep prepatmiodaobs"
echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal"
echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal"
Expand Down Expand Up @@ -1059,7 +1059,7 @@ case ${step} in
export is_exclusive=True
;;

"arch" | "earc" | "getic")
"arch" | "earc" | "getic" | "fetch")
walltime="06:00:00"
ntasks=1
tasks_per_node=1
Expand Down
7 changes: 6 additions & 1 deletion parm/config/gfs/config.stage_ic
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ echo "BEGIN: config.stage_ic"
# Get task specific resources
source "${EXPDIR}/config.resources" stage_ic

export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
if [[ "${DO_FETCH_HPSS^^}" =~ "Y" || "${DO_FETCH_LOCAL^^}" =~ "Y" ]]; then
export ICSDIR="${DATAROOT}" # fetch untars data into DATAROOT
else
export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
fi

export BASE_IC="@BASE_IC@" # Platform home for staged ICs

export STAGE_IC_YAML_TMPL="${PARMgfs}/stage/master_gfs.yaml.j2"
Expand Down
16 changes: 16 additions & 0 deletions parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{# Fetch template: cold-start, ATM-only, forecast-only experiments.        #}
{# Names the archive tarball holding the atmosphere cold-start inputs and  #}
{# the files expected inside it; the fetch task extracts it into DATAROOT. #}
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set cycle_YMD = current_cycle | to_YMD %}
{% set cycle_HH = current_cycle | strftime("%H") %}
{# All cold-start ATM inputs live in the current cycle's RUN.YYYYMMDD/HH input dir #}
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
target:
  tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
  on_hpss: True
  contents:
    # ATM
    - {{atm_dir}}/gfs_ctrl.nc
    {% for ftype in ["gfs_data", "sfc_data"] %}
    {% for ntile in range(1, ntiles + 1) %}
    - {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
    {% endfor %} # ntile
    {% endfor %} # ftype
  destination: "{{ DATAROOT }}"
37 changes: 37 additions & 0 deletions parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{# Fetch template: cold-start, S2SW (atm/ocean/ice/wave), forecast-only. #}
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set cycle_YMD = current_cycle | to_YMD %}
{% set cycle_HH = current_cycle | strftime("%H") %}
{% set prev_cycle_YMD = previous_cycle | to_YMD %}
{% set prev_cycle_HH = previous_cycle | strftime("%H") %}
# For cold starts, the ATM component is in the current cycle RUN.YYYYMMDD/HH
# For ocean/ice, some files are in the current cycle, some in the previous
# For waves, all files are in the previous cycle
# Previous cycles are always gdas (gdas.YYYYMMDD/HH)
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
{% set ocean_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ocean/restart" %}
{% set ice_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ice/restart" %}
{% set prev_ocean_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ocean/restart" %}
{% set prev_ice_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ice/restart" %}
{% set prev_wave_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/wave/restart" %}
{% set restart_prefix = cycle_YMD ~ "." ~ cycle_HH ~ "0000" %}
{# BUGFIX: top-level key was 'untar', but the fetch task (pygfs.task.fetch)  #}
{# reads 'target.contents' / 'target.on_hpss' / etc., and the ATM template  #}
{# uses 'target' — renamed for consistency so the fetch job can parse this. #}
target:
  tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/s2sw_cold.tar"
  on_hpss: True
  contents:
    # ATM
    - {{atm_dir}}/gfs_ctrl.nc
    {% for ftype in ["gfs_data", "sfc_data"] %}
    {% for ntile in range(1, ntiles + 1) %}
    - {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
    {% endfor %} # ntile
    {% endfor %} # ftype
    # Ocean
    - {{ocean_dir}}/{{restart_prefix}}.MOM.res.nc
    - {{prev_ocean_dir}}/{{restart_prefix}}.MOM.res.nc
    # Ice
    - {{ice_dir}}/{{restart_prefix}}.cice_model.res.nc
    - {{prev_ice_dir}}/{{restart_prefix}}.cice_model.res.nc
    # Wave
    - {{prev_wave_dir}}/{{restart_prefix}}.restart.ww3
    - {{prev_wave_dir}}/{{restart_prefix}}.restart.{{waveGRD}}
  destination: "{{ DATAROOT }}"
39 changes: 39 additions & 0 deletions scripts/exglobal_fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""Entry script for the fetch job: pull IC tarball(s) from HPSS (or a
local archive) and extract them to the configured destination."""

import os

from pygfs.task.fetch import Fetch
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


@logit(logger)
def main():

    config = cast_strdict_as_dtypedict(os.environ)

    # Instantiate the Fetch object
    fetch = Fetch(config)

    # Pull out all the configuration keys needed to run the fetch step
    keys = ['current_cycle', 'previous_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR',
            'FETCH_YAML_TMPL', 'FETCHDIR', 'ntiles', 'DATAROOT', 'waveGRD']

    fetch_dict = AttrDict()
    for key in keys:
        fetch_dict[key] = fetch.task_config.get(key)
        if fetch_dict[key] is None:
            # Use the configured logger (not print) so the warning is
            # captured in the job log with the other diagnostics
            logger.warning(f"key ({key}) not found in task_config!")

    # Determine which archives to retrieve from HPSS
    # Read the input YAML file to get the list of tarballs on tape
    fetchdir_set = fetch.configure(fetch_dict)

    # Pull the data from tape or locally and store the specified destination
    fetch.execute_pull_data(fetchdir_set)


if __name__ == '__main__':
    main()
1 change: 1 addition & 0 deletions ush/python/pygfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .task.oceanice_products import OceanIceProducts
from .task.gfs_forecast import GFSForecast
from .utils import marine_da_utils
from .task.fetch import Fetch

__docformat__ = "restructuredtext"
__version__ = "0.1.0"
Expand Down
105 changes: 105 additions & 0 deletions ush/python/pygfs/task/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env python3

import os
from logging import getLogger
from typing import Any, Dict

from wxflow import (Hsi, Task, htar,
logit, parse_j2yaml, chdir)
# import tarfile


logger = getLogger(__name__.split('.')[-1])


class Fetch(Task):
    """Task to pull ROTDIR data from HPSS (or locally)
    """

    @logit(logger, name="Fetch")
    def __init__(self, config: Dict[str, Any]) -> None:
        """Constructor for the Fetch task
        The constructor is responsible for collecting necessary yamls based on
        the runtime options and RUN.

        Parameters
        ----------
        config : Dict[str, Any]
            Incoming configuration for the task from the environment

        Returns
        -------
        None
        """
        super().__init__(config)

    @logit(logger)
    def configure(self, fetch_dict: Dict[str, Any]):
        """Determine which tarballs will need to be extracted

        Parameters
        ----------
        fetch_dict : Dict[str, Any]
            Task specific keys, e.g. COM directories, etc

        Return
        ------
        parsed_fetch: Dict[str, Any]
            Dictionary derived from the yaml file with necessary HPSS info.
        """
        # NOTE(review): Hsi() is constructed for its side effects (presumably
        # verifying HPSS tooling is available); the handle is not used below.
        self.hsi = Hsi()

        fetch_yaml = fetch_dict.FETCH_YAML_TMPL
        fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")

        parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
                                    fetch_dict)
        return parsed_fetch

    @logit(logger)
    def execute_pull_data(self, fetchdir_set: Dict[str, Any]) -> None:
        """Pull data from HPSS based on a yaml dictionary and store at the
        specified destination.

        Parameters
        ----------
        fetchdir_set: Dict[str, Any],
            Dict defining set of tarballs to pull and where to put them.

        Return
        None

        Raises
        ------
        FileNotFoundError
            If the tarball lists no files, or any listed file is missing
            after extraction.
        NotImplementedError
            If the tarball is not on HPSS (local pulls are not supported yet).
        """

        f_names = fetchdir_set.target.contents
        if len(f_names) <= 0:  # Abort if no files
            raise FileNotFoundError("FATAL ERROR: The tar ball has no files")

        on_hpss = fetchdir_set.target.on_hpss
        dest = fetchdir_set.target.destination
        # BUGFIX: was 'fetchdir_set.targettarball' (missing '.'), which raises
        # AttributeError at runtime; the tarball path is under the 'target' map.
        tarball = fetchdir_set.target.tarball

        # Select action whether on_hpss is True or not, and pull these
        # data from tape or locally and place where it needs to go
        # DG - these need testing
        with chdir(dest):
            logger.info(f"Changed working directory to {dest}")
            if on_hpss is True:  # htar all files in fnames
                htar_obj = htar.Htar()
                htar_obj.xvf(tarball, f_names)
            else:  # tar all files in fnames
                raise NotImplementedError("The fetch job does not yet support pulling from local archives")

            # with tarfile.open(dest, "w") as tar:
            #     for filename in f_names:
            #         tar.add(filename)

            # Verify all data files were extracted (paths are relative to dest)
            missing_files = [f for f in f_names if not os.path.exists(f)]
            if len(missing_files) > 0:
                message = "Failed to extract all required files. Missing files:\n"
                for f in missing_files:
                    message += f"{f}\n"

                raise FileNotFoundError(message)
3 changes: 3 additions & 0 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]:
run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False)
run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None)

run_options[run]['do_fetch_hpss'] = run_base.get('DO_FETCH_HPSS', False)
run_options[run]['do_fetch_local'] = run_base.get('DO_FETCH_LOCAL', False)

if not AppConfig.is_monotonic(run_options[run]['fcst_segments']):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

Expand Down
17 changes: 14 additions & 3 deletions workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ def _get_app_configs(self, run):
Returns the config_files that are involved in the forecast-only app
"""

configs = []
options = self.run_options[run]
configs = ['stage_ic', 'fcst', 'arch', 'cleanup']
if options['do_fetch_hpss'] or options['do_fetch_local']:
configs += ['fetch']

configs += ['stage_ic', 'fcst', 'arch', 'cleanup']

if options['do_atm']:

Expand Down Expand Up @@ -98,15 +102,22 @@ def get_task_names(self):
This is the place where that order is set.
"""

tasks = ['stage_ic']
options = self.run_options[self.run]

tasks = []

if options['do_fetch_hpss'] or options['do_fetch_local']:
tasks += ['fetch']

tasks += ['stage_ic']

if options['do_aero_fcst'] and not options['exp_warm_start']:
tasks += ['aerosol_init']

if options['do_wave']:
tasks += ['waveinit']
# tasks += ['waveprep'] # TODO - verify if waveprep is executed in forecast-only mode when APP=ATMW|S2SW
# tasks += ['waveprep'] # TODO - verify if waveprep is executed in ...
# ... forecast-only mode when APP=ATMW|S2SW

tasks += ['fcst']

Expand Down
3 changes: 2 additions & 1 deletion workflow/hosts/azurepw.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ MAKE_NSSTBUFR: 'NO'
MAKE_ACFTBUFR: 'NO'
DO_TRACKER: 'NO'
DO_GENESIS: 'NO'
DO_METP: 'NO'
DO_METP: 'NO'
SUPPORTED_RESOLUTIONS: ['C48', 'C96', 'C384', 'C768'] # TODO: Test and support all cubed-sphere resolutions.

Loading

0 comments on commit e58bf3b

Please sign in to comment.