diff --git a/.github/workflows/ci_unit_tests.yaml b/.github/workflows/ci_unit_tests.yaml new file mode 100644 index 0000000000..e22f63bf56 --- /dev/null +++ b/.github/workflows/ci_unit_tests.yaml @@ -0,0 +1,64 @@ +name: CI Unit Tests +on: [pull_request, push, workflow_dispatch] + +jobs: + + ci_pytest: + runs-on: ubuntu-latest + name: Run unit tests on CI system + permissions: + checks: write + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.11.8 + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y perl libxml-libxml-perl libxml-libxslt-perl libdatetime-perl + python -m pip install --upgrade pip + pip install pytest + pip install wxflow + pip install wget + + - name: Cache Rocoto Install + uses: actions/cache@v4 + with: + path: ~/rocoto + key: ${{ runner.os }}-rocoto-${{ hashFiles('**/ci-unit_tests.yaml') }} + + - name: Install Rocoto + run: | + if [ ! -d "$HOME/rocoto/bin" ]; then + git clone https://github.com/christopherwharrop/rocoto.git $HOME/rocoto + cd $HOME/rocoto + ./INSTALL + fi + echo "$HOME/rocoto/bin" >> $GITHUB_PATH + + - name: Run tests + shell: bash + run: | + sudo mkdir -p /scratch1/NCEPDEV + cd $GITHUB_WORKSPACE/sorc + git submodule update --init --recursive + ./link_workflow.sh + cd $GITHUB_WORKSPACE/ci/scripts/tests + ln -s ../wxflow + + pytest -v --junitxml $GITHUB_WORKSPACE/ci/scripts/tests/test-results.xml + + + - name: Publish Test Results + if: always() + uses: EnricoMi/publish-unit-test-result-action@v2 + with: + files: ci/scripts/tests/test-results.xml + job_summary: true + comment_mode: off diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile index 956bd692dd..05d38b7898 100644 --- a/ci/Jenkinsfile +++ b/ci/Jenkinsfile @@ -14,7 +14,7 @@ pipeline { options { skipDefaultCheckout() - //parallelsAlwaysFailFast() + parallelsAlwaysFailFast() } stages { // This initial stage is used to get the Machine name from the GitHub labels on the PR @@ -90,9 +90,6 @@ pipeline { stage('3. Build System') { matrix { agent { label NodeName[machine].toLowerCase() } - //options { - // throttle(['global_matrix_build']) - //} axes { axis { name 'system' @@ -102,6 +99,7 @@ pipeline { stages { stage('build system') { steps { + catchError(buildResult: 'UNSTABLE', stageResult: 'FAILURE') { script { def HOMEgfs = "${CUSTOM_WORKSPACE}/${system}" // local HOMEgfs is used to build the system on per system basis under the custome workspace for each buile system sh(script: "mkdir -p ${HOMEgfs}") @@ -120,8 +118,8 @@ pipeline { if (env.CHANGE_ID) { sh(script: """${GH} pr comment ${env.CHANGE_ID} --repo ${repo_url} --body "Checkout **Failed** on ${Machine}: ${e.getMessage()}" """) } - echo "Failed to checkout: ${e.getMessage()}" STATUS = 'Failed' + error("Failed to checkout: ${e.getMessage()}") } def gist_url = "" def error_logs = "" @@ -155,6 +153,7 @@ pipeline { } catch (Exception error_comment) { echo "Failed to comment on PR: ${error_comment.getMessage()}" } + STATUS = 'Failed' error("Failed to build system on ${Machine}") } } @@ -174,6 +173,7 @@ pipeline { } } } + } } } } @@ -181,7 +181,9 @@ pipeline { } stage('4. Run Tests') { - failFast false + when { + expression { STATUS != 'Failed' } + } matrix { agent { label NodeName[machine].toLowerCase() } axes { @@ -198,14 +200,21 @@ pipeline { expression { return caseList.contains(Case) } } steps { + catchError(buildResult: 'UNSTABLE', stageResult: 'FAILURE') { script { sh(script: "sed -n '/{.*}/!p' ${CUSTOM_WORKSPACE}/gfs/ci/cases/pr/${Case}.yaml > ${CUSTOM_WORKSPACE}/gfs/ci/cases/pr/${Case}.yaml.tmp") def yaml_case = readYaml file: "${CUSTOM_WORKSPACE}/gfs/ci/cases/pr/${Case}.yaml.tmp" system = yaml_case.experiment.system def HOMEgfs = "${CUSTOM_WORKSPACE}/${system}" // local HOMEgfs is used to populate the XML on per system basis env.RUNTESTS = "${CUSTOM_WORKSPACE}/RUNTESTS" - sh(script: "${HOMEgfs}/ci/scripts/utils/ci_utils_wrapper.sh create_experiment ${HOMEgfs}/ci/cases/pr/${Case}.yaml") + try { + error_output = sh(script: "${HOMEgfs}/ci/scripts/utils/ci_utils_wrapper.sh create_experiment ${HOMEgfs}/ci/cases/pr/${Case}.yaml", returnStdout: true).trim() + } catch (Exception error_create) { + sh(script: """${GH} pr comment ${env.CHANGE_ID} --repo ${repo_url} --body "${Case} **FAILED** to create experment on ${Machine}\n with the error:\n\\`\\`\\`\n${error_output}\\`\\`\\`" """) + error("Case ${Case} failed to create experment directory") + } } + } } } @@ -213,7 +222,6 @@ pipeline { when { expression { return caseList.contains(Case) } } - failFast false steps { script { HOMEgfs = "${CUSTOM_WORKSPACE}/gfs" // common HOMEgfs is used to launch the scripts that run the experiments @@ -255,11 +263,11 @@ pipeline { STATUS = 'Failed' try { sh(script: """${GH} pr edit ${env.CHANGE_ID} --repo ${repo_url} --remove-label "CI-${Machine}-Running" --add-label "CI-${Machine}-${STATUS}" """, returnStatus: true) - sh(script: """${GH} pr comment ${env.CHANGE_ID} --repo ${repo_url} --body "Experiment ${Case} **FAILED** on ${Machine} in\n\\`${CUSTOM_WORKSPACE}/RUNTESTS/${pslot}\\`" """) + sh(script: """${GH} pr comment ${env.CHANGE_ID} --repo ${repo_url} --body "Experiment ${Case} **FAILED** on ${Machine} in\n\\`${CUSTOM_WORKSPACE}/RUNTESTS/EXPDIR/${pslot}\\`" """) } catch (Exception e) { echo "Failed to update label from Running to ${STATUS}: ${e.getMessage()}" } - error("Failed to run experiments ${Case} on ${Machine}") + echo "Failed to run experiments ${Case} on ${Machine}" } } } @@ -268,6 +276,7 @@ pipeline { } } } + stage( '5. FINALIZE' ) { agent { label NodeName[machine].toLowerCase() } steps { diff --git a/ci/cases/yamls/gefs_ci_defaults.yaml b/ci/cases/yamls/gefs_ci_defaults.yaml index ceb36d4acb..05a97edd90 100644 --- a/ci/cases/yamls/gefs_ci_defaults.yaml +++ b/ci/cases/yamls/gefs_ci_defaults.yaml @@ -1,4 +1,4 @@ defaults: !INC {{ HOMEgfs }}/parm/config/gefs/yaml/defaults.yaml base: - HPC_ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }} + ACCOUNT: {{ 'HPC_ACCOUNT' | getenv }} diff --git a/ci/scripts/tests/test_create_experiment.py b/ci/scripts/tests/test_create_experiment.py new file mode 100644 index 0000000000..03f3a30805 --- /dev/null +++ b/ci/scripts/tests/test_create_experiment.py @@ -0,0 +1,29 @@ +from wxflow import Executable +from shutil import rmtree +import os +import copy + +_here = os.path.dirname(__file__) +HOMEgfs = os.sep.join(_here.split(os.sep)[:-3]) +RUNDIR = os.path.join(_here, 'testdata/RUNDIR') + + +def test_create_experiment(): + + create_experiment_script = Executable(f'{HOMEgfs}/workflow/create_experiment.py') + yaml_dir = yaml_dir = os.path.join(HOMEgfs, 'ci/cases/pr') + env = os.environ.copy() + env['RUNTESTS'] = RUNDIR + + for case in os.listdir(yaml_dir): + if case.endswith('.yaml'): + with open(os.path.join(yaml_dir, case), 'r') as file: + file_contents = file.read() + if 'ICSDIR_ROOT' not in file_contents: + create_experiment = copy.deepcopy(create_experiment_script) + create_experiment.add_default_arg(['-y', f'../../cases/pr/{case}', '--overwrite']) + env['pslot'] = os.path.splitext(case)[0] + create_experiment(env=env) + assert (create_experiment.returncode == 0) + + rmtree(RUNDIR) diff --git a/ci/scripts/tests/test_rocotostat.py b/ci/scripts/tests/test_rocotostat.py new file mode 100755 index 0000000000..f43f8df2f8 --- /dev/null +++ b/ci/scripts/tests/test_rocotostat.py @@ -0,0 +1,90 @@ +import sys +import os +from shutil import rmtree +import wget + +script_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.join(os.path.dirname(script_dir), 'utils')) + +from rocotostat import rocoto_statcount, rocotostat_summary, is_done, is_stalled, CommandNotFoundError +from wxflow import which + +test_data_url = 'https://noaa-nws-global-pds.s3.amazonaws.com/data/CI/' + +testdata_path = 'testdata/rocotostat' +testdata_full_path = os.path.join(script_dir, testdata_path) + + +if not os.path.isfile(os.path.join(testdata_full_path, 'database.db')): + os.makedirs(testdata_full_path, exist_ok=True) + workflow_url = test_data_url + str(testdata_path) + '/workflow.xml' + workflow_destination = os.path.join(testdata_full_path, 'workflow.xml') + wget.download(workflow_url, workflow_destination) + + database_url = test_data_url + str(testdata_path) + '/database.db' + database_destination = os.path.join(testdata_full_path, 'database.db') + wget.download(database_url, database_destination) + +try: + rocotostat = which('rocotostat') +except CommandNotFoundError: + raise CommandNotFoundError("rocotostat not found in PATH") + +rocotostat.add_default_arg(['-w', os.path.join(testdata_path, 'workflow.xml'), '-d', os.path.join(testdata_path, 'database.db')]) + + +def test_rocoto_statcount(): + + result = rocoto_statcount(rocotostat) + + assert result['SUCCEEDED'] == 20 + assert result['FAIL'] == 0 + assert result['DEAD'] == 0 + assert result['RUNNING'] == 0 + assert result['SUBMITTING'] == 0 + assert result['QUEUED'] == 0 + + +def test_rocoto_summary(): + + result = rocotostat_summary(rocotostat) + + assert result['CYCLES_TOTAL'] == 1 + assert result['CYCLES_DONE'] == 1 + + +def test_rocoto_done(): + + result = rocotostat_summary(rocotostat) + + assert is_done(result) + + rmtree(testdata_full_path) + + +def test_rocoto_stalled(): + testdata_path = 'testdata/rocotostat_stalled' + testdata_full_path = os.path.join(script_dir, testdata_path) + xml = os.path.join(testdata_full_path, 'stalled.xml') + db = os.path.join(testdata_full_path, 'stalled.db') + + if not os.path.isfile(os.path.join(testdata_full_path, 'stalled.db')): + os.makedirs(testdata_full_path, exist_ok=True) + workflow_url = test_data_url + str(testdata_path) + '/stalled.xml' + database_url = test_data_url + str(testdata_path) + '/stalled.db' + + workflow_destination = os.path.join(testdata_full_path, 'stalled.xml') + wget.download(workflow_url, workflow_destination) + + database_destination = os.path.join(testdata_full_path, 'stalled.db') + wget.download(database_url, database_destination) + + rocotostat = which('rocotostat') + rocotostat.add_default_arg(['-w', xml, '-d', db]) + + result = rocoto_statcount(rocotostat) + + assert result['SUCCEEDED'] == 11 + assert is_stalled(result) + + rmtree(testdata_full_path) diff --git a/ci/scripts/tests/test_setup.py b/ci/scripts/tests/test_setup.py new file mode 100755 index 0000000000..77a36369f4 --- /dev/null +++ b/ci/scripts/tests/test_setup.py @@ -0,0 +1,89 @@ +from wxflow import Executable, Configuration, ProcessError +from shutil import rmtree +import pytest +import os + +_here = os.path.dirname(__file__) +HOMEgfs = os.sep.join(_here.split(os.sep)[:-3]) +RUNDIR = os.path.join(_here, 'testdata/RUNDIR') +pslot = "C48_ATM" +account = "fv3-cpu" +foobar = "foobar" + + +def test_setup_expt(): + + arguments = [ + "gfs", "forecast-only", + "--pslot", pslot, "--app", "ATM", "--resdetatmos", "48", + "--comroot", f"{RUNDIR}", "--expdir", f"{RUNDIR}", + "--idate", "2021032312", "--edate", "2021032312", "--overwrite" + ] + setup_expt_script = Executable(os.path.join(HOMEgfs, "workflow", "setup_expt.py")) + setup_expt_script.add_default_arg(arguments) + setup_expt_script() + assert (setup_expt_script.returncode == 0) + + +def test_setup_xml(): + + setup_xml_script = Executable(os.path.join(HOMEgfs, "workflow/setup_xml.py")) + setup_xml_script.add_default_arg(f"{RUNDIR}/{pslot}") + setup_xml_script() + assert (setup_xml_script.returncode == 0) + + cfg = Configuration(f"{RUNDIR}/{pslot}") + base = cfg.parse_config('config.base') + assert base.ACCOUNT == account + + assert "UNKNOWN" not in base.values() + + with open(f"{RUNDIR}/{pslot}/{pslot}.xml", 'r') as file: + contents = file.read() + assert contents.count(account) > 5 + + rmtree(RUNDIR) + + +def test_setup_xml_fail_config_env_cornercase(): + + script_content = ('''#!/usr/bin/env bash +export HOMEgfs=foobar +../../../workflow/setup_xml.py "${1}"\n +''') + + with open('run_setup_xml.sh', 'w') as file: + file.write(script_content) + os.chmod('run_setup_xml.sh', 0o755) + + try: + setup_xml_script = Executable(os.path.join(HOMEgfs, "ci", "scripts", "tests", "run_setup_xml.sh")) + setup_xml_script.add_default_arg(f"{RUNDIR}/{pslot}") + setup_xml_script() + assert (setup_xml_script.returncode == 0) + + cfg = Configuration(f"{RUNDIR}/{pslot}") + base = cfg.parse_config('config.base') + assert base.ACCOUNT == account + + assert foobar not in base.values() + assert "UNKNOWN" not in base.values() + + with open(f"{RUNDIR}/{pslot}/{pslot}.xml", 'r') as file: + contents = file.read() + assert contents.count(account) > 5 + + except ProcessError as e: + # We expect this fail becuse ACCOUNT=fv3-cpu in config.base and environment + pass + except Exception as e: + # If an exception occurs, pass the test with a custom message + pytest.fail(f"Expected exception occurred: {e}") + + finally: + # Cleanup code to ensure it runs regardless of test outcome + os.remove('run_setup_xml.sh') + try: + rmtree(RUNDIR) + except FileNotFoundError: + pass diff --git a/ci/scripts/utils/publish_logs.py b/ci/scripts/utils/publish_logs.py index 7768c17c10..283c84a8d1 100755 --- a/ci/scripts/utils/publish_logs.py +++ b/ci/scripts/utils/publish_logs.py @@ -46,7 +46,8 @@ def add_logs_to_gist(args, emcbot_gh): gist_files = {} for file in args.file: - file_content = file.read() + with open(file.name, 'r', encoding='latin-1') as file: + file_content = file.read() gist_files[os.path.basename(file.name)] = emcbot_gh.InputFileContent(file_content) gist = emcbot_gh.user.create_gist(public=True, files=gist_files, description=f"error log file from CI run {args.gist[0]}") @@ -85,7 +86,8 @@ def upload_logs_to_repo(args, emcbot_gh, emcbot_ci_url): break for file in args.file: - file_content = file.read() + with open(file.name, 'r', encoding='latin-1') as file: + file_content = file.read() file_path_in_repo = f"{repo_path}/{path_header}/" + str(os.path.basename(file.name)) emcbot_gh.repo.create_file(file_path_in_repo, "Adding error log file", file_content, branch="error_logs") diff --git a/ci/scripts/utils/rocotostat.py b/ci/scripts/utils/rocotostat.py index 9b1d8dcc3a..70c672f0e8 100755 --- a/ci/scripts/utils/rocotostat.py +++ b/ci/scripts/utils/rocotostat.py @@ -14,6 +14,35 @@ def attempt_multiple_times(expression, max_attempts, sleep_duration=0, exception_class=Exception): + """ + Retries a function multiple times. + + Try to execute the function expression up to max_attempts times ignoring any exceptions + of the type exception_class, It waits for sleep_duration seconds between attempts. + + Parameters + ---------- + expression : callable + The function to be executed. + max_attempts : int + The maximum number of attempts to execute the function. + sleep_duration : int, optional + The number of seconds to wait between attempts. Default is 0. + exception_class : Exception, optional + The type of exception to catch. Default is the base Exception class, catching all exceptions. + + Returns + ------- + The return value of the function expression. + + Raises + ------ + exception_class + If the function expression raises an exception of type exception_class + in all max_attempts attempts. + + """ + attempt = 0 last_exception = None while attempt < max_attempts: @@ -189,7 +218,7 @@ def is_stalled(rocoto_status): error_return = rocoto_status['UNKNOWN'] rocoto_state = 'UNKNOWN' elif is_stalled(rocoto_status): - rocoto_status = attempt_multiple_times(rocoto_statcount(rocotostat), 2, 120, ProcessError) + rocoto_status = attempt_multiple_times(lambda: rocoto_statcount(rocotostat), 2, 120, ProcessError) if is_stalled(rocoto_status): error_return = 3 rocoto_state = 'STALLED' diff --git a/env/HERA.env b/env/HERA.env index db63f0bfa5..b743a19a62 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -140,13 +140,13 @@ elif [[ "${step}" = "ocnanalecen" ]]; then [[ ${NTHREADS_OCNANALECEN} -gt ${nth_max} ]] && export NTHREADS_OCNANALECEN=${nth_max} export APRUN_OCNANALECEN="${launcher} -n ${npe_ocnanalecen} --cpus-per-task=${NTHREADS_OCNANALECEN}" -elif [[ "${step}" = "ocnanalletkf" ]]; then +elif [[ "${step}" = "marineanalletkf" ]]; then - nth_max=$((npe_node_max / npe_node_ocnanalletkf)) + nth_max=$((npe_node_max / npe_node_marineanalletkf)) - export NTHREADS_OCNANALLETKF=${nth_ocnanalletkf:-${nth_max}} - [[ ${NTHREADS_OCNANALLETKF} -gt ${nth_max} ]] && export NTHREADS_OCNANALLETKF=${nth_max} - export APRUN_OCNANALLETKF="${launcher} -n ${npe_ocnanalletkf} --cpus-per-task=${NTHREADS_OCNANALLETKF}" + export NTHREADS_MARINEANALLETKF=${nth_marineanalletkf:-${nth_max}} + [[ ${NTHREADS_MARINEANALLETKF} -gt ${nth_max} ]] && export NTHREADS_MARINEANALLETKF=${nth_max} + export APRUN_MARINEANALLETKF="${launcher} -n ${npe_marineanalletkf} --cpus-per-task=${NTHREADS_MARINEANALLETKF}" elif [[ "${step}" = "anal" ]] || [[ "${step}" = "analcalc" ]]; then diff --git a/env/ORION.env b/env/ORION.env index 502e99e192..c203acae48 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -148,13 +148,13 @@ elif [[ "${step}" = "ocnanalecen" ]]; then [[ ${NTHREADS_OCNANALECEN} -gt ${nth_max} ]] && export NTHREADS_OCNANALECEN=${nth_max} export APRUN_OCNANALECEN="${launcher} -n ${npe_ocnanalecen} --cpus-per-task=${NTHREADS_OCNANALECEN}" -elif [[ "${step}" = "ocnanalletkf" ]]; then +elif [[ "${step}" = "marineanalletkf" ]]; then - nth_max=$((npe_node_max / npe_node_ocnanalletkf)) + nth_max=$((npe_node_max / npe_node_marineanalletkf)) - export NTHREADS_OCNANALLETKF=${nth_ocnanalletkf:-${nth_max}} - [[ ${NTHREADS_OCNANALLETKF} -gt ${nth_max} ]] && export NTHREADS_OCNANALLETKF=${nth_max} - export APRUN_OCNANALLETKF="${launcher} -n ${npe_ocnanalletkf} --cpus-per-task=${NTHREADS_OCNANALLETKF}" + export NTHREADS_MARINEANALLETKF=${nth_marineanalletkf:-${nth_max}} + [[ ${NTHREADS_MARINEANALLETKF} -gt ${nth_max} ]] && export NTHREADS_MARINEANALLETKF=${nth_max} + export APRUN_MARINEANALLETKF="${launcher} -n ${npe_marineanalletkf} --cpus-per-task=${NTHREADS_MARINEANALLETKF}" elif [[ "${step}" = "anal" ]] || [[ "${step}" = "analcalc" ]]; then diff --git a/jobs/JGDAS_GLOBAL_OCEAN_ANALYSIS_LETKF b/jobs/JGLOBAL_MARINE_ANALYSIS_LETKF similarity index 82% rename from jobs/JGDAS_GLOBAL_OCEAN_ANALYSIS_LETKF rename to jobs/JGLOBAL_MARINE_ANALYSIS_LETKF index d03ddfc19a..38dc3049f9 100755 --- a/jobs/JGDAS_GLOBAL_OCEAN_ANALYSIS_LETKF +++ b/jobs/JGLOBAL_MARINE_ANALYSIS_LETKF @@ -1,6 +1,6 @@ #!/bin/bash source "${HOMEgfs}/ush/preamble.sh" -source "${HOMEgfs}/ush/jjob_header.sh" -e "ocnanalletkf" -c "base ocnanal ocnanalletkf" +source "${HOMEgfs}/ush/jjob_header.sh" -e "marineanalletkf" -c "base ocnanal marineanalletkf" ############################################## # Set variables used in the script @@ -13,8 +13,10 @@ gPDY=${GDATE:0:8} gcyc=${GDATE:8:2} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \ - COM_OCEAN_HISTORY_PREV:COM_OCEAN_HISTORY_TMPL \ - COM_ICE_HISTORY_PREV:COM_ICE_HISTORY_TMPL + COMIN_OCEAN_HISTORY_PREV:COM_OCEAN_HISTORY_TMPL \ + COMIN_ICE_HISTORY_PREV:COM_ICE_HISTORY_TMPL + +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COMIN_OBS:COM_OBS_TMPL ############################################## # Begin JOB SPECIFIC work diff --git a/jobs/rocoto/ocnanalletkf.sh b/jobs/rocoto/marineanalletkf.sh similarity index 87% rename from jobs/rocoto/ocnanalletkf.sh rename to jobs/rocoto/marineanalletkf.sh index f710be5710..f2bfb9f70c 100755 --- a/jobs/rocoto/ocnanalletkf.sh +++ b/jobs/rocoto/marineanalletkf.sh @@ -8,7 +8,7 @@ source "${HOMEgfs}/ush/preamble.sh" status=$? [[ ${status} -ne 0 ]] && exit "${status}" -export job="ocnanalletkf" +export job="marineanalletkf" export jobid="${job}.$$" ############################################################### @@ -18,6 +18,6 @@ export PYTHONPATH ############################################################### # Execute the JJOB -"${HOMEgfs}/jobs/JGDAS_GLOBAL_OCEAN_ANALYSIS_LETKF" +"${HOMEgfs}/jobs/JGLOBAL_MARINE_ANALYSIS_LETKF" status=$? exit "${status}" diff --git a/parm/config/gfs/config.marineanalletkf b/parm/config/gfs/config.marineanalletkf new file mode 100644 index 0000000000..fde3433a13 --- /dev/null +++ b/parm/config/gfs/config.marineanalletkf @@ -0,0 +1,18 @@ +#!/bin/bash + +########## config.marineanalletkf ########## +# Ocn Analysis specific + +echo "BEGIN: config.marineanalletkf" + +# Get task specific resources +. "${EXPDIR}/config.resources" marineanalletkf + +export MARINE_LETKF_EXEC="${JEDI_BIN}/gdas.x" +export MARINE_LETKF_YAML_TMPL="${PARMgfs}/gdas/soca/letkf/letkf.yaml.j2" +export MARINE_LETKF_STAGE_YAML_TMPL="${PARMgfs}/gdas/soca/letkf/letkf_stage.yaml.j2" + +export GRIDGEN_EXEC="${JEDI_BIN}/gdas_soca_gridgen.x" +export GRIDGEN_YAML="${PARMgfs}/gdas/soca/gridgen/gridgen.yaml" + +echo "END: config.marineanalletkf" diff --git a/parm/config/gfs/config.ocnanal b/parm/config/gfs/config.ocnanal index 38a6cbd52a..367e570ec8 100644 --- a/parm/config/gfs/config.ocnanal +++ b/parm/config/gfs/config.ocnanal @@ -16,8 +16,8 @@ export SOCA_NINNER=@SOCA_NINNER@ export CASE_ANL=@CASE_ANL@ export DOMAIN_STACK_SIZE=116640000 #TODO: Make the stack size resolution dependent export JEDI_BIN=${HOMEgfs}/sorc/gdas.cd/build/bin - -export COMIN_OBS=@COMIN_OBS@ +export SOCA_FIX_STAGE_YAML_TMPL="${PARMgfs}/gdas/soca/soca_fix_stage.yaml.j2" +export SOCA_ENS_BKG_STAGE_YAML_TMPL="${PARMgfs}/gdas/soca/soca_ens_bkg_stage.yaml.j2" # NICAS export NICAS_RESOL=@NICAS_RESOL@ diff --git a/parm/config/gfs/config.ocnanalletkf b/parm/config/gfs/config.ocnanalletkf deleted file mode 100644 index b67f37152e..0000000000 --- a/parm/config/gfs/config.ocnanalletkf +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash - -########## config.ocnanalletkf ########## -# Ocn Analysis specific - -echo "BEGIN: config.ocnanalletkf" - -# Get task specific resources -. "${EXPDIR}/config.resources" ocnanalletkf - -echo "END: config.ocnanalletkf" diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index e577a95fe6..2c0a3a8165 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -25,7 +25,7 @@ if (( $# != 1 )); then echo "waveinit waveprep wavepostsbs wavepostbndpnt wavepostbndpntbll wavepostpnt" echo "wavegempak waveawipsbulls waveawipsgridded" echo "postsnd awips gempak npoess" - echo "ocnanalprep prepoceanobs ocnanalbmat ocnanalrun ocnanalecen ocnanalletkf ocnanalchkpt ocnanalpost ocnanalvrfy" + echo "ocnanalprep prepoceanobs ocnanalbmat ocnanalrun ocnanalecen marineanalletkf ocnanalchkpt ocnanalpost ocnanalvrfy" exit 1 fi @@ -555,32 +555,32 @@ case ${step} in export memory_ocnanalecen ;; - "ocnanalletkf") + "marineanalletkf") npes=16 case ${OCNRES} in "025") npes=480 - memory_ocnanalletkf="96GB" + memory_marineanalletkf="96GB" ;; "050") npes=16 - memory_ocnanalletkf="96GB" + memory_marineanalletkf="96GB" ;; "500") npes=16 - memory_ocnanalletkf="24GB" + memory_marineanalletkf="24GB" ;; *) echo "FATAL ERROR: Resources not defined for job ${step} at resolution ${OCNRES}" exit 4 esac - export wtime_ocnanalletkf="00:10:00" - export npe_ocnanalletkf=${npes} - export nth_ocnanalletkf=1 + export wtime_marineanalletkf="00:10:00" + export npe_marineanalletkf=${npes} + export nth_marineanalletkf=1 export is_exclusive=True - export npe_node_ocnanalletkf=$(( npe_node_max / nth_ocnanalletkf )) - export memory_ocnanalletkf + export npe_node_marineanalletkf=$(( npe_node_max / nth_marineanalletkf )) + export memory_marineanalletkf ;; diff --git a/sorc/wxflow b/sorc/wxflow index 5dad7dd61c..1356acdb2b 160000 --- a/sorc/wxflow +++ b/sorc/wxflow @@ -1 +1 @@ -Subproject commit 5dad7dd61cebd9b3f2b163b3b06bb75eae1860a9 +Subproject commit 1356acdb2bbca28e442597699da1a295faa18fe3 diff --git a/ush/python/pygfs/task/marine_letkf.py b/ush/python/pygfs/task/marine_letkf.py index 0ae5bea98d..0fdd3d9aba 100644 --- a/ush/python/pygfs/task/marine_letkf.py +++ b/ush/python/pygfs/task/marine_letkf.py @@ -1,11 +1,16 @@ #!/usr/bin/env python3 +import f90nml from logging import getLogger +import os from pygfs.task.analysis import Analysis from typing import Dict -from wxflow import (chdir, +from wxflow import (AttrDict, + FileHandler, logit, - Task) + parse_j2yaml, + to_timedelta, + to_YMDH) logger = getLogger(__name__.split('.')[-1]) @@ -30,6 +35,21 @@ def __init__(self, config: Dict) -> None: logger.info("init") super().__init__(config) + _half_assim_freq = to_timedelta(f"{self.task_config.assim_freq}H") / 2 + _letkf_yaml_file = 'letkf.yaml' + _letkf_exec_args = [self.task_config.MARINE_LETKF_EXEC, + 'soca', + 'localensembleda', + _letkf_yaml_file] + + self.task_config.WINDOW_MIDDLE = self.task_config.current_cycle + self.task_config.WINDOW_BEGIN = self.task_config.current_cycle - _half_assim_freq + self.task_config.letkf_exec_args = _letkf_exec_args + self.task_config.letkf_yaml_file = _letkf_yaml_file + self.task_config.mom_input_nml_tmpl = os.path.join(self.task_config.DATA, 'mom_input.nml.tmpl') + self.task_config.mom_input_nml = os.path.join(self.task_config.DATA, 'mom_input.nml') + self.task_config.obs_dir = os.path.join(self.task_config.DATA, 'obs') + @logit(logger) def initialize(self): """Method initialize for ocean and sea ice LETKF task @@ -43,6 +63,63 @@ def initialize(self): logger.info("initialize") + # make directories and stage ensemble background files + ensbkgconf = AttrDict() + keys = ['previous_cycle', 'current_cycle', 'DATA', 'NMEM_ENS', + 'PARMgfs', 'ROTDIR', 'COM_OCEAN_HISTORY_TMPL', 'COM_ICE_HISTORY_TMPL'] + for key in keys: + ensbkgconf[key] = self.task_config[key] + ensbkgconf.RUN = 'enkfgdas' + soca_ens_bkg_stage_list = parse_j2yaml(self.task_config.SOCA_ENS_BKG_STAGE_YAML_TMPL, ensbkgconf) + FileHandler(soca_ens_bkg_stage_list).sync() + soca_fix_stage_list = parse_j2yaml(self.task_config.SOCA_FIX_STAGE_YAML_TMPL, self.task_config) + FileHandler(soca_fix_stage_list).sync() + letkf_stage_list = parse_j2yaml(self.task_config.MARINE_LETKF_STAGE_YAML_TMPL, self.task_config) + FileHandler(letkf_stage_list).sync() + + obs_list = parse_j2yaml(self.task_config.OBS_YAML, self.task_config) + + # get the list of observations + obs_files = [] + for ob in obs_list['observers']: + obs_name = ob['obs space']['name'].lower() + obs_filename = f"{self.task_config.RUN}.t{self.task_config.cyc}z.{obs_name}.{to_YMDH(self.task_config.current_cycle)}.nc" + obs_files.append((obs_filename, ob)) + + obs_files_to_copy = [] + obs_to_use = [] + # copy obs from COMIN_OBS to DATA/obs + for obs_file, ob in obs_files: + obs_src = os.path.join(self.task_config.COMIN_OBS, obs_file) + obs_dst = os.path.join(self.task_config.DATA, self.task_config.obs_dir, obs_file) + if os.path.exists(obs_src): + obs_files_to_copy.append([obs_src, obs_dst]) + obs_to_use.append(ob) + else: + logger.warning(f"{obs_file} is not available in {self.task_config.COMIN_OBS}") + + # stage the desired obs files + FileHandler({'copy': obs_files_to_copy}).sync() + + # make the letkf.yaml + letkfconf = AttrDict() + keys = ['WINDOW_BEGIN', 'WINDOW_MIDDLE', 'RUN', 'gcyc', 'NMEM_ENS'] + for key in keys: + letkfconf[key] = self.task_config[key] + letkfconf.RUN = 'enkfgdas' + letkf_yaml = parse_j2yaml(self.task_config.MARINE_LETKF_YAML_TMPL, letkfconf) + letkf_yaml.observations.observers = obs_to_use + letkf_yaml.save(self.task_config.letkf_yaml_file) + + # swap date and stack size in mom_input.nml + domain_stack_size = self.task_config.DOMAIN_STACK_SIZE + ymdhms = [int(s) for s in self.task_config.WINDOW_BEGIN.strftime('%Y,%m,%d,%H,%M,%S').split(',')] + with open(self.task_config.mom_input_nml_tmpl, 'r') as nml_file: + nml = f90nml.read(nml_file) + nml['ocean_solo_nml']['date_init'] = ymdhms + nml['fms_nml']['domains_stack_size'] = int(domain_stack_size) + nml.write(self.task_config.mom_input_nml, force=True) # force to overwrite if necessary + @logit(logger) def run(self): """Method run for ocean and sea ice LETKF task @@ -56,8 +133,6 @@ def run(self): logger.info("run") - chdir(self.runtime_config.DATA) - @logit(logger) def finalize(self): """Method finalize for ocean and sea ice LETKF task