diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b0b51922c5..81e387d361 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -76,7 +76,8 @@ scripts/exgdas_atmos_gempak_gif_ncdc.sh @GwenChen-NOAA scripts/exgdas_atmos_nawips.sh @GwenChen-NOAA scripts/exgdas_atmos_verfozn.sh @EdwardSafford-NOAA scripts/exgdas_atmos_verfrad.sh @EdwardSafford-NOAA -scripts/exgdas_enkf_earc.py @DavidHuber-NOAA +scripts/exgdas_enkf_earc_vrfy.py @DavidHuber-NOAA +scripts/exgdas_enkf_earc_tars.py @DavidHuber-NOAA scripts/exgdas_enkf_ecen.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA scripts/exgdas_enkf_post.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA scripts/exgdas_enkf_select_obs.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA diff --git a/docs/source/jobs.rst b/docs/source/jobs.rst index 2cdecb01de..c5f45b9659 100644 --- a/docs/source/jobs.rst +++ b/docs/source/jobs.rst @@ -18,7 +18,7 @@ An experimental run is different from operations in the following ways: * Addition steps in experimental mode: - - archive (arch) + - archive (arch_vrfy & arch_tars) - cleanup (cleanup) @@ -39,7 +39,9 @@ Jobs in the GFS Configuration | analdiag | Creates netCDF diagnostic files containing observation values, innovation (O-F), error, quality control, as well as | | | other analysis-related quantities (cnvstat, radstat, ozstat files). | +-------------------+-----------------------------------------------------------------------------------------------------------------------+ -| arch | Archives select files from the deterministic model and cleans up older data. | +|arch_tars | Optional archive job that backs up the COM data structure. | ++-------------------+-----------------------------------------------------------------------------------------------------------------------+ +|arch_vrfy | Archives select files from the deterministic model and cleans up older data. 
| +-------------------+-----------------------------------------------------------------------------------------------------------------------+ | earcN/eamn | Archival script for EnKF: 1) Write select EnKF output to HPSS; 2) Copy select files to online archive; 3) Clean up | | | EnKF temporary run directories; 4) Remove "old" EnKF files from rotating directory. | diff --git a/jobs/JGDAS_ENKF_ARCHIVE b/jobs/JGDAS_ENKF_ARCHIVE_TARS similarity index 91% rename from jobs/JGDAS_ENKF_ARCHIVE rename to jobs/JGDAS_ENKF_ARCHIVE_TARS index 021c454afc..04fe8e3141 100755 --- a/jobs/JGDAS_ENKF_ARCHIVE +++ b/jobs/JGDAS_ENKF_ARCHIVE_TARS @@ -1,7 +1,7 @@ #! /usr/bin/env bash source "${HOMEgfs}/ush/preamble.sh" -source "${HOMEgfs}/ush/jjob_header.sh" -e "earc" -c "base earc" +source "${HOMEgfs}/ush/jjob_header.sh" -e "earc_tars" -c "base earc_tars" ############################################## @@ -17,7 +17,7 @@ MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ # Run archive script ############################################################### -"${SCRgfs}/exgdas_enkf_earc.py" +"${SCRgfs}/exgdas_enkf_earc_tars.py" status=$? [[ ${status} -ne 0 ]] && exit "${status}" diff --git a/jobs/JGDAS_ENKF_ARCHIVE_VRFY b/jobs/JGDAS_ENKF_ARCHIVE_VRFY new file mode 100755 index 0000000000..0094401987 --- /dev/null +++ b/jobs/JGDAS_ENKF_ARCHIVE_VRFY @@ -0,0 +1,43 @@ +#! 
/usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +source "${HOMEgfs}/ush/jjob_header.sh" -e "earc_vrfy" -c "base earc_vrfy" + + +############################################## +# Set variables used in the script +############################################## +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COM_TOP +MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_ATMOS_ANALYSIS_ENSSTAT:COM_ATMOS_ANALYSIS_TMPL \ + COMIN_ATMOS_HISTORY_ENSSTAT:COM_ATMOS_HISTORY_TMPL \ + COMIN_SNOW_ANALYSIS_ENSSTAT:COM_SNOW_ANALYSIS_TMPL + +############################################################### +# Run archive script +############################################################### + +"${SCRgfs}/exgdas_enkf_earc_vrfy.py" +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################################### + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +########################################## +# Remove the Temporary working directory +########################################## +cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. ABORT!"; exit 1) +[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}" + +exit 0 diff --git a/jobs/JGLOBAL_ARCHIVE b/jobs/JGLOBAL_ARCHIVE_TARS similarity index 51% rename from jobs/JGLOBAL_ARCHIVE rename to jobs/JGLOBAL_ARCHIVE_TARS index f62386cdd9..e89370bd37 100755 --- a/jobs/JGLOBAL_ARCHIVE +++ b/jobs/JGLOBAL_ARCHIVE_TARS @@ -1,48 +1,49 @@ #! 
/usr/bin/env bash source "${HOMEgfs}/ush/preamble.sh" -source "${HOMEgfs}/ush/jjob_header.sh" -e "arch" -c "base arch wave" -source "${USHgfs}/wave_domain_grid.sh" +source "${HOMEgfs}/ush/jjob_header.sh" -e "arch_tars" -c "base arch_tars wave" +source "${USHgfs}/wave_domain_grid.sh" + ############################################## # Set variables used in the script ############################################## YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ - COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ - COMIN_ATMOS_BUFR:COM_ATMOS_BUFR_TMPL \ - COMIN_ATMOS_GEMPAK:COM_ATMOS_GEMPAK_TMPL \ - COMIN_ATMOS_GENESIS:COM_ATMOS_GENESIS_TMPL \ - COMIN_ATMOS_HISTORY:COM_ATMOS_HISTORY_TMPL \ - COMIN_ATMOS_INPUT:COM_ATMOS_INPUT_TMPL \ - COMIN_ATMOS_MASTER:COM_ATMOS_MASTER_TMPL \ - COMIN_ATMOS_RESTART:COM_ATMOS_RESTART_TMPL \ - COMIN_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL \ - COMIN_ATMOS_WMO:COM_ATMOS_WMO_TMPL \ - COMIN_CHEM_HISTORY:COM_CHEM_HISTORY_TMPL \ - COMIN_CHEM_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ - COMIN_MED_RESTART:COM_MED_RESTART_TMPL \ - COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ - COMIN_ICE_HISTORY:COM_ICE_HISTORY_TMPL \ - COMIN_ICE_INPUT:COM_ICE_INPUT_TMPL \ - COMIN_ICE_RESTART:COM_ICE_RESTART_TMPL \ - COMIN_ICE_GRIB:COM_ICE_GRIB_TMPL \ - COMIN_OBS:COM_OBS_TMPL \ - COMIN_TOP:COM_TOP_TMPL \ - COMIN_OCEAN_HISTORY:COM_OCEAN_HISTORY_TMPL \ - COMIN_OCEAN_RESTART:COM_OCEAN_RESTART_TMPL \ - COMIN_OCEAN_GRIB:COM_OCEAN_GRIB_TMPL \ - COMIN_OCEAN_NETCDF:COM_OCEAN_NETCDF_TMPL \ - COMIN_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL \ - COMIN_OCEAN_BMATRIX:COM_OCEAN_BMATRIX_TMPL \ - COMIN_ICE_BMATRIX:COM_ICE_BMATRIX_TMPL \ - COMIN_WAVE_GRID:COM_WAVE_GRID_TMPL \ - COMIN_WAVE_HISTORY:COM_WAVE_HISTORY_TMPL \ - COMIN_WAVE_STATION:COM_WAVE_STATION_TMPL \ - COMIN_WAVE_RESTART:COM_WAVE_RESTART_TMPL \ - COMIN_ATMOS_OZNMON:COM_ATMOS_OZNMON_TMPL \ - COMIN_ATMOS_RADMON:COM_ATMOS_RADMON_TMPL \ - COMIN_ATMOS_MINMON:COM_ATMOS_MINMON_TMPL \ - COMIN_CONF:COM_CONF_TMPL \ - 
COMOUT_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL + COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ + COMIN_ATMOS_BUFR:COM_ATMOS_BUFR_TMPL \ + COMIN_ATMOS_GEMPAK:COM_ATMOS_GEMPAK_TMPL \ + COMIN_ATMOS_GENESIS:COM_ATMOS_GENESIS_TMPL \ + COMIN_ATMOS_HISTORY:COM_ATMOS_HISTORY_TMPL \ + COMIN_ATMOS_INPUT:COM_ATMOS_INPUT_TMPL \ + COMIN_ATMOS_MASTER:COM_ATMOS_MASTER_TMPL \ + COMIN_ATMOS_RESTART:COM_ATMOS_RESTART_TMPL \ + COMIN_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL \ + COMIN_ATMOS_WMO:COM_ATMOS_WMO_TMPL \ + COMIN_CHEM_HISTORY:COM_CHEM_HISTORY_TMPL \ + COMIN_CHEM_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ + COMIN_MED_RESTART:COM_MED_RESTART_TMPL \ + COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ + COMIN_ICE_HISTORY:COM_ICE_HISTORY_TMPL \ + COMIN_ICE_INPUT:COM_ICE_INPUT_TMPL \ + COMIN_ICE_RESTART:COM_ICE_RESTART_TMPL \ + COMIN_ICE_GRIB:COM_ICE_GRIB_TMPL \ + COMIN_OBS:COM_OBS_TMPL \ + COMIN_TOP:COM_TOP_TMPL \ + COMIN_OCEAN_HISTORY:COM_OCEAN_HISTORY_TMPL \ + COMIN_OCEAN_RESTART:COM_OCEAN_RESTART_TMPL \ + COMIN_OCEAN_GRIB:COM_OCEAN_GRIB_TMPL \ + COMIN_OCEAN_NETCDF:COM_OCEAN_NETCDF_TMPL \ + COMIN_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL \ + COMIN_OCEAN_BMATRIX:COM_OCEAN_BMATRIX_TMPL \ + COMIN_ICE_BMATRIX:COM_ICE_BMATRIX_TMPL \ + COMIN_WAVE_GRID:COM_WAVE_GRID_TMPL \ + COMIN_WAVE_HISTORY:COM_WAVE_HISTORY_TMPL \ + COMIN_WAVE_STATION:COM_WAVE_STATION_TMPL \ + COMIN_WAVE_RESTART:COM_WAVE_RESTART_TMPL \ + COMIN_ATMOS_OZNMON:COM_ATMOS_OZNMON_TMPL \ + COMIN_ATMOS_RADMON:COM_ATMOS_RADMON_TMPL \ + COMIN_ATMOS_MINMON:COM_ATMOS_MINMON_TMPL \ + COMIN_CONF:COM_CONF_TMPL \ + COMOUT_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL for grid in "0p25" "0p50" "1p00"; do YMD=${PDY} HH=${cyc} GRID=${grid} declare_from_tmpl -rx \ @@ -70,7 +71,7 @@ fi # Run archive script ############################################################### -${GLOBALARCHIVESH:-${SCRgfs}/exglobal_archive.py} +${GLOBALARCHIVESH:-${SCRgfs}/exglobal_archive_tars.py} status=$? 
[[ ${status} -ne 0 ]] && exit "${status}" diff --git a/jobs/JGLOBAL_ARCHIVE_VRFY b/jobs/JGLOBAL_ARCHIVE_VRFY new file mode 100755 index 0000000000..01dd876ee1 --- /dev/null +++ b/jobs/JGLOBAL_ARCHIVE_VRFY @@ -0,0 +1,50 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +source "${HOMEgfs}/ush/jjob_header.sh" -e "arch_vrfy" -c "base arch_vrfy wave" + + +############################################## +# Set variables used in the script +############################################## +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ + COMIN_ATMOS_GENESIS:COM_ATMOS_GENESIS_TMPL \ + COMIN_ATMOS_HISTORY:COM_ATMOS_HISTORY_TMPL \ + COMIN_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL \ + COMIN_CHEM_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ + COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ + COMIN_OBS:COM_OBS_TMPL \ + COMOUT_ATMOS_TRACK:COM_ATMOS_TRACK_TMPL + +for grid in "0p25" "0p50" "1p00"; do + YMD=${PDY} HH=${cyc} GRID=${grid} declare_from_tmpl -rx \ + "COMIN_ATMOS_GRIB_${grid}:COM_ATMOS_GRIB_GRID_TMPL" +done + +############################################################### +# Run archive script +############################################################### + +${GLOBALARCHIVESH:-${SCRgfs}/exglobal_archive_vrfy.py} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +########################################## +# Remove the Temporary working directory +########################################## +cd "${DATAROOT}" || (echo "${DATAROOT} does not exist. 
ABORT!"; exit 1) +[[ ${KEEPDATA} = "NO" ]] && rm -rf "${DATA}" + +exit 0 diff --git a/jobs/rocoto/arch.sh b/jobs/rocoto/arch_tars.sh similarity index 90% rename from jobs/rocoto/arch.sh rename to jobs/rocoto/arch_tars.sh index 083e319bf5..d9c58a9b38 100755 --- a/jobs/rocoto/arch.sh +++ b/jobs/rocoto/arch_tars.sh @@ -13,12 +13,12 @@ status=$? PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python" export PYTHONPATH -export job="arch" +export job="arch_tars" export jobid="${job}.$$" ############################################################### # Execute the JJOB -"${HOMEgfs}"/jobs/JGLOBAL_ARCHIVE +"${HOMEgfs}"/jobs/JGLOBAL_ARCHIVE_TARS status=$? exit "${status}" diff --git a/jobs/rocoto/arch_vrfy.sh b/jobs/rocoto/arch_vrfy.sh new file mode 100755 index 0000000000..fee66c01b6 --- /dev/null +++ b/jobs/rocoto/arch_vrfy.sh @@ -0,0 +1,24 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source FV3GFS workflow modules +. "${HOMEgfs}"/ush/load_fv3gfs_modules.sh +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################################### +# setup python path for workflow utilities and tasks +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python" +export PYTHONPATH + +export job="arch_vrfy" +export jobid="${job}.$$" + +############################################################### +# Execute the JJOB +"${HOMEgfs}"/jobs/JGLOBAL_ARCHIVE_VRFY +status=$? + +exit "${status}" diff --git a/jobs/rocoto/earc.sh b/jobs/rocoto/earc_tars.sh similarity index 90% rename from jobs/rocoto/earc.sh rename to jobs/rocoto/earc_tars.sh index 4a9263b509..a796d90232 100755 --- a/jobs/rocoto/earc.sh +++ b/jobs/rocoto/earc_tars.sh @@ -13,12 +13,12 @@ status=$? 
PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python" export PYTHONPATH -export job="earc" +export job="earc_tars" export jobid="${job}.$$" ############################################################### # Execute the JJOB -"${HOMEgfs}/jobs/JGDAS_ENKF_ARCHIVE" +"${HOMEgfs}/jobs/JGDAS_ENKF_ARCHIVE_TARS" status=$? exit "${status}" diff --git a/jobs/rocoto/earc_vrfy.sh b/jobs/rocoto/earc_vrfy.sh new file mode 100755 index 0000000000..2b8c0db05b --- /dev/null +++ b/jobs/rocoto/earc_vrfy.sh @@ -0,0 +1,24 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source FV3GFS workflow modules +. "${HOMEgfs}/ush/load_fv3gfs_modules.sh" +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + +############################################################### +# setup python path for workflow utilities and tasks +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python" +export PYTHONPATH + +export job="earc_vrfy" +export jobid="${job}.$$" + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGDAS_ENKF_ARCHIVE_VRFY" +status=$? + +exit "${status}" diff --git a/parm/archive/gfsa.yaml.j2 b/parm/archive/gfsa.yaml.j2 index 4efe281120..a7cfaafcd5 100644 --- a/parm/archive/gfsa.yaml.j2 +++ b/parm/archive/gfsa.yaml.j2 @@ -6,7 +6,7 @@ gfsa: # Logs # TODO explicitly name all logs to include {% for log in glob("logs/" ~ cycle_YMDH ~ "/gfs*.log") %} - {% if not "gfs_arch.log" in log %} + {% if not "gfs_arch_tars.log" in log %} - "{{ log }}" {% endif %} {% endfor %} diff --git a/parm/config/gefs/config.arch b/parm/config/gefs/config.arch_tars similarity index 56% rename from parm/config/gefs/config.arch rename to parm/config/gefs/config.arch_tars index a23bcce6ae..7605cc04c1 100644 --- a/parm/config/gefs/config.arch +++ b/parm/config/gefs/config.arch_tars @@ -1,15 +1,15 @@ #! 
/usr/bin/env bash -########## config.arch ########## +########## config.arch_tars ########## # Archive specific -echo "BEGIN: config.arch" +echo "BEGIN: config.arch_tars" # Get task specific resources -. "${EXPDIR}/config.resources" arch +. "${EXPDIR}/config.resources" arch_tars export ARCH_GAUSSIAN="YES" export ARCH_GAUSSIAN_FHMAX=${FHMAX_GFS} export ARCH_GAUSSIAN_FHINC=${FHOUT_GFS} -echo "END: config.arch" +echo "END: config.arch_tars" diff --git a/parm/config/gfs/config.arch b/parm/config/gefs/config.arch_vrfy similarity index 56% rename from parm/config/gfs/config.arch rename to parm/config/gefs/config.arch_vrfy index a23bcce6ae..cb668a48e2 100644 --- a/parm/config/gfs/config.arch +++ b/parm/config/gefs/config.arch_vrfy @@ -1,15 +1,15 @@ #! /usr/bin/env bash -########## config.arch ########## +########## config.arch_vrfy ########## # Archive specific -echo "BEGIN: config.arch" +echo "BEGIN: config.arch_vrfy" # Get task specific resources -. "${EXPDIR}/config.resources" arch +. "${EXPDIR}/config.resources" arch_vrfy export ARCH_GAUSSIAN="YES" export ARCH_GAUSSIAN_FHMAX=${FHMAX_GFS} export ARCH_GAUSSIAN_FHINC=${FHOUT_GFS} -echo "END: config.arch" +echo "END: config.arch_vrfy" diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index 400be7eb17..e1fb6e587a 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -334,11 +334,16 @@ export DO_METP="NO" # Run METPLUS jobs - set METPLUS settings in config. export DO_FIT2OBS="NO" # Run fit to observations package # Archiving options +export VRFYARCH="@VRFYARCH@" # save verification data locally export HPSSARCH="@HPSSARCH@" # save data to HPSS archive export LOCALARCH="@LOCALARCH@" # save data to local archive if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "Both HPSS and local archiving selected. Please choose one or the other." 
exit 3 +elif [[ ${HPSSARCH} = "YES" ]] || [[ ${LOCALARCH} = "YES" ]]; then + export DO_ARCHTAR="YES" +else + export DO_ARCHTAR="NO" fi export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability @@ -348,8 +353,6 @@ export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR -export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh. - # Number of regional collectives to create soundings for export NUM_SND_COLLECTIVES=${NUM_SND_COLLECTIVES:-9} diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index bb33f3eb02..8270ea92b5 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -323,7 +323,7 @@ case ${step} in export is_exclusive=False ;; - "arch") + "arch_tars") export walltime="06:00:00" export ntasks=1 export tasks_per_node=1 @@ -331,6 +331,14 @@ case ${step} in export memory="4096M" ;; + "arch_vrfy") + export walltime="00:15:00" + export ntasks=1 + export tasks_per_node=1 + export threads_per_task=1 + export memory="4096M" + ;; + "cleanup") export walltime="00:30:00" export ntasks=1 diff --git a/parm/config/gefs/config.resources.AWSPW b/parm/config/gefs/config.resources.AWSPW index f91460b6aa..43cfcf56cc 100644 --- a/parm/config/gefs/config.resources.AWSPW +++ b/parm/config/gefs/config.resources.AWSPW @@ -18,7 +18,7 @@ case ${step} in max_tasks_per_node=48 ;; - "arch") + "arch_vrfy" | "arch_tars") export PARTITION_BATCH="process" max_tasks_per_node=24 ;; diff --git a/parm/config/gfs/config.arch_tars b/parm/config/gfs/config.arch_tars new file mode 100644 index 0000000000..f46ff45a3a --- /dev/null +++ 
b/parm/config/gfs/config.arch_tars @@ -0,0 +1,15 @@ +#! /usr/bin/env bash + +########## config.arch_tars ########## +# Archive specific + +echo "BEGIN: config.arch_tars" + +# Get task specific resources +. "${EXPDIR}/config.resources" "arch_tars" + +export ARCH_GAUSSIAN="YES" +export ARCH_GAUSSIAN_FHMAX=${FHMAX_GFS} +export ARCH_GAUSSIAN_FHINC=${FHOUT_GFS} + +echo "END: config.arch_tars" diff --git a/parm/config/gfs/config.arch_vrfy b/parm/config/gfs/config.arch_vrfy new file mode 100644 index 0000000000..6bcbdb57fc --- /dev/null +++ b/parm/config/gfs/config.arch_vrfy @@ -0,0 +1,15 @@ +#! /usr/bin/env bash + +########## config.arch_vrfy ########## +# Archive specific + +echo "BEGIN: config.arch_vrfy" + +# Get task specific resources +. "${EXPDIR}/config.resources" "arch_vrfy" + +export ARCH_GAUSSIAN="YES" +export ARCH_GAUSSIAN_FHMAX=${FHMAX_GFS} +export ARCH_GAUSSIAN_FHINC=${FHOUT_GFS} + +echo "END: config.arch_vrfy" diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 96954b4acb..e28e7ef226 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -82,7 +82,7 @@ export DO_MOS="NO" # GFS Model Output Statistics - Only su # NO for retrospective parallel; YES for real-time parallel # arch.sh uses REALTIME for MOS. Need to set REALTIME=YES -# if want MOS written to HPSS. Should update arch.sh to +# if want MOS written to HPSS. Should update arch_vrfy.sh and arch_tars to # use RUNMOS flag export REALTIME="YES" @@ -485,6 +485,10 @@ export LOCALARCH="@LOCALARCH@" # save data to local archive if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other." 
exit 4 +elif [[ ${HPSSARCH} = "YES" ]] || [[ ${LOCALARCH} = "YES" ]]; then + export DO_ARCHTAR="YES" +else + export DO_ARCHTAR="NO" fi export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability diff --git a/parm/config/gfs/config.earc b/parm/config/gfs/config.earc_tars similarity index 72% rename from parm/config/gfs/config.earc rename to parm/config/gfs/config.earc_tars index 00a2fa95bd..c87c5e7784 100644 --- a/parm/config/gfs/config.earc +++ b/parm/config/gfs/config.earc_tars @@ -1,14 +1,14 @@ #! /usr/bin/env bash -########## config.earc ########## +########## config.earc_tars ########## # Ensemble archive specific -echo "BEGIN: config.earc" +echo "BEGIN: config.earc_tars" # Get task specific resources -. $EXPDIR/config.resources earc +. "${EXPDIR}/config.resources" "earc_tars" -# Set the number of ensemble members to archive per earc job +# Set the number of ensemble members to archive per earc_tars job case "${CASE_ENS}" in "C48" | "C96") export NMEM_EARCGRP=80 @@ -32,4 +32,4 @@ esac export RMOLDSTD_ENKF=144 export RMOLDEND_ENKF=24 -echo "END: config.earc" +echo "END: config.earc_tars" diff --git a/parm/config/gfs/config.earc_vrfy b/parm/config/gfs/config.earc_vrfy new file mode 100644 index 0000000000..9fe52b7ee4 --- /dev/null +++ b/parm/config/gfs/config.earc_vrfy @@ -0,0 +1,15 @@ +#! /usr/bin/env bash + +########## config.earc_vrfy ########## +# Ensemble archive specific + +echo "BEGIN: config.earc_vrfy" + +# Get task specific resources +. 
"${EXPDIR}/config.resources" "earc_vrfy" + +#--starting and ending hours of previous cycles to be removed from rotating directory +export RMOLDSTD_ENKF=144 +export RMOLDEND_ENKF=24 + +echo "END: config.earc_vrfy" diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 06acc4e36e..23fe8cdf18 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -20,8 +20,8 @@ if (( $# != 1 )); then echo "anal sfcanl analcalc analdiag fcst echgres" echo "upp atmos_products" echo "tracker genesis genesis_fsu" - echo "verfozn verfrad vminmon fit2obs metp arch cleanup" - echo "eobs ediag eomg eupd ecen esfc efcs epos earc" + echo "verfozn verfrad vminmon fit2obs metp arc_vrfy arc_tars cleanup" + echo "eobs ediag eomg eupd ecen esfc efcs epos earc_vrfy earc_tars" echo "init_chem mom6ic oceanice_products" echo "waveinit waveprep wavepostsbs wavepostbndpnt wavepostbndpntbll wavepostpnt" echo "wavegempak waveawipsbulls waveawipsgridded" @@ -1059,7 +1059,7 @@ case ${step} in export is_exclusive=True ;; - "arch" | "earc" | "getic" | "fetch") + "arch_tars" | "earc_tars" | "getic" | "fetch") walltime="06:00:00" ntasks=1 tasks_per_node=1 @@ -1067,6 +1067,14 @@ case ${step} in memory="4096M" ;; + "arch_vrfy" | "earc_vrfy") + walltime="00:15:00" + ntasks=1 + tasks_per_node=1 + threads_per_task=1 + memory="4096M" + ;; + "cleanup") walltime="00:15:00" ntasks=1 diff --git a/parm/config/gfs/config.resources.AWSPW b/parm/config/gfs/config.resources.AWSPW index 22fe110670..d044c475fb 100644 --- a/parm/config/gfs/config.resources.AWSPW +++ b/parm/config/gfs/config.resources.AWSPW @@ -18,7 +18,7 @@ case ${step} in max_tasks_per_node=48 ;; - "arch") + "arch_vrfy" | "arch_tars") export PARTITION_BATCH="process" max_tasks_per_node=24 ;; diff --git a/parm/config/gfs/config.resources.WCOSS2 b/parm/config/gfs/config.resources.WCOSS2 index 342286d008..be55214cac 100644 --- a/parm/config/gfs/config.resources.WCOSS2 +++ 
b/parm/config/gfs/config.resources.WCOSS2 @@ -37,7 +37,7 @@ case ${step} in export memory="200GB" ;; - "arch" | "earc" | "getic") + "arch_vrfy" | "arch_tars" | "earc_vrfy" | "earc_tars" | "getic") declare -x "memory"="50GB" ;; diff --git a/scripts/exgdas_enkf_earc.py b/scripts/exgdas_enkf_earc_tars.py similarity index 92% rename from scripts/exgdas_enkf_earc.py rename to scripts/exgdas_enkf_earc_tars.py index 107d541a41..63c978d6a8 100755 --- a/scripts/exgdas_enkf_earc.py +++ b/scripts/exgdas_enkf_earc_tars.py @@ -46,10 +46,7 @@ def main(): os.chdir(config.ROTDIR) # Determine which archives to create - arcdir_set, atardir_sets = archive.configure(archive_dict) - - # Populate the product archive (ARCDIR) - archive.execute_store_products(arcdir_set) + atardir_sets = archive.configure_tars(archive_dict) # Create the backup tarballs and store in ATARDIR for atardir_set in atardir_sets: diff --git a/scripts/exgdas_enkf_earc_vrfy.py b/scripts/exgdas_enkf_earc_vrfy.py new file mode 100755 index 0000000000..b7dd33a89a --- /dev/null +++ b/scripts/exgdas_enkf_earc_vrfy.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + +import os + +from pygfs.task.archive import Archive +from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, chdir, logit + +# initialize root logger +logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) + + +@logit(logger) +def main(): + + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the Archive object + archive = Archive(config) + + # Pull out all the configuration keys needed to run the rest of archive steps + keys = ['current_cycle', 'RUN', 'PSLOT', 'ROTDIR', 'PARMgfs', 'VFYARC', + 'ARCDIR', 'MODE', 'DO_JEDIATMENS', 'DO_FIT2OBS', 'DO_JEDIATMVAR', + 'DO_JEDISNOWDA', 'DO_AERO_ANL', 'DO_PREP_OBS_AERO', 'NET', 'MODE', 'FHOUT_GFS', + 'FHMAX_HF_GFS', 'FHOUT_GFS', 'FHMAX_FITS', 'FHMAX', 'FHOUT', 'FHMAX_GFS'] + + archive_dict = AttrDict() + for key in keys: + archive_dict[key] = archive.task_config.get(key) + if 
archive_dict[key] is None: + print(f"Warning: key ({key}) not found in task_config!") + + # Also import all COMIN* directory and template variables + for key in archive.task_config.keys(): + if key.startswith("COM"): + archive_dict[key] = archive.task_config[key] + + cwd = os.getcwd() + + os.chdir(config.ROTDIR) + + # Determine which archives to create + arcdir_set = archive.configure_vrfy(archive_dict) + + # Populate the product archive (ARCDIR) + archive.execute_store_products(arcdir_set) + + os.chdir(cwd) + + +if __name__ == '__main__': + main() diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive_tars.py similarity index 92% rename from scripts/exglobal_archive.py rename to scripts/exglobal_archive_tars.py index f64db172ac..dc8b39050c 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive_tars.py @@ -35,7 +35,7 @@ def main(): 'ARCH_GAUSSIAN_FHMAX', 'ARCH_GAUSSIAN_FHINC', 'ARCH_GAUSSIAN_FHINC', 'DOIAU', 'OCNRES', 'ICERES', 'NUM_SND_COLLECTIVES', 'FHOUT_WAV', 'FHOUT_HF_WAV', 'FHMAX_WAV', 'FHMAX_HF_WAV', 'FHMAX_WAV_GFS', - 'restart_interval_gdas', 'restart_interval_gfs', + 'restart_interval_gdas', 'restart_interval_gfs', 'DO_ARCHTAR', 'DO_AERO_ANL', 'DO_AERO_FCST', 'DO_CA', 'DOIBP_WAV', 'DO_JEDIOCNVAR', 'DOHYBVAR_OCN', 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', @@ -57,10 +57,7 @@ def main(): with chdir(config.ROTDIR): # Determine which archives to create - arcdir_set, atardir_sets = archive.configure(archive_dict) - - # Populate the product archive (ARCDIR) - archive.execute_store_products(arcdir_set) + atardir_sets = archive.configure_tars(archive_dict) # Create the backup tarballs and store in ATARDIR for atardir_set in atardir_sets: diff --git a/scripts/exglobal_archive_vrfy.py b/scripts/exglobal_archive_vrfy.py new file mode 100755 index 0000000000..4d08b994f0 --- /dev/null +++ b/scripts/exglobal_archive_vrfy.py @@ -0,0 
+1,58 @@ +#!/usr/bin/env python3 + +import os + +from pygfs.task.archive import Archive +from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit, chdir + +# initialize root logger +logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) + + +@logit(logger) +def main(): + + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the Archive object + archive = Archive(config) + + # update these keys to be 3 digits if they are part of archive.task_config.keys + for key in ['OCNRES', 'ICERES']: + try: + archive.task_config[key] = f"{archive.task_config[key]:03d}" + except KeyError as ee: + logger.info(f"key ({key}) not found in archive.task_config!") + + # Pull out all the configuration keys needed to run the rest of archive steps + keys = ['current_cycle', 'RUN', 'PSLOT', 'ROTDIR', 'PARMgfs', 'VFYARC', 'REPLAY_ICS', + 'ARCDIR', 'MODE', 'DO_JEDIATMENS', 'DO_FIT2OBS', 'DO_JEDIATMVAR', 'FHMIN_GFS', + 'DO_JEDISNOWDA', 'DO_AERO_ANL', 'DO_PREP_OBS_AERO', 'NET', 'MODE', 'FHOUT_GFS', + 'FHMAX_HF_GFS', 'FHOUT_GFS', 'FHMAX_FITS', 'FHMAX', 'FHOUT', 'FHMAX_GFS'] + + archive_dict = AttrDict() + for key in keys: + try: + archive_dict[key] = archive.task_config[key] + except KeyError as ee: + logger.warning(f"WARNING: key ({key}) not found in archive.task_config!") + + # Also import all COMIN* and COMOUT* directory and template variables + for key in archive.task_config.keys(): + if key.startswith(("COM_", "COMIN_", "COMOUT_")): + archive_dict[key] = archive.task_config.get(key) + + with chdir(config.ROTDIR): + + # Determine which archives to create + arcdir_set = archive.configure_vrfy(archive_dict) + + # Populate the product archive (ARCDIR) + archive.execute_store_products(arcdir_set) + + # Clean up any temporary files + archive.clean() + + +if __name__ == '__main__': + main() diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index ed63a22230..2620bc840b 100644 --- a/ush/python/pygfs/task/archive.py 
+++ b/ush/python/pygfs/task/archive.py @@ -48,8 +48,8 @@ def __init__(self, config: Dict[str, Any]) -> None: self.archive_expdir = False @logit(logger) - def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): - """Determine which tarballs will need to be created. + def configure_vrfy(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any]): + """Determine which files will need to be created to archive to arcdir. Parameters ---------- @@ -60,8 +60,6 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str ------ arcdir_set : Dict[str, Any] Set of FileHandler instructions to copy files to the ARCDIR - atardir_sets : List[Dict[str, Any]] - List of tarballs and instructions for creating them via tar or htar """ if not os.path.isdir(arch_dict.ROTDIR): @@ -89,6 +87,44 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str arcdir_set = Archive._construct_arcdir_set(arcdir_j2yaml, arch_dict) + # Collect datasets that need to be archived + self.tar_cmd = "" + + return arcdir_set + + @logit(logger) + def configure_tars(self, arch_dict: Dict[str, Any]) -> (List[Dict[str, Any]]): + """Determine which tarballs will need to be created. + + Parameters + ---------- + arch_dict : Dict[str, Any] + Task specific keys, e.g. 
runtime options (DO_AERO_FCST, DO_ICE, etc) + + Return + ------ + atardir_sets : List[Dict[str, Any]] + List of tarballs and instructions for creating them via tar or htar + """ + + if not os.path.isdir(arch_dict.ROTDIR): + raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({arch_dict.ROTDIR}) does not exist!") + + if arch_dict.RUN in ["gdas", "gfs"]: + + # Copy the cyclone track files and rename the experiments + # TODO This really doesn't belong in archiving and should be moved elsewhere + Archive._rename_cyclone_expt(arch_dict) + + archive_parm = os.path.join(arch_dict.PARMgfs, "archive") + + # Add the glob.glob function for capturing log filenames + # TODO remove this kludge once log filenames are explicit + arch_dict['glob'] = glob.glob + + # Add the os.path.exists function to the dict for yaml parsing + arch_dict['path_exists'] = os.path.exists + if not os.path.isdir(arch_dict.ROTDIR): raise FileNotFoundError(f"FATAL ERROR: The ROTDIR ({arch_dict.ROTDIR}) does not exist!") @@ -109,9 +145,8 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str self.chgrp_cmd = chgrp self.chmod_cmd = os.chmod self.rm_cmd = rm_p - else: # Only perform local archiving. Do not create tarballs. 
- self.tar_cmd = "" - return arcdir_set, [] + else: + raise ValueError("FATAL ERROR: Neither HPSSARCH nor LOCALARCH are set to True!") # Determine if we are archiving the EXPDIR this cycle (always skip for ensembles) if "enkf" not in arch_dict.RUN and arch_dict.ARCH_EXPDIR: @@ -138,7 +173,7 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str atardir_sets.append(dataset) - return arcdir_set, atardir_sets + return atardir_sets @logit(logger) def execute_store_products(self, arcdir_set: Dict[str, Any]) -> None: diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index 88be488b47..8672034195 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -90,6 +90,7 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: run_options[run]['do_goes'] = run_base.get('DO_GOES', False) run_options[run]['do_mos'] = run_base.get('DO_MOS', False) run_options[run]['do_extractvars'] = run_base.get('DO_EXTRACTVARS', False) + run_options[run]['do_archtar'] = run_base.get('DO_ARCHTAR', False) run_options[run]['do_atm'] = run_base.get('DO_ATM', True) run_options[run]['do_wave'] = run_base.get('DO_WAVE', False) diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index 2e08ddc21d..9650912459 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -28,7 +28,7 @@ def _get_app_configs(self, run): Returns the config_files that are involved in gefs """ options = self.run_options[run] - configs = ['stage_ic', 'fcst', 'atmos_products', 'arch', 'cleanup'] + configs = ['stage_ic', 'fcst', 'atmos_products'] if options['nens'] > 0: configs += ['efcs', 'atmos_ensstat'] @@ -47,6 +47,11 @@ def _get_app_configs(self, run): if options['do_extractvars']: configs += ['extractvars'] + if options['do_archtar']: + configs += ['arch_tars'] + + configs += ['arch_vrfy', 'cleanup'] + return configs @staticmethod @@ -93,6 +98,9 @@ def 
get_task_names(self): if options['do_extractvars']: tasks += ['extractvars'] - tasks += ['arch', 'cleanup'] + if options['do_archtar']: + tasks += ['arch_tars'] + + tasks += ['arch_vrfy', 'cleanup'] return {f"{self.run}": tasks} diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 2bf793eafa..b348bb4273 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -76,7 +76,10 @@ def _get_app_configs(self, run): if options['do_ocean'] or options['do_ice']: configs += ['oceanice_products'] - configs += ['stage_ic', 'sfcanl', 'analcalc', 'fcst', 'upp', 'atmos_products', 'arch', 'cleanup'] + configs += ['stage_ic', 'sfcanl', 'analcalc', 'fcst', 'upp', 'atmos_products', 'arch_vrfy', 'cleanup'] + + if options['do_archtar']: + configs += ['arch_tars'] if options['do_hybvar']: if options['do_jediatmens']: @@ -84,7 +87,11 @@ def _get_app_configs(self, run): 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] else: configs += ['eobs', 'eomg', 'ediag', 'eupd'] - configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] + + configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc_vrfy'] + + if options['do_archtar']: + configs += ['earc_tars'] if options['do_fit2obs']: configs += ['fit2obs'] @@ -294,8 +301,11 @@ def get_task_names(self): 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - # Last two items - task_names[run] += ['arch', 'cleanup'] + # Last items + task_names[run] += ['arch_vrfy'] + if options['do_archtar']: + task_names[run] += ['arch_tars'] + task_names[run] += ['cleanup'] # Ensemble tasks elif 'enkf' in run: @@ -317,6 +327,10 @@ def get_task_names(self): task_names[run].append('esnowanl') if options['do_jedisnowda'] else 0 task_names[run].append('efcs') if 'gdas' in run else 0 task_names[run].append('epos') if 'gdas' in run else 0 - task_names[run] += ['stage_ic', 'ecen', 'esfc', 'earc', 'cleanup'] + + 
task_names[run] += ['stage_ic', 'ecen', 'esfc'] + if options['do_archtar']: + task_names[run] += ['earc_tars'] + task_names[run] += ['earc_vrfy', 'cleanup'] return task_names diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index 7409d4adec..6645b892d9 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -30,10 +30,11 @@ def _get_app_configs(self, run): configs = [] options = self.run_options[run] + if options['do_fetch_hpss'] or options['do_fetch_local']: configs += ['fetch'] - configs += ['stage_ic', 'fcst', 'arch', 'cleanup'] + configs += ['stage_ic', 'fcst', 'arch_vrfy', 'cleanup'] if options['do_atm']: @@ -85,6 +86,9 @@ def _get_app_configs(self, run): 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] + if options['do_archtar']: + configs += ['arch_tars'] + return configs @staticmethod @@ -173,6 +177,9 @@ def get_task_names(self): 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks + if options['do_archtar']: + tasks += ['arch_tars'] + + tasks += ['arch_vrfy', 'cleanup'] # arch_tar, arch_vrfy, and cleanup **must** be the last tasks return {f"{self.run}": tasks} diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index f1b1cd1ea2..8b1522ae32 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -541,7 +541,7 @@ def extractvars(self): return task - def arch(self): + def arch_vrfy(self): deps = [] dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_prod'} deps.append(rocoto.add_dependency(dep_dict)) @@ -568,14 +568,58 @@ def arch(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') - resources = self.get_resource('arch') - task_name 
= 'gefs_arch' + resources = self.get_resource('arch_vrfy') + task_name = 'gefs_arch_vrfy' task_dict = {'task_name': task_name, 'resources': resources, 'envars': self.envars, 'cycledef': 'gefs', 'dependency': dependencies, - 'command': f'{self.HOMEgfs}/jobs/rocoto/arch.sh', + 'command': f'{self.HOMEgfs}/jobs/rocoto/arch_vrfy.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + + def arch_tars(self): + deps = [] + dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'gefs_atmos_ensstat'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_ice']: + dep_dict = {'type': 'metatask', 'name': 'gefs_ice_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_ocean']: + dep_dict = {'type': 'metatask', 'name': 'gefs_ocean_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_wave']: + dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_grid'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_pnt'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_wave_bnd']: + dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': 'gefs_wave_post_bndpnt_bull'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_extractvars']: + dep_dict = {'type': 'metatask', 'name': 'gefs_extractvars'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') + + resources = self.get_resource('arch_tars') + task_name = 'gefs_arch_tars' + task_dict = {'task_name': task_name, + 'resources': resources, + 'envars': self.envars, + 'cycledef': 'gefs', + 'dependency': 
dependencies, + 'command': f'{self.HOMEgfs}/jobs/rocoto/arch_tars.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;' @@ -587,9 +631,12 @@ def arch(self): def cleanup(self): deps = [] - dep_dict = {'type': 'task', 'name': 'gefs_arch'} + dep_dict = {'type': 'task', 'name': 'gefs_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps) + if self.options['do_archtar']: + dep_dict = {'type': 'task', 'name': 'gefs_arch_tars'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') resources = self.get_resource('cleanup') task_name = 'gefs_cleanup' task_dict = {'task_name': task_name, diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index bce3036be7..5874125874 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1897,13 +1897,13 @@ def fit2obs(self): def metp(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}_arch'} + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) if self._base["interval_gfs"] < to_timedelta("24H"): n_lookback = self._base["interval_gfs"] // to_timedelta("6H") for lookback in range(1, n_lookback + 1): deps2 = [] - dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch', 'condition': 'not'} + dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_arch_vrfy', 'condition': 'not'} deps2.append(rocoto.add_dependency(dep_dict)) for lookback2 in range(1, lookback): offset = timedelta_to_HMS(-to_timedelta(f'{6*lookback2}H')) @@ -1911,7 +1911,7 @@ def metp(self): deps2.append(rocoto.add_dependency(dep_dict)) offset = timedelta_to_HMS(-to_timedelta(f'{6*lookback}H')) - dep_dict = {'type': 'task', 'name': f'{self.run}_arch', 'offset': offset} + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy', 'offset': offset} 
deps2.append(rocoto.add_dependency(dep_dict)) deps.append(rocoto.create_dependency(dep_condition='and', dep=deps2)) @@ -2301,7 +2301,59 @@ def mos_wx_ext_prdgen(self): return task - def arch(self): + def arch_vrfy(self): + deps = [] + if self.app_config.mode in ['cycled']: + if self.run in ['gfs']: + dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} + deps.append(rocoto.add_dependency(dep_dict)) + elif self.run in ['gdas']: + dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_fit2obs']: + dep_dict = {'type': 'task', 'name': f'{self.run}_fit2obs'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.run in ['gfs'] and self.options['do_tracker']: + dep_dict = {'type': 'task', 'name': f'{self.run}_tracker'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.run in ['gfs'] and self.options['do_genesis']: + dep_dict = {'type': 'task', 'name': f'{self.run}_genesis'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.run in ['gfs'] and self.options['do_genesis_fsu']: + dep_dict = {'type': 'task', 'name': f'{self.run}_genesis_fsu'} + deps.append(rocoto.add_dependency(dep_dict)) + # Post job dependencies + dep_dict = {'type': 'metatask', 'name': f'{self.run}_atmos_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_ocean']: + if self.run in ['gfs']: + dep_dict = {'type': 'metatask', 'name': f'{self.run}_ocean_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_ice']: + if self.run in ['gfs']: + dep_dict = {'type': 'metatask', 'name': f'{self.run}_ice_prod'} + deps.append(rocoto.add_dependency(dep_dict)) + + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('arch_vrfy') + task_name = f'{self.run}_arch_vrfy' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': self.envars, + 'cycledef': self.run.replace('enkf', ''), 
+ 'command': f'{self.HOMEgfs}/jobs/rocoto/arch_vrfy.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + + def arch_tars(self): deps = [] if self.app_config.mode in ['cycled']: if self.run in ['gfs']: @@ -2363,16 +2415,28 @@ def arch(self): dep_dict = {'type': 'task', 'name': f'{self.run}_mos_{job}'} deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_metp'] and self.run in ['gfs']: + deps2 = [] + # taskvalid only handles regular tasks, so just check the first metp job exists + dep_dict = {'type': 'taskvalid', 'name': f'{self.run}_metpg2g1', 'condition': 'not'} + deps2.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': f'{self.run}_metp'} + deps2.append(rocoto.add_dependency(dep_dict)) + deps.append(rocoto.create_dependency(dep_condition='or', dep=deps2)) + + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) - resources = self.get_resource('arch') - task_name = f'{self.run}_arch' + resources = self.get_resource('arch_tars') + task_name = f'{self.run}_arch_tars' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, 'envars': self.envars, 'cycledef': self.run.replace('enkf', ''), - 'command': f'{self.HOMEgfs}/jobs/rocoto/arch.sh', + 'command': f'{self.HOMEgfs}/jobs/rocoto/arch_tars.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;' @@ -2382,15 +2446,22 @@ def arch(self): return task - # Cleanup +# cleanup def cleanup(self): deps = [] if 'enkf' in self.run: - dep_dict = {'type': 'metatask', 'name': f'{self.run}_eamn'} + dep_dict = {'type': 'task', 'name': f'{self.run}_earc_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) + if 
self.options['do_archtar']: + dep_dict = {'type': 'metatask', 'name': f'{self.run}_earc_tars'} + deps.append(rocoto.add_dependency(dep_dict)) + else: - dep_dict = {'type': 'task', 'name': f'{self.run}_arch'} + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_vrfy'} deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_archtar']: + dep_dict = {'type': 'task', 'name': f'{self.run}_arch_tars'} + deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_gempak']: if self.run in ['gdas']: @@ -2944,7 +3015,40 @@ def _get_eposgroups(epos): return task - def earc(self): + def earc_vrfy(self): + + deps = [] + if 'enkfgdas' in self.run: + dep_dict = {'type': 'metatask', 'name': f'{self.run}_epmn'} + else: + dep_dict = {'type': 'task', 'name': f'{self.run}_esfc'} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'task', 'name': f'{self.run}_echgres'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + earcenvars = self.envars.copy() + earcenvars.append(rocoto.create_envar(name='ENSGRP', value='#grp#')) + + resources = self.get_resource('earc_vrfy') + + task_name = f'{self.run}_earc_vrfy' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': earcenvars, + 'cycledef': self.run.replace('enkf', ''), + 'command': f'{self.HOMEgfs}/jobs/rocoto/earc_vrfy.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + + def earc_tars(self): deps = [] if 'enkfgdas' in self.run: @@ -2962,26 +3066,26 @@ def earc(self): earcenvars.append(rocoto.create_envar(name='ENSGRP', value='#grp#')) # Integer division is floor division, but we need ceiling division - n_groups = -(self.nmem // -self._configs['earc']['NMEM_EARCGRP']) + n_groups = -(self.nmem // -self._configs['earc_tars']['NMEM_EARCGRP']) 
groups = ' '.join([f'{grp:02d}' for grp in range(0, n_groups + 1)]) - resources = self.get_resource('earc') + resources = self.get_resource('earc_tars') var_dict = {'grp': groups} - task_name = f'{self.run}_earc#grp#' + task_name = f'{self.run}_earc_tars_#grp#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, 'envars': earcenvars, 'cycledef': self.run.replace('enkf', ''), - 'command': f'{self.HOMEgfs}/jobs/rocoto/earc.sh', + 'command': f'{self.HOMEgfs}/jobs/rocoto/earc_tars.sh', 'job_name': f'{self.pslot}_{task_name}_@H', 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;' } - metatask_dict = {'task_name': f'{self.run}_eamn', + metatask_dict = {'task_name': f'{self.run}_earc_tars', 'var_dict': var_dict, 'task_dict': task_dict } diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index c491f26800..74f5dc4b02 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -11,13 +11,13 @@ class Tasks: - SERVICE_TASKS = ['arch', 'earc', 'stage_ic', 'fetch', 'cleanup'] + SERVICE_TASKS = ['arch_vrfy', 'arch_tars', 'earc_vrfy', 'earc_tars', 'stage_ic', 'fetch', 'cleanup'] VALID_TASKS = ['aerosol_init', 'stage_ic', 'fetch', - 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch', "cleanup", + 'prep', 'anal', 'sfcanl', 'analcalc', 'analdiag', 'arch_vrfy', 'arch_tars', "cleanup", 'prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal', 'prepoceanobs', 'marineanlinit', 'marineanlletkf', 'marinebmat', 'marineanlvar', 'ocnanalecen', 'marineanlchkpt', 'marineanlfinal', 'ocnanalvrfy', - 'earc', 'ecen', 'echgres', 'ediag', 'efcs', + 'earc_vrfy', 'earc_tars', 'ecen', 'echgres', 'ediag', 'efcs', 'eobs', 'eomg', 'epos', 'esfc', 'eupd', 'atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal', 'aeroanlgenb',