-
Notifications
You must be signed in to change notification settings - Fork 180
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add fetch job and update stage_ic to work with fetched ICs (#3141)
Most jobs require the initial conditions to be available on local disk. The existing “stage_ic” task copies/stages these initial condition into the experiment's COM directory. This PR for the “fetch” task extends that functionality to copy from HPSS (on HPSS-accessible machines) into COM. Resolves #2988 --------- Co-authored-by: David Huber <[email protected]>
- Loading branch information
1 parent
76cecfb
commit ee54eac
Showing
20 changed files
with
329 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#! /usr/bin/env bash | ||
|
||
source "${HOMEgfs}/ush/preamble.sh" | ||
source "${HOMEgfs}/ush/jjob_header.sh" -e "fetch" -c "base fetch" | ||
|
||
# Execute fetching | ||
"${SCRgfs}/exglobal_fetch.py" | ||
err=$? | ||
|
||
############################################################### | ||
# Check for errors and exit if any of the above failed | ||
if [[ "${err}" -ne 0 ]]; then | ||
echo "FATAL ERROR: Unable to fetch ICs to ${ROTDIR}; ABORT!" | ||
exit "${err}" | ||
fi | ||
|
||
########################################## | ||
# Remove the Temporary working directory | ||
########################################## | ||
cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. ABORT!"; exit 1) | ||
[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}" | ||
|
||
exit 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
#! /usr/bin/env bash | ||
|
||
source "${HOMEgfs}/ush/preamble.sh" | ||
|
||
# Source FV3GFS workflow modules | ||
. "${HOMEgfs}/ush/load_fv3gfs_modules.sh" | ||
status=$? | ||
[[ "${status}" -ne 0 ]] && exit "${status}" | ||
|
||
export job="fetch" | ||
export jobid="${job}.$$" | ||
|
||
# Execute the JJOB | ||
"${HOMEgfs}/jobs/JGLOBAL_FETCH" | ||
status=$? | ||
|
||
|
||
exit "${status}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
../gfs/config.fetch |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
#! /usr/bin/env bash | ||
|
||
########## config.fetch ########## | ||
|
||
echo "BEGIN: config.fetch" | ||
|
||
# Get task specific resources | ||
source "${EXPDIR}/config.resources" fetch | ||
|
||
# Determine start type | ||
if [[ "${EXP_WARM_START}" == ".false." ]]; then | ||
ic_type="cold" | ||
else | ||
ic_type="warm" | ||
fi | ||
|
||
export FETCH_YAML_TMPL="${PARMgfs}/fetch/${NET}_${APP}_${ic_type}_${MODE}.yaml.j2" | ||
|
||
echo "END: config.fetch" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
{% set cycle_YMDH = current_cycle | to_YMDH %} | ||
{% set cycle_YMD = current_cycle | to_YMD %} | ||
{% set cycle_HH = current_cycle | strftime("%H") %} | ||
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %} | ||
target: | ||
tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/atm_cold.tar" | ||
on_hpss: True | ||
contents: | ||
# ATM | ||
- {{atm_dir}}/gfs_ctrl.nc | ||
{% for ftype in ["gfs_data", "sfc_data"] %} | ||
{% for ntile in range(1, ntiles + 1) %} | ||
- {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc | ||
{% endfor %} # ntile | ||
{% endfor %} # ftype | ||
destination: "{{ DATAROOT }}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
{% set cycle_YMDH = current_cycle | to_YMDH %} | ||
{% set cycle_YMD = current_cycle | to_YMD %} | ||
{% set cycle_HH = current_cycle | strftime("%H") %} | ||
{% set prev_cycle_YMD = previous_cycle | to_YMD %} | ||
{% set prev_cycle_HH = previous_cycle | strftime("%H") %} | ||
# For cold starts, the ATM component is in the current cycle RUN.YYYYMMDD/HH | ||
# For ocean/ice, some files are in the current cyle, some in the previous | ||
# For waves, all files are in the previous cycle | ||
# Previous cycles are always gdas (gdas.YYYYMMDD/HH) | ||
{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %} | ||
{% set ocean_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ocean/restart" %} | ||
{% set ice_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ice/restart" %} | ||
{% set prev_ocean_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ocean/restart" %} | ||
{% set prev_ice_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ice/restart" %} | ||
{% set prev_wave_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/wave/restart" %} | ||
{% set restart_prefix = cycle_YMD ~ "." ~ cycle_HH ~ "0000" %} | ||
untar: | ||
tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/s2sw_cold.tar" | ||
on_hpss: True | ||
contents: | ||
# ATM | ||
- {{atm_dir}}/gfs_ctrl.nc | ||
{% for ftype in ["gfs_data", "sfc_data"] %} | ||
{% for ntile in range(1, ntiles + 1) %} | ||
- {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc | ||
{% endfor %} # ntile | ||
{% endfor %} # ftype | ||
# Ocean | ||
- {{ocean_dir}}/{{restart_prefix}}.MOM.res.nc | ||
- {{prev_ocean_dir}}/{{restart_prefix}}.MOM.res.nc | ||
# Ice | ||
- {{ice_dir}}/{{restart_prefix}}.cice_model.res.nc | ||
- {{prev_ice_dir}}/{{restart_prefix}}.cice_model.res.nc | ||
# Wave | ||
- {{prev_wave_dir}}/{{restart_prefix}}.restart.ww3 | ||
- {{prev_wave_dir}}/{{restart_prefix}}.restart.{{waveGRD}} | ||
destination: "{{ DATAROOT }}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os | ||
|
||
from pygfs.task.fetch import Fetch | ||
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit | ||
|
||
# initialize root logger | ||
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) | ||
|
||
|
||
@logit(logger) | ||
def main(): | ||
|
||
config = cast_strdict_as_dtypedict(os.environ) | ||
|
||
# Instantiate the Fetch object | ||
fetch = Fetch(config) | ||
|
||
# Pull out all the configuration keys needed to run the fetch step | ||
keys = ['current_cycle', 'previous_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR', | ||
'FETCH_YAML_TMPL', 'FETCHDIR', 'ntiles', 'DATAROOT', 'waveGRD'] | ||
|
||
fetch_dict = AttrDict() | ||
for key in keys: | ||
fetch_dict[key] = fetch.task_config.get(key) | ||
if fetch_dict[key] is None: | ||
print(f"Warning: key ({key}) not found in task_config!") | ||
|
||
# Determine which archives to retrieve from HPSS | ||
# Read the input YAML file to get the list of tarballs on tape | ||
fetchdir_set = fetch.configure(fetch_dict) | ||
|
||
# Pull the data from tape or locally and store the specified destination | ||
fetch.execute_pull_data(fetchdir_set) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import os | ||
from logging import getLogger | ||
from typing import Any, Dict | ||
|
||
from wxflow import (Hsi, Task, htar, | ||
logit, parse_j2yaml, chdir) | ||
# import tarfile | ||
|
||
|
||
logger = getLogger(__name__.split('.')[-1]) | ||
|
||
|
||
class Fetch(Task): | ||
"""Task to pull ROTDIR data from HPSS (or locally) | ||
""" | ||
|
||
@logit(logger, name="Fetch") | ||
def __init__(self, config: Dict[str, Any]) -> None: | ||
"""Constructor for the Fetch task | ||
The constructor is responsible for collecting necessary yamls based on | ||
the runtime options and RUN. | ||
Parameters | ||
---------- | ||
config : Dict[str, Any] | ||
Incoming configuration for the task from the environment | ||
Returns | ||
------- | ||
None | ||
""" | ||
super().__init__(config) | ||
|
||
@logit(logger) | ||
def configure(self, fetch_dict: Dict[str, Any]): | ||
"""Determine which tarballs will need to be extracted | ||
Parameters | ||
---------- | ||
fetch_dict : Dict[str, Any] | ||
Task specific keys, e.g. COM directories, etc | ||
Return | ||
------ | ||
parsed_fetch: Dict[str, Any] | ||
Dictionary derived from the yaml file with necessary HPSS info. | ||
""" | ||
self.hsi = Hsi() | ||
|
||
fetch_yaml = fetch_dict.FETCH_YAML_TMPL | ||
fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch") | ||
|
||
parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml), | ||
fetch_dict) | ||
return parsed_fetch | ||
|
||
@logit(logger) | ||
def execute_pull_data(self, fetchdir_set: Dict[str, Any]) -> None: | ||
"""Pull data from HPSS based on a yaml dictionary and store at the | ||
specified destination. | ||
Parameters | ||
---------- | ||
fetchdir_set: Dict[str, Any], | ||
Dict defining set of tarballs to pull and where to put them. | ||
Return | ||
None | ||
""" | ||
|
||
f_names = fetchdir_set.target.contents | ||
if len(f_names) <= 0: # Abort if no files | ||
raise FileNotFoundError("FATAL ERROR: The tar ball has no files") | ||
|
||
on_hpss = fetchdir_set.target.on_hpss | ||
dest = fetchdir_set.target.destination | ||
tarball = fetchdir_set.targettarball | ||
|
||
# Select action whether no_hpss is True or not, and pull these | ||
# data from tape or locally and place where it needs to go | ||
# DG - these need testing | ||
with chdir(dest): | ||
logger.info(f"Changed working directory to {dest}") | ||
if on_hpss is True: # htar all files in fnames | ||
htar_obj = htar.Htar() | ||
htar_obj.xvf(tarball, f_names) | ||
else: # tar all files in fnames | ||
raise NotImplementedError("The fetch job does not yet support pulling from local archives") | ||
|
||
# with tarfile.open(dest, "w") as tar: | ||
# for filename in f_names: | ||
# tar.add(filename) | ||
# Verify all data files were extracted | ||
missing_files = [] | ||
for f in f_names: | ||
if not os.path.exists(f): | ||
missing_files.append(f) | ||
if len(missing_files) > 0: | ||
message = "Failed to extract all required files. Missing files:\n" | ||
for f in missing_files: | ||
message += f"{f}\n" | ||
|
||
raise FileNotFoundError(message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.