Skip to content

Commit

Permalink
Add test for parsing submit output to jobid
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Jun 4, 2024
1 parent 950d8d1 commit fd11895
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 145 deletions.
2 changes: 1 addition & 1 deletion aiida_hyperqueue/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _get_submit_command(self, submit_script: str) -> str:
submit_script: the path of the submit script relative to the working
directory.
"""
submit_command = f"hq job submit {submit_script}"
submit_command = f"hq submit {submit_script}"

self.logger.info(f"Submitting with: {submit_command}")

Expand Down
18 changes: 9 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,18 @@ def command(
environment.update(env)
stderr = subprocess.DEVNULL if ignore_stderr else subprocess.STDOUT

process = subprocess.Popen(
final_args,
stdout=subprocess.PIPE,
stderr=stderr,
cwd=cwd,
env=environment,
stdin=subprocess.PIPE if stdin is not None else subprocess.DEVNULL,
)
if not wait:
return subprocess.Popen(final_args, stderr=stderr, cwd=cwd, env=environment)
return process

else:
process = subprocess.Popen(
final_args,
stdout=subprocess.PIPE,
stderr=stderr,
cwd=cwd,
env=environment,
stdin=subprocess.PIPE if stdin is not None else subprocess.DEVNULL,
)
stdout = process.communicate(stdin)[0].decode()
if process.returncode != 0:
if expect_fail:
Expand Down
295 changes: 160 additions & 135 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
from .conftest import HqEnv
from .utils import wait_for_job_state

@pytest.fixture
def valid_submit_script():
    """Return the text of a minimal, well-formed submit script.

    Builds a one-CPU ``echo`` job template and renders it through
    ``HyperQueueScheduler.get_submit_script`` so tests can feed a script
    that ``hq submit`` actually accepts.
    """
    scheduler = HyperQueueScheduler()

    # A single serial code that just echoes a greeting.
    code_info = JobTemplateCodeInfo()
    code_info.cmdline_params = ['echo', 'Hello']

    template = JobTemplate()
    template.job_name = 'echo hello'
    template.shebang = '#!/bin/bash'
    template.uuid = str(uuid.uuid4())
    template.job_resource = scheduler.create_job_resource(num_cpus=1)
    template.max_wallclock_seconds = 24 * 3600
    template.codes_info = [code_info]
    template.codes_run_mode = CodeRunMode.SERIAL

    return scheduler.get_submit_script(template)

def test_resource_validation():
"""Tests to verify that resources are correctly validated."""
resource = HyperQueueJobResource(num_cpus=16, memory_mb=20)
Expand Down Expand Up @@ -49,125 +66,29 @@ def test_resource_validation():
):
HyperQueueJobResource(num_cpus=4, memory_mb=1.2)

def test_submit_command():
    """Test that the submit command is the plain ``hq submit <script>`` form.

    Guards against regressions to the old ``hq job submit`` spelling, which
    recent HyperQueue releases no longer accept.
    """
    scheduler = HyperQueueScheduler()

    assert scheduler._get_submit_command('job.sh') == "hq submit job.sh"

def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script):
    """Check that ``_parse_submit_output`` extracts the job id from ``hq submit``."""
    # Bring up a server plus one worker so the submission is accepted.
    hq_env.start_server()
    hq_env.start_worker(cpus="2")
    Path("_aiidasubmit.sh").write_text(valid_submit_script)

    # Run the real ``hq submit`` and capture its stdout for parsing.
    proc = hq_env.command(["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True)
    captured = proc.communicate()[0].decode()
    exit_code = proc.returncode

    assert exit_code == 0

    # The first job submitted to a fresh server is always job "1".
    parsed_id = HyperQueueScheduler()._parse_submit_output(exit_code, captured, "")

    assert parsed_id == "1"

def test_submit_script():
"""Test the creation of a simple submission script."""
Expand Down Expand Up @@ -216,26 +137,11 @@ def test_submit_script_mem_not_specified():

assert "#HQ --resource mem" not in submit_script_text

def test_submit_script_is_hq_valid(hq_env: HqEnv):
def test_submit_script_is_hq_valid(hq_env: HqEnv, valid_submit_script):
"""The generated script can actually be run by hq"""
scheduler = HyperQueueScheduler()

job_tmpl = JobTemplate()
job_tmpl.job_name = 'echo hello'
job_tmpl.shebang = '#!/bin/bash'
job_tmpl.uuid = str(uuid.uuid4())
job_tmpl.job_resource = scheduler.create_job_resource(num_cpus=1)
job_tmpl.max_wallclock_seconds = 24 * 3600
tmpl_code_info = JobTemplateCodeInfo()
tmpl_code_info.cmdline_params = ['echo', 'Hello']
job_tmpl.codes_info = [tmpl_code_info]
job_tmpl.codes_run_mode = CodeRunMode.SERIAL

submit_script_text = scheduler.get_submit_script(job_tmpl)

hq_env.start_server()
hq_env.start_worker(cpus="2")
Path("_aiidasubmit.sh").write_text(submit_script_text)
Path("_aiidasubmit.sh").write_text(valid_submit_script)
hq_env.command(["submit", "_aiidasubmit.sh"])
table = hq_env.command(["job", "info", "1"], as_table=True)

Expand Down Expand Up @@ -327,3 +233,122 @@ def test_submit_script_is_hq_valid(hq_env: HqEnv):
# stderr = 'Batch job submission failed: Invalid account or account/partition combination specified'
# result = scheduler._parse_submit_output(1, '', stderr)
# assert result == CalcJob.exit_codes.ERROR_SCHEDULER_INVALID_ACCOUNT

#def test_parse_common_joblist_output():
# """Test whether _parse_joblist_output can parse the squeue output"""
# scheduler = SlurmScheduler()
#
# retval = 0
# stdout = TEXT_SQUEUE_TO_TEST
# stderr = ''
#
# job_list = scheduler._parse_joblist_output(retval, stdout, stderr)
# job_dict = {j.job_id: j for j in job_list}
#
# # The parameters are hard coded in the text to parse
# job_parsed = len(job_list)
# assert job_parsed == JOBS_ON_CLUSTER
#
# job_running_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.RUNNING])
# assert len(JOBS_RUNNING) == job_running_parsed
#
# job_held_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.QUEUED_HELD])
# assert JOBS_HELD == job_held_parsed
#
# job_queued_parsed = len([j for j in job_list if j.job_state and j.job_state == JobState.QUEUED])
# assert JOBS_QUEUED == job_queued_parsed
#
# parsed_running_users = [j.job_owner for j in job_list if j.job_state and j.job_state == JobState.RUNNING]
# assert set(USERS_RUNNING) == set(parsed_running_users)
#
# parsed_running_jobs = [j.job_id for j in job_list if j.job_state and j.job_state == JobState.RUNNING]
# assert set(JOBS_RUNNING) == set(parsed_running_jobs)
#
# assert job_dict['863553'].requested_wallclock_time_seconds, 30 * 60
# assert job_dict['863553'].wallclock_time_seconds, 29 * 60 + 29
# assert job_dict['863553'].dispatch_time, datetime.datetime(2013, 5, 23, 11, 44, 11)
# assert job_dict['863553'].submission_time, datetime.datetime(2013, 5, 23, 10, 42, 11)
#
# assert job_dict['863100'].annotation == 'Resources'
# assert job_dict['863100'].num_machines == 32
# assert job_dict['863100'].num_mpiprocs == 1024
# assert job_dict['863100'].queue_name == 'normal'
#
# assert job_dict['861352'].title == 'Pressure_PBEsol_0'
#
# assert job_dict['863554'].requested_wallclock_time_seconds is None
#
# # allocated_machines is not implemented in this version of the plugin
# # for j in job_list:
# # if j.allocated_machines:
# # num_machines = 0
# # num_mpiprocs = 0
# # for n in j.allocated_machines:
# # num_machines += 1
# # num_mpiprocs += n.num_mpiprocs
# #
# # self.assertTrue( j.num_machines==num_machines )
# # self.assertTrue( j.num_mpiprocs==num_mpiprocs )
#
#def test_parse_failed_squeue_output(self):
# """Test that _parse_joblist_output reacts as expected to failures."""
# scheduler = SlurmScheduler()
#
# # non-zero return value should raise
# with pytest.raises(SchedulerError, match='squeue returned exit code 1'):
# scheduler._parse_joblist_output(1, TEXT_SQUEUE_TO_TEST, '')
#
# # non-empty stderr should be logged
# with self.assertLogs(scheduler.logger, logging.WARNING):
# scheduler._parse_joblist_output(0, TEXT_SQUEUE_TO_TEST, 'error message')
#
#
#@pytest.mark.parametrize(
# 'value,expected',
# [
# ('2', 2 * 60),
# ('02', 2 * 60),
# ('02:3', 2 * 60 + 3),
# ('02:03', 2 * 60 + 3),
# ('1:02:03', 3600 + 2 * 60 + 3),
# ('01:02:03', 3600 + 2 * 60 + 3),
# ('1-3', 86400 + 3 * 3600),
# ('01-3', 86400 + 3 * 3600),
# ('01-03', 86400 + 3 * 3600),
# ('1-3:5', 86400 + 3 * 3600 + 5 * 60),
# ('01-3:05', 86400 + 3 * 3600 + 5 * 60),
# ('01-03:05', 86400 + 3 * 3600 + 5 * 60),
# ('1-3:5:7', 86400 + 3 * 3600 + 5 * 60 + 7),
# ('01-3:05:7', 86400 + 3 * 3600 + 5 * 60 + 7),
# ('01-03:05:07', 86400 + 3 * 3600 + 5 * 60 + 7),
# ('UNLIMITED', 2**31 - 1),
# ('NOT_SET', None),
# ],
#)
#def test_time_conversion(value, expected):
# """Test conversion of (relative) times.
#
# From docs, acceptable time formats include
# "minutes", "minutes:seconds", "hours:minutes:seconds",
# "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
# """
# scheduler = SlurmScheduler()
# assert scheduler._convert_time(value) == expected
#
#
#def test_time_conversion_errors(caplog):
# """Test conversion of (relative) times for bad inputs."""
# scheduler = SlurmScheduler()
#
# # Disable logging to avoid excessive output during test
# with caplog.at_level(logging.CRITICAL):
# with pytest.raises(ValueError, match='Unrecognized format for time string.'):
# # Empty string not valid
# scheduler._convert_time('')
# with pytest.raises(ValueError, match='Unrecognized format for time string.'):
# # there should be something after the dash
# scheduler._convert_time('1-')
# with pytest.raises(ValueError, match='Unrecognized format for time string.'):
# # there should be something after the dash
# # there cannot be a dash after the colons
# scheduler._convert_time('1:2-3')

0 comments on commit fd11895

Please sign in to comment.