From ee54eac82b6bf46ccbc6958697509fe2be441004 Mon Sep 17 00:00:00 2001
From: DavidGrumm-NOAA
Date: Mon, 27 Jan 2025 15:10:51 -0500
Subject: [PATCH] Add fetch job and update stage_ic to work with fetched ICs
 (#3141)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Most jobs require the initial conditions to be available on local disk.
The existing “stage_ic” task copies/stages these initial conditions into
the experiment's COM directory. This PR adds a “fetch” task that extends
that functionality to copy from HPSS (on HPSS-accessible machines) into
COM.

Resolves #2988

---------

Co-authored-by: David Huber
---
 ci/cases/yamls/gfs_defaults_ci.yaml           |   4 +-
 jobs/JGLOBAL_FETCH                            |  23 ++++
 jobs/rocoto/fetch.sh                          |  18 +++
 parm/config/gefs/config.base                  |   1 +
 parm/config/gefs/config.fetch                 |   1 +
 parm/config/gfs/config.base                   |   5 +
 parm/config/gfs/config.fetch                  |  19 ++++
 parm/config/gfs/config.resources              |   4 +-
 parm/config/gfs/config.stage_ic               |   7 +-
 parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2 |  16 +++
 .../fetch/gfs_S2SW_cold_forecast-only.yaml.j2 |  37 ++++++
 scripts/exglobal_fetch.py                     |  39 +++++++
 ush/python/pygfs/__init__.py                  |   1 +
 ush/python/pygfs/task/fetch.py                | 105 ++++++++++++++++++
 workflow/applications/applications.py         |   3 +
 workflow/applications/gfs_forecast_only.py    |  17 ++-
 workflow/hosts/azurepw.yaml                   |   3 +-
 workflow/hosts/s4.yaml                        |   1 +
 workflow/rocoto/gfs_tasks.py                  |  32 +++++-
 workflow/rocoto/tasks.py                      |   4 +-
 20 files changed, 329 insertions(+), 11 deletions(-)
 create mode 100755 jobs/JGLOBAL_FETCH
 create mode 100755 jobs/rocoto/fetch.sh
 create mode 120000 parm/config/gefs/config.fetch
 create mode 100644 parm/config/gfs/config.fetch
 create mode 100755 parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2
 create mode 100755 parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2
 create mode 100755 scripts/exglobal_fetch.py
 create mode 100755 ush/python/pygfs/task/fetch.py

diff --git a/ci/cases/yamls/gfs_defaults_ci.yaml b/ci/cases/yamls/gfs_defaults_ci.yaml
index 91377826d7..9fcfa7d604 100644
--- a/ci/cases/yamls/gfs_defaults_ci.yaml
+++ b/ci/cases/yamls/gfs_defaults_ci.yaml
@@ -2,5 +2,7 @@ defaults:
   !INC {{ HOMEgfs }}/parm/config/gfs/yaml/defaults.yaml
 base:
   ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }}
-  DO_TEST_MODE: "NO"
+  DO_TEST_MODE: "YES"
+  FETCHDIR: "/NCEPDEV/emc-global/1year/David.Grumm/test_data"
   DO_METP: "NO"
+
diff --git a/jobs/JGLOBAL_FETCH b/jobs/JGLOBAL_FETCH
new file mode 100755
index 0000000000..e10049169a
--- /dev/null
+++ b/jobs/JGLOBAL_FETCH
@@ -0,0 +1,23 @@
+#! /usr/bin/env bash
+
+source "${HOMEgfs}/ush/preamble.sh"
+source "${HOMEgfs}/ush/jjob_header.sh" -e "fetch" -c "base fetch"
+
+# Execute fetching
+"${SCRgfs}/exglobal_fetch.py"
+err=$?
+
+###############################################################
+# Check for errors and exit if any of the above failed
+if [[ "${err}" -ne 0 ]]; then
+    echo "FATAL ERROR: Unable to fetch ICs to ${ROTDIR}; ABORT!"
+    exit "${err}"
+fi
+
+##########################################
+# Remove the Temporary working directory
+##########################################
+cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. ABORT!"; exit 1)
+[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}"
+
+exit 0
diff --git a/jobs/rocoto/fetch.sh b/jobs/rocoto/fetch.sh
new file mode 100755
index 0000000000..ee34f6bd92
--- /dev/null
+++ b/jobs/rocoto/fetch.sh
@@ -0,0 +1,18 @@
+#! /usr/bin/env bash
+
+source "${HOMEgfs}/ush/preamble.sh"
+
+# Source FV3GFS workflow modules
+. "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
+status=$?
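+# Abort immediately if the workflow modules failed to load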
+[[ "${status}" -ne 0 ]] && exit "${status}"
+
+export job="fetch"
+export jobid="${job}.$$"
+
+# Execute the JJOB
+"${HOMEgfs}/jobs/JGLOBAL_FETCH"
+status=$?
+
+
+exit "${status}"
diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base
index fc641fef6b..400be7eb17 100644
--- a/parm/config/gefs/config.base
+++ b/parm/config/gefs/config.base
@@ -103,6 +103,7 @@ export ROTDIR="@COMROOT@/${PSLOT}"
 
 export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
 export ATARDIR="@ATARDIR@"
+export FETCHDIR="@FETCHDIR@" # HPSS or local directory where IC tarball(s) can be found.
 
 # Commonly defined parameters in JJOBS
 export envir=${envir:-"prod"}
diff --git a/parm/config/gefs/config.fetch b/parm/config/gefs/config.fetch
new file mode 120000
index 0000000000..21e571a2fd
--- /dev/null
+++ b/parm/config/gefs/config.fetch
@@ -0,0 +1 @@
+../gfs/config.fetch
\ No newline at end of file
diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base
index fbbe5c782f..96954b4acb 100644
--- a/parm/config/gfs/config.base
+++ b/parm/config/gfs/config.base
@@ -132,6 +132,7 @@ if [[ "${PDY}${cyc}" -ge "2019092100" && "${PDY}${cyc}" -le "2019110700" ]]; the
 fi
 export ARCDIR="${NOSCRUB}/archive/${PSLOT}"
 export ATARDIR="@ATARDIR@"
+export FETCHDIR="@FETCHDIR@"
 
 # Commonly defined parameters in JJOBS
 export envir=${envir:-"prod"}
@@ -474,6 +475,10 @@ export DO_VRFY_OCEANDA="@DO_VRFY_OCEANDA@" # Run SOCA Ocean and Seaice DA verif
 export FHMAX_FITS=132
 [[ "${FHMAX_FITS}" -gt "${FHMAX_GFS}" ]] && export FHMAX_FITS=${FHMAX_GFS}
 
+# User may choose to reset these at experiment setup time
+export DO_FETCH_HPSS="NO"   # Copy from HPSS (on HPSS-accessible machines) onto COM
+export DO_FETCH_LOCAL="NO"  # Copy from local disk onto COM
+
 # Archiving options
 export HPSSARCH="@HPSSARCH@"        # save data to HPSS archive
 export LOCALARCH="@LOCALARCH@"        # save data to local archive
diff --git a/parm/config/gfs/config.fetch b/parm/config/gfs/config.fetch
new file mode 100644
index 0000000000..86ab5e3e2f
--- /dev/null
+++ b/parm/config/gfs/config.fetch
@@ -0,0 +1,19 @@
+#! /usr/bin/env bash
+
+########## config.fetch ##########
+
+echo "BEGIN: config.fetch"
+
+# Get task specific resources
+source "${EXPDIR}/config.resources" fetch
+
+# Determine start type
+if [[ "${EXP_WARM_START}" == ".false." ]]; then
+    ic_type="cold"
+else
+    ic_type="warm"
+fi
+
+export FETCH_YAML_TMPL="${PARMgfs}/fetch/${NET}_${APP}_${ic_type}_${MODE}.yaml.j2"
+
+echo "END: config.fetch"
diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources
index ec09b6d455..06acc4e36e 100644
--- a/parm/config/gfs/config.resources
+++ b/parm/config/gfs/config.resources
@@ -11,7 +11,7 @@ if (( $# != 1 )); then
 
     echo "Must specify an input task argument to set resource variables!"
     echo "argument can be any one of the following:"
-    echo "stage_ic aerosol_init"
+    echo "stage_ic aerosol_init fetch"
     echo "prep prepatmiodaobs"
     echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal"
     echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal"
@@ -1059,7 +1059,7 @@ case ${step} in
     export is_exclusive=True
     ;;
 
-  "arch" | "earc" | "getic")
+  "arch" | "earc" | "getic" | "fetch")
     walltime="06:00:00"
     ntasks=1
     tasks_per_node=1
diff --git a/parm/config/gfs/config.stage_ic b/parm/config/gfs/config.stage_ic
index bb12cd3ba1..3fb0663cb7 100644
--- a/parm/config/gfs/config.stage_ic
+++ b/parm/config/gfs/config.stage_ic
@@ -7,7 +7,12 @@ echo "BEGIN: config.stage_ic"
 # Get task specific resources
 source "${EXPDIR}/config.resources" stage_ic
 
-export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
+if [[ "${DO_FETCH_HPSS^^}" =~ "Y" || "${DO_FETCH_LOCAL^^}" =~ "Y" ]]; then
+    export ICSDIR="${DATAROOT}"  # fetch untars data into DATAROOT
+else
+    export ICSDIR="@ICSDIR@" # User provided ICSDIR; blank if not provided
+fi
+
 export BASE_IC="@BASE_IC@"  # Platform home for staged ICs
 
 export STAGE_IC_YAML_TMPL="${PARMgfs}/stage/master_gfs.yaml.j2"
diff --git a/parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2 b/parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2
new file mode 100755
index 0000000000..927527c760
--- /dev/null
+++ b/parm/fetch/gfs_ATM_cold_forecast-only.yaml.j2
@@ -0,0 +1,16 @@
+{% set cycle_YMDH = current_cycle | to_YMDH %}
+{% set cycle_YMD = current_cycle | to_YMD %}
+{% set cycle_HH = current_cycle | strftime("%H") %}
+{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
+target:
+  tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/atm_cold.tar"
+  on_hpss: True
+  contents:
+    # ATM
+    - {{atm_dir}}/gfs_ctrl.nc
+    {% for ftype in ["gfs_data", "sfc_data"] %}
+    {% for ntile in range(1, ntiles + 1) %}
+    - {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
+    {% endfor %} # ntile
+    {% endfor %} # ftype
+  destination: "{{ DATAROOT }}"
diff --git a/parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2 b/parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2
new file mode 100755
index 0000000000..2588b85117
--- /dev/null
+++ b/parm/fetch/gfs_S2SW_cold_forecast-only.yaml.j2
@@ -0,0 +1,37 @@
+{% set cycle_YMDH = current_cycle | to_YMDH %}
+{% set cycle_YMD = current_cycle | to_YMD %}
+{% set cycle_HH = current_cycle | strftime("%H") %}
+{% set prev_cycle_YMD = previous_cycle | to_YMD %}
+{% set prev_cycle_HH = previous_cycle | strftime("%H") %}
+# For cold starts, the ATM component is in the current cycle RUN.YYYYMMDD/HH
+# For ocean/ice, some files are in the current cycle, some in the previous
+# For waves, all files are in the previous cycle
+# Previous cycles are always gdas (gdas.YYYYMMDD/HH)
+{% set atm_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/atmos/input" %}
+{% set ocean_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ocean/restart" %}
+{% set ice_dir = RUN + "." ~ cycle_YMD ~ "/" ~ cycle_HH ~ "/model/ice/restart" %}
+{% set prev_ocean_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ocean/restart" %}
+{% set prev_ice_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/ice/restart" %}
+{% set prev_wave_dir = "gdas." ~ prev_cycle_YMD ~ "/" ~ prev_cycle_HH ~ "/model/wave/restart" %}
+{% set restart_prefix = cycle_YMD ~ "." ~ cycle_HH ~ "0000" %}
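+{# Illustration (hypothetical cycle): 2021-03-23 12Z yields restart_prefix "20210323.120000" #}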
+target:
+  tarball : "{{ FETCHDIR }}/{{ cycle_YMDH }}/s2sw_cold.tar"
+  on_hpss: True
+  contents:
+    # ATM
+    - {{atm_dir}}/gfs_ctrl.nc
+    {% for ftype in ["gfs_data", "sfc_data"] %}
+    {% for ntile in range(1, ntiles + 1) %}
+    - {{atm_dir}}/{{ ftype }}.tile{{ ntile }}.nc
+    {% endfor %} # ntile
+    {% endfor %} # ftype
+    # Ocean
+    - {{ocean_dir}}/{{restart_prefix}}.MOM.res.nc
+    - {{prev_ocean_dir}}/{{restart_prefix}}.MOM.res.nc
+    # Ice
+    - {{ice_dir}}/{{restart_prefix}}.cice_model.res.nc
+    - {{prev_ice_dir}}/{{restart_prefix}}.cice_model.res.nc
+    # Wave
+    - {{prev_wave_dir}}/{{restart_prefix}}.restart.ww3
+    - {{prev_wave_dir}}/{{restart_prefix}}.restart.{{waveGRD}}
+  destination: "{{ DATAROOT }}"
diff --git a/scripts/exglobal_fetch.py b/scripts/exglobal_fetch.py
new file mode 100755
index 0000000000..d9efe24d7f
--- /dev/null
+++ b/scripts/exglobal_fetch.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+
+import os
+
+from pygfs.task.fetch import Fetch
+from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit
+
+# initialize root logger
+logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)
+
+
+@logit(logger)
+def main():
+
+    config = cast_strdict_as_dtypedict(os.environ)
+
+    # Instantiate the Fetch object
+    fetch = Fetch(config)
+
+    # Pull out all the configuration keys needed to run the fetch step
+    keys = ['current_cycle', 'previous_cycle', 'RUN', 'PDY', 'PARMgfs', 'PSLOT', 'ROTDIR',
+            'FETCH_YAML_TMPL', 'FETCHDIR', 'ntiles', 'DATAROOT', 'waveGRD']
+
+    fetch_dict = AttrDict()
+    for key in keys:
+        fetch_dict[key] = fetch.task_config.get(key)
+        if fetch_dict[key] is None:
+            print(f"Warning: key ({key}) not found in task_config!")
+
+    # Determine which archives to retrieve from HPSS
+    # Read the input YAML file to get the list of tarballs on tape
+    fetchdir_set = fetch.configure(fetch_dict)
+
+    # Pull the data from tape or locally and store at the specified destination
+    fetch.execute_pull_data(fetchdir_set)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/ush/python/pygfs/__init__.py b/ush/python/pygfs/__init__.py
index 6b7b7eb4c9..caa3929f3e 100644
--- a/ush/python/pygfs/__init__.py
+++ b/ush/python/pygfs/__init__.py
@@ -14,6 +14,7 @@ from .task.oceanice_products import OceanIceProducts
 from .task.gfs_forecast import GFSForecast
 from .utils import marine_da_utils
+from .task.fetch import Fetch
 
 __docformat__ = "restructuredtext"
 __version__ = "0.1.0"
diff --git a/ush/python/pygfs/task/fetch.py b/ush/python/pygfs/task/fetch.py
new file mode 100755
index 0000000000..19db8746c1
--- /dev/null
+++ b/ush/python/pygfs/task/fetch.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python3
+
+import os
+from logging import getLogger
+from typing import Any, Dict
+
+from wxflow import (Hsi, Task, htar,
+                    logit, parse_j2yaml, chdir)
+# import tarfile
+
+
+logger = getLogger(__name__.split('.')[-1])
+
+
+class Fetch(Task):
+    """Task to pull ROTDIR data from HPSS (or locally)
+    """
+
+    @logit(logger, name="Fetch")
+    def __init__(self, config: Dict[str, Any]) -> None:
+        """Constructor for the Fetch task
+        The constructor is responsible for collecting necessary yamls based on
+        the runtime options and RUN.
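+        For example, config.fetch composes the template name from NET, APP, the
+        start type, and MODE, e.g. gfs_S2SW_cold_forecast-only.yaml.j2.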
+
+        Parameters
+        ----------
+        config : Dict[str, Any]
+            Incoming configuration for the task from the environment
+
+        Returns
+        -------
+        None
+        """
+        super().__init__(config)
+
+    @logit(logger)
+    def configure(self, fetch_dict: Dict[str, Any]):
+        """Determine which tarballs will need to be extracted
+
+        Parameters
+        ----------
+        fetch_dict : Dict[str, Any]
+            Task specific keys, e.g. COM directories, etc
+
+        Returns
+        -------
+        parsed_fetch: Dict[str, Any]
+            Dictionary derived from the yaml file with necessary HPSS info.
+        """
+        self.hsi = Hsi()
+
+        fetch_yaml = fetch_dict.FETCH_YAML_TMPL
+        fetch_parm = os.path.join(fetch_dict.PARMgfs, "fetch")
+
+        parsed_fetch = parse_j2yaml(os.path.join(fetch_parm, fetch_yaml),
+                                    fetch_dict)
+        return parsed_fetch
+
+    @logit(logger)
+    def execute_pull_data(self, fetchdir_set: Dict[str, Any]) -> None:
+        """Pull data from HPSS based on a yaml dictionary and store at the
+        specified destination.
+
+        Parameters
+        ----------
+        fetchdir_set: Dict[str, Any],
+            Dict defining set of tarballs to pull and where to put them.
+
+        Returns
+        -------
+        None
+        """
+
+        f_names = fetchdir_set.target.contents
+        if len(f_names) <= 0:  # Abort if no files
+            raise FileNotFoundError("FATAL ERROR: The tarball has no files")
+
+        on_hpss = fetchdir_set.target.on_hpss
+        dest = fetchdir_set.target.destination
+        tarball = fetchdir_set.target.tarball
+
+        # Select the action based on whether on_hpss is True, then pull the
+        # data from tape or a local archive and place it where it needs to go
+        # DG - these need testing
+        with chdir(dest):
+            logger.info(f"Changed working directory to {dest}")
+            if on_hpss is True:  # htar all files in fnames
+                htar_obj = htar.Htar()
+                htar_obj.xvf(tarball, f_names)
+            else:  # tar all files in fnames
+                raise NotImplementedError("The fetch job does not yet support pulling from local archives")
+
+#                with tarfile.open(dest, "w") as tar:
+#                    for filename in f_names:
+#                        tar.add(filename)
+
+        # Verify all data files were extracted
+        missing_files = []
+        for f in f_names:
+            if not os.path.exists(f):
+                missing_files.append(f)
+        if len(missing_files) > 0:
+            message = "Failed to extract all required files.  Missing files:\n"
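+            # Report every missing file, not just the first, to simplify debugging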
+            for f in missing_files:
+                message += f"{f}\n"
+
+            raise FileNotFoundError(message)
diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py
index 22e299df20..88be488b47 100644
--- a/workflow/applications/applications.py
+++ b/workflow/applications/applications.py
@@ -102,6 +102,9 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]:
             run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False)
             run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None)
 
+            run_options[run]['do_fetch_hpss'] = run_base.get('DO_FETCH_HPSS', False)
+            run_options[run]['do_fetch_local'] = run_base.get('DO_FETCH_LOCAL', False)
+
             if not AppConfig.is_monotonic(run_options[run]['fcst_segments']):
                 raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')
 
diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py
index de1c8cef27..7409d4adec 100644
--- a/workflow/applications/gfs_forecast_only.py
+++ b/workflow/applications/gfs_forecast_only.py
@@ -28,8 +28,12 @@ def _get_app_configs(self, run):
         Returns the config_files that are involved in the forecast-only app
         """
 
+        configs = []
         options = self.run_options[run]
-        configs = ['stage_ic', 'fcst', 'arch', 'cleanup']
+        if options['do_fetch_hpss'] or options['do_fetch_local']:
+            configs += ['fetch']
+
+        configs += ['stage_ic', 'fcst', 'arch', 'cleanup']
 
         if options['do_atm']:
 
@@ -98,15 +102,22 @@ def get_task_names(self):
         This is the place where that order is set.
         """
 
-        tasks = ['stage_ic']
         options = self.run_options[self.run]
 
+        tasks = []
+
+        if options['do_fetch_hpss'] or options['do_fetch_local']:
+            tasks += ['fetch']
+
+        tasks += ['stage_ic']
+
         if options['do_aero_fcst'] and not options['exp_warm_start']:
             tasks += ['aerosol_init']
 
         if options['do_wave']:
             tasks += ['waveinit']
-            # tasks += ['waveprep'] # TODO - verify if waveprep is executed in forecast-only mode when APP=ATMW|S2SW
+            # tasks += ['waveprep'] # TODO - verify if waveprep is executed in ...
+            # ... forecast-only mode when APP=ATMW|S2SW
 
         tasks += ['fcst']
 
diff --git a/workflow/hosts/azurepw.yaml b/workflow/hosts/azurepw.yaml
index d7c064dc60..abab09b414 100644
--- a/workflow/hosts/azurepw.yaml
+++ b/workflow/hosts/azurepw.yaml
@@ -26,5 +26,6 @@ MAKE_NSSTBUFR: 'NO'
 MAKE_ACFTBUFR: 'NO'
 DO_TRACKER: 'NO'
 DO_GENESIS: 'NO'
-DO_METP: 'NO'
+DO_METP: 'NO'
 SUPPORTED_RESOLUTIONS: ['C48', 'C96', 'C384', 'C768'] # TODO: Test and support all cubed-sphere resolutions.
+
diff --git a/workflow/hosts/s4.yaml b/workflow/hosts/s4.yaml
index 2e77c112b1..bd7f7d0b5f 100644
--- a/workflow/hosts/s4.yaml
+++ b/workflow/hosts/s4.yaml
@@ -26,3 +26,4 @@ MAKE_NSSTBUFR: 'YES'
 MAKE_ACFTBUFR: 'YES'
 SUPPORTED_RESOLUTIONS: ['C384', 'C192', 'C96', 'C48']
 AERO_INPUTS_DIR: /data/prod/glopara/gocart_emissions
+
diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py
index bacd4ebf95..2b2bd30b81 100644
--- a/workflow/rocoto/gfs_tasks.py
+++ b/workflow/rocoto/gfs_tasks.py
@@ -16,8 +16,37 @@ def _is_this_a_gdas_task(run, task_name):
             raise TypeError(f'{task_name} must be part of the "enkfgdas" cycle and not {run}')
 
     # Specific Tasks begin here
+    def fetch(self):
+
+        cycledef = 'gdas_half' if self.run in ['gdas', 'enkfgdas'] else self.run
+
+        resources = self.get_resource('fetch')
+        task_name = f'{self.run}_fetch'
+        task_dict = {'task_name': task_name,
+                     'resources': resources,
+                     'envars': self.envars,
+                     'cycledef': cycledef,
+                     'command': f'{self.HOMEgfs}/jobs/rocoto/fetch.sh',
+                     'job_name': f'{self.pslot}_{task_name}_@H',
+                     'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
+                     'maxtries': '&MAXTRIES;'
+                     }
+
+        task = rocoto.create_task(task_dict)
+
+        return task
+
     def stage_ic(self):
 
+        dependencies = None
+        if self.options['do_fetch_hpss'] or self.options['do_fetch_local']:
+            deps = []
+            dep_dict = {
+                'type': 'task', 'name': f'{self.run}_fetch',
+            }
+            deps.append(rocoto.add_dependency(dep_dict))
+            dependencies = rocoto.create_dependency(dep=deps)
+
         cycledef = 'gdas_half' if self.run in ['gdas', 'enkfgdas'] else self.run
 
         resources = self.get_resource('stage_ic')
@@ -29,7 +58,8 @@ def stage_ic(self):
                      'command': f'{self.HOMEgfs}/jobs/rocoto/stage_ic.sh',
                      'job_name': f'{self.pslot}_{task_name}_@H',
                      'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
-                     'maxtries': '&MAXTRIES;'
+                     'maxtries': '&MAXTRIES;',
+                     'dependency': dependencies
                      }
 
         task = rocoto.create_task(task_dict)
diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py
index 3c215414b5..c491f26800 100644
--- a/workflow/rocoto/tasks.py
+++ b/workflow/rocoto/tasks.py
@@ -11,8 +11,8 @@
 
 class Tasks:
-    SERVICE_TASKS = ['arch', 'earc', 'stage_ic', 'cleanup']
-    VALID_TASKS = ['aerosol_init', 'stage_ic',
+    SERVICE_TASKS = ['arch', 'earc', 'stage_ic', 'fetch', 'cleanup']
+    VALID_TASKS = ['aerosol_init', 'stage_ic', 'fetch',
                    'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup",
                    'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal',
                    'prepoceanobs',