From fd118959a1e43db8cce081aff8d9773a5376e4e7 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Tue, 4 Jun 2024 22:38:11 +0200 Subject: [PATCH] Add test for parsing submit output to jobid --- aiida_hyperqueue/scheduler.py | 2 +- tests/conftest.py | 18 +-- tests/test_scheduler.py | 295 ++++++++++++++++++---------------- 3 files changed, 170 insertions(+), 145 deletions(-) diff --git a/aiida_hyperqueue/scheduler.py b/aiida_hyperqueue/scheduler.py index d560875..bd2b2f2 100644 --- a/aiida_hyperqueue/scheduler.py +++ b/aiida_hyperqueue/scheduler.py @@ -152,7 +152,7 @@ def _get_submit_command(self, submit_script: str) -> str: submit_script: the path of the submit script relative to the working directory. """ - submit_command = f"hq job submit {submit_script}" + submit_command = f"hq submit {submit_script}" self.logger.info(f"Submitting with: {submit_command}") diff --git a/tests/conftest.py b/tests/conftest.py index c362184..26ba2f0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -297,18 +297,18 @@ def command( environment.update(env) stderr = subprocess.DEVNULL if ignore_stderr else subprocess.STDOUT + process = subprocess.Popen( + final_args, + stdout=subprocess.PIPE, + stderr=stderr, + cwd=cwd, + env=environment, + stdin=subprocess.PIPE if stdin is not None else subprocess.DEVNULL, + ) if not wait: - return subprocess.Popen(final_args, stderr=stderr, cwd=cwd, env=environment) + return process else: - process = subprocess.Popen( - final_args, - stdout=subprocess.PIPE, - stderr=stderr, - cwd=cwd, - env=environment, - stdin=subprocess.PIPE if stdin is not None else subprocess.DEVNULL, - ) stdout = process.communicate(stdin)[0].decode() if process.returncode != 0: if expect_fail: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 4b21f58..e582553 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -16,6 +16,23 @@ from .conftest import HqEnv from .utils import wait_for_job_state +@pytest.fixture +def valid_submit_script(): + scheduler = HyperQueueScheduler() + + job_tmpl = JobTemplate() + job_tmpl.job_name = 'echo hello' + job_tmpl.shebang = '#!/bin/bash' + job_tmpl.uuid = str(uuid.uuid4()) + job_tmpl.job_resource = scheduler.create_job_resource(num_cpus=1) + job_tmpl.max_wallclock_seconds = 24 * 3600 + tmpl_code_info = JobTemplateCodeInfo() + tmpl_code_info.cmdline_params = ['echo', 'Hello'] + job_tmpl.codes_info = [tmpl_code_info] + job_tmpl.codes_run_mode = CodeRunMode.SERIAL + + return scheduler.get_submit_script(job_tmpl) + def test_resource_validation(): """Tests to verify that resources are correctly validated.""" resource = HyperQueueJobResource(num_cpus=16, memory_mb=20) @@ -49,125 +66,29 @@ def test_resource_validation(): ): HyperQueueJobResource(num_cpus=4, memory_mb=1.2) +def test_submit_command(): + """Test submit command""" + scheduler = HyperQueueScheduler() -#def test_parse_common_joblist_output(): -# """Test whether _parse_joblist_output can parse the squeue output""" -# scheduler = SlurmScheduler() -# -# retval = 0 -# stdout = TEXT_SQUEUE_TO_TEST -# stderr = '' -# -# job_list = scheduler._parse_joblist_output(retval, stdout, stderr) -# job_dict = {j.job_id: j for j in job_list} -# -# # The parameters are hard coded in the text to parse -# job_parsed = len(job_list) -# assert job_parsed == JOBS_ON_CLUSTER -# -# job_running_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.RUNNING]) -# assert len(JOBS_RUNNING) == job_running_parsed -# -# job_held_parsed = len([j for j in job_list if j.job_state and j.job_state 
== JobState.QUEUED_HELD]) -# assert JOBS_HELD == job_held_parsed -# -# job_queued_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.QUEUED]) -# assert JOBS_QUEUED == job_queued_parsed -# -# parsed_running_users = [j.job_owner for j in job_list if j.job_state and j.job_state == JobState.RUNNING] -# assert set(USERS_RUNNING) == set(parsed_running_users) -# -# parsed_running_jobs = [j.job_id for j in job_list if j.job_state and j.job_state == JobState.RUNNING] -# assert set(JOBS_RUNNING) == set(parsed_running_jobs) -# -# assert job_dict['863553'].requested_wallclock_time_seconds, 30 * 60 -# assert job_dict['863553'].wallclock_time_seconds, 29 * 60 + 29 -# assert job_dict['863553'].dispatch_time, datetime.datetime(2013, 5, 23, 11, 44, 11) -# assert job_dict['863553'].submission_time, datetime.datetime(2013, 5, 23, 10, 42, 11) -# -# assert job_dict['863100'].annotation == 'Resources' -# assert job_dict['863100'].num_machines == 32 -# assert job_dict['863100'].num_mpiprocs == 1024 -# assert job_dict['863100'].queue_name == 'normal' -# -# assert job_dict['861352'].title == 'Pressure_PBEsol_0' -# -# assert job_dict['863554'].requested_wallclock_time_seconds is None -# -# # allocated_machines is not implemented in this version of the plugin -# # for j in job_list: -# # if j.allocated_machines: -# # num_machines = 0 -# # num_mpiprocs = 0 -# # for n in j.allocated_machines: -# # num_machines += 1 -# # num_mpiprocs += n.num_mpiprocs -# # -# # self.assertTrue( j.num_machines==num_machines ) -# # self.assertTrue( j.num_mpiprocs==num_mpiprocs ) -# -#def test_parse_failed_squeue_output(self): -# """Test that _parse_joblist_output reacts as expected to failures.""" -# scheduler = SlurmScheduler() -# -# # non-zero return value should raise -# with pytest.raises(SchedulerError, match='squeue returned exit code 1'): -# scheduler._parse_joblist_output(1, TEXT_SQUEUE_TO_TEST, '') -# -# # non-empty stderr should be logged -# with self.assertLogs(scheduler.logger, logging.WARNING): -# scheduler._parse_joblist_output(0, TEXT_SQUEUE_TO_TEST, 'error message') -# -# -#@pytest.mark.parametrize( -# 'value,expected', -# [ -# ('2', 2 * 60), -# ('02', 2 * 60), -# ('02:3', 2 * 60 + 3), -# ('02:03', 2 * 60 + 3), -# ('1:02:03', 3600 + 2 * 60 + 3), -# ('01:02:03', 3600 + 2 * 60 + 3), -# ('1-3', 86400 + 3 * 3600), -# ('01-3', 86400 + 3 * 3600), -# ('01-03', 86400 + 3 * 3600), -# ('1-3:5', 86400 + 3 * 3600 + 5 * 60), -# ('01-3:05', 86400 + 3 * 3600 + 5 * 60), -# ('01-03:05', 86400 + 3 * 3600 + 5 * 60), -# ('1-3:5:7', 86400 + 3 * 3600 + 5 * 60 + 7), -# ('01-3:05:7', 86400 + 3 * 3600 + 5 * 60 + 7), -# ('01-03:05:07', 86400 + 3 * 3600 + 5 * 60 + 7), -# ('UNLIMITED', 2**31 - 1), -# ('NOT_SET', None), -# ], -#) -#def test_time_conversion(value, expected): -# """Test conversion of (relative) times. -# -# From docs, acceptable time formats include -# "minutes", "minutes:seconds", "hours:minutes:seconds", -# "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds". 
-# """ -# scheduler = SlurmScheduler() -# assert scheduler._convert_time(value) == expected -# -# -#def test_time_conversion_errors(caplog): -# """Test conversion of (relative) times for bad inputs.""" -# scheduler = SlurmScheduler() -# -# # Disable logging to avoid excessive output during test -# with caplog.at_level(logging.CRITICAL): -# with pytest.raises(ValueError, match='Unrecognized format for time string.'): -# # Empty string not valid -# scheduler._convert_time('') -# with pytest.raises(ValueError, match='Unrecognized format for time string.'): -# # there should be something after the dash -# scheduler._convert_time('1-') -# with pytest.raises(ValueError, match='Unrecognized format for time string.'): -# # there should be something after the dash -# # there cannot be a dash after the colons -# scheduler._convert_time('1:2-3') + assert scheduler._get_submit_command('job.sh') == "hq submit job.sh" + +def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): + """Test parsing the output of submit command""" + hq_env.start_server() + hq_env.start_worker(cpus="2") + Path("_aiidasubmit.sh").write_text(valid_submit_script) + + process = hq_env.command(["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True) + stdout = process.communicate()[0].decode() + stderr = "" + retval = process.returncode + + assert retval == 0 + + scheduler = HyperQueueScheduler() + job_id = scheduler._parse_submit_output(retval, stdout, stderr) + + assert job_id == "1" def test_submit_script(): """Test the creation of a simple submission script.""" @@ -216,26 +137,11 @@ def test_submit_script_mem_not_specified(): assert "#HQ --resource mem" not in submit_script_text -def test_submit_script_is_hq_valid(hq_env: HqEnv): +def test_submit_script_is_hq_valid(hq_env: HqEnv, valid_submit_script): """The generated script can actually be run by hq""" - scheduler = HyperQueueScheduler() - - job_tmpl = JobTemplate() - job_tmpl.job_name = 'echo hello' - job_tmpl.shebang = '#!/bin/bash' - job_tmpl.uuid = str(uuid.uuid4()) - job_tmpl.job_resource = scheduler.create_job_resource(num_cpus=1) - job_tmpl.max_wallclock_seconds = 24 * 3600 - tmpl_code_info = JobTemplateCodeInfo() - tmpl_code_info.cmdline_params = ['echo', 'Hello'] - job_tmpl.codes_info = [tmpl_code_info] - job_tmpl.codes_run_mode = CodeRunMode.SERIAL - - submit_script_text = scheduler.get_submit_script(job_tmpl) - hq_env.start_server() hq_env.start_worker(cpus="2") - Path("_aiidasubmit.sh").write_text(submit_script_text) + Path("_aiidasubmit.sh").write_text(valid_submit_script) hq_env.command(["submit", "_aiidasubmit.sh"]) table = hq_env.command(["job", "info", "1"], as_table=True) @@ -327,3 +233,122 @@ def test_submit_script_is_hq_valid(hq_env: HqEnv): # stderr = 'Batch job submission failed: Invalid account or account/partition combination specified' # result = scheduler._parse_submit_output(1, '', stderr) # assert result == CalcJob.exit_codes.ERROR_SCHEDULER_INVALID_ACCOUNT + +#def test_parse_common_joblist_output(): +# """Test whether _parse_joblist_output can parse the squeue output""" +# scheduler = SlurmScheduler() +# +# retval = 0 +# stdout = TEXT_SQUEUE_TO_TEST +# stderr = '' +# +# job_list = scheduler._parse_joblist_output(retval, stdout, stderr) +# job_dict = {j.job_id: j for j in job_list} +# +# # The parameters are hard coded in the text to parse +# job_parsed = len(job_list) +# assert job_parsed == JOBS_ON_CLUSTER +# +# job_running_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.RUNNING]) +# assert 
len(JOBS_RUNNING) == job_running_parsed +# +# job_held_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.QUEUED_HELD]) +# assert JOBS_HELD == job_held_parsed +# +# job_queued_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.QUEUED]) +# assert JOBS_QUEUED == job_queued_parsed +# +# parsed_running_users = [j.job_owner for j in job_list if j.job_state and j.job_state == JobState.RUNNING] +# assert set(USERS_RUNNING) == set(parsed_running_users) +# +# parsed_running_jobs = [j.job_id for j in job_list if j.job_state and j.job_state == JobState.RUNNING] +# assert set(JOBS_RUNNING) == set(parsed_running_jobs) +# +# assert job_dict['863553'].requested_wallclock_time_seconds, 30 * 60 +# assert job_dict['863553'].wallclock_time_seconds, 29 * 60 + 29 +# assert job_dict['863553'].dispatch_time, datetime.datetime(2013, 5, 23, 11, 44, 11) +# assert job_dict['863553'].submission_time, datetime.datetime(2013, 5, 23, 10, 42, 11) +# +# assert job_dict['863100'].annotation == 'Resources' +# assert job_dict['863100'].num_machines == 32 +# assert job_dict['863100'].num_mpiprocs == 1024 +# assert job_dict['863100'].queue_name == 'normal' +# +# assert job_dict['861352'].title == 'Pressure_PBEsol_0' +# +# assert job_dict['863554'].requested_wallclock_time_seconds is None +# +# # allocated_machines is not implemented in this version of the plugin +# # for j in job_list: +# # if j.allocated_machines: +# # num_machines = 0 +# # num_mpiprocs = 0 +# # for n in j.allocated_machines: +# # num_machines += 1 +# # num_mpiprocs += n.num_mpiprocs +# # +# # self.assertTrue( j.num_machines==num_machines ) +# # self.assertTrue( j.num_mpiprocs==num_mpiprocs ) +# +#def test_parse_failed_squeue_output(self): +# """Test that _parse_joblist_output reacts as expected to failures.""" +# scheduler = SlurmScheduler() +# +# # non-zero return value should raise +# with pytest.raises(SchedulerError, match='squeue returned exit code 1'): +# scheduler._parse_joblist_output(1, TEXT_SQUEUE_TO_TEST, '') +# +# # non-empty stderr should be logged +# with self.assertLogs(scheduler.logger, logging.WARNING): +# scheduler._parse_joblist_output(0, TEXT_SQUEUE_TO_TEST, 'error message') +# +# +#@pytest.mark.parametrize( +# 'value,expected', +# [ +# ('2', 2 * 60), +# ('02', 2 * 60), +# ('02:3', 2 * 60 + 3), +# ('02:03', 2 * 60 + 3), +# ('1:02:03', 3600 + 2 * 60 + 3), +# ('01:02:03', 3600 + 2 * 60 + 3), +# ('1-3', 86400 + 3 * 3600), +# ('01-3', 86400 + 3 * 3600), +# ('01-03', 86400 + 3 * 3600), +# ('1-3:5', 86400 + 3 * 3600 + 5 * 60), +# ('01-3:05', 86400 + 3 * 3600 + 5 * 60), +# ('01-03:05', 86400 + 3 * 3600 + 5 * 60), +# ('1-3:5:7', 86400 + 3 * 3600 + 5 * 60 + 7), +# ('01-3:05:7', 86400 + 3 * 3600 + 5 * 60 + 7), +# ('01-03:05:07', 86400 + 3 * 3600 + 5 * 60 + 7), +# ('UNLIMITED', 2**31 - 1), +# ('NOT_SET', None), +# ], +#) +#def test_time_conversion(value, expected): +# """Test conversion of (relative) times. +# +# From docs, acceptable time formats include +# "minutes", "minutes:seconds", "hours:minutes:seconds", +# "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds". 
+# """ +# scheduler = SlurmScheduler() +# assert scheduler._convert_time(value) == expected +# +# +#def test_time_conversion_errors(caplog): +# """Test conversion of (relative) times for bad inputs.""" +# scheduler = SlurmScheduler() +# +# # Disable logging to avoid excessive output during test +# with caplog.at_level(logging.CRITICAL): +# with pytest.raises(ValueError, match='Unrecognized format for time string.'): +# # Empty string not valid +# scheduler._convert_time('') +# with pytest.raises(ValueError, match='Unrecognized format for time string.'): +# # there should be something after the dash +# scheduler._convert_time('1-') +# with pytest.raises(ValueError, match='Unrecognized format for time string.'): +# # there should be something after the dash +# # there cannot be a dash after the colons +# scheduler._convert_time('1:2-3')
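
Note on the behaviour the new test pins down: `hq submit` prints the id of the newly created job on stdout, and the scheduler plugin's `_parse_submit_output(retval, stdout, stderr)` must return that id as a string ("1" for the first job on a fresh hq server, as asserted in test_parse_submit_command_output). The sketch below only illustrates that contract and is not the plugin's implementation; the exact stdout wording ("Job submitted successfully, job ID: 1") and the helper name `parse_submit_output` are assumptions made for the example.

# Illustrative sketch only -- not the plugin's actual _parse_submit_output.
# Assumes `hq submit` reports the new job on stdout in a form such as
# "Job submitted successfully, job ID: 1"; the wording is an assumption,
# the test above only fixes the end result ("1" for the first job).
import re


def parse_submit_output(retval: int, stdout: str, stderr: str) -> str:
    """Return the HyperQueue job id printed by `hq submit` as a string."""
    if retval != 0:
        raise RuntimeError(f"hq submit failed with exit code {retval}: {stderr}")
    match = re.search(r"job id:\s*(\d+)", stdout, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"could not find a job id in hq output: {stdout!r}")
    return match.group(1)


# Example usage:
# parse_submit_output(0, "Job submitted successfully, job ID: 1", "") == "1"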