Skip to content

Commit

Permalink
Merge branch 'hpc' (PR #18)
Browse files Browse the repository at this point in the history
These commits implement the ability to launch PBS jobs in tests via the HPC
instance and provide a framework for other HPC workload managers (e.g., SLURM)
to be added.

These commits add a new class in the HPC module, PBS, which is used to
translate arguments into a runnable PBS script, which is then passed to the HPC
class. That class runs the command via subprocess.Popen and logs the
HPC command's output.
  • Loading branch information
MiCurry committed May 4, 2020
2 parents a512e58 + 1661943 commit 0689827
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 38 deletions.
6 changes: 3 additions & 3 deletions envs/cheyenne.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ Description:
HPC: PBS

PBS_OPTIONS:
- A: NM00013
- q: regular
- j: oe
A: NMMM0013
q: share
j: oe

Modsets:
###############
Expand Down
211 changes: 180 additions & 31 deletions smarts/hpc.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,193 @@
import subprocess
import types
import os
import signal
import sys
from multiprocessing import shared_memory

class HPC:
    """Base class for HPC workload managers (e.g. PBS).

    Stores the scheduler type and provides logging helpers plus a generic
    subprocess-based job launcher that derived classes (e.g. PBS) call to
    run their scheduler commands.
    """

    def __init__(self, type):
        # NOTE: `type` shadows the builtin, but the parameter name is part
        # of the public interface, so it is kept for compatibility.
        self.type = type.upper()

    def __str__(self):
        return self.type

    def __repr__(self):
        # Fixed: this was misspelled `__rpr__`, a name Python never calls,
        # so repr(hpc) fell back to the default object repr.
        return self.type

    # Keep the original (misspelled) name as an alias for compatibility.
    __rpr__ = __repr__

    def __eq__(self, other):
        # Allows comparisons like `hpc == 'PBS'` directly against a string.
        return self.type == other

    def __hash__(self):
        # Defining __eq__ alone would set __hash__ to None and make
        # instances unhashable; hash consistently with equality.
        return hash(self.type)

    def init_logging(self, fname):
        """Open fname for writing and set self.log to the file object."""
        self.log = open(fname, 'w')

    def close_logging(self):
        """Flush self.log and close it."""
        self.log.flush()
        self.log.close()

    def log_cmd(self, cmd):
        """Write cmd (a list) to the log, followed by a newline."""
        self.log.write(str(cmd) + '\n')

    def launch_job(self, cmd, name, **kwargs):
        """Run cmd in subprocess.Popen and return its returncode.

        cmd should be a list of arguments, without whitespace.  All commands
        and output from the HPC scheduler are recorded in
        'smarts-hpc.<name>.log'.  This function is intended to be called via
        PBS.launch_job or another HPC-derived class.
        """
        self.init_logging('smarts-hpc.' + str(name) + '.log')
        self.log_cmd(cmd)
        self.log.flush()

        # Combine STDOUT and STDERR
        job = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        # Terminate the HPC blocking job if we receive SIGINT - as it
        # appears not to stop automatically
        try:
            job.wait()
        except KeyboardInterrupt as e:
            job.terminate()
            stdout = str(job.stdout.read().decode('utf-8'))
            self.log.write(stdout)
            self.log.close()
            print(e)
            sys.exit(-1)

        stdout = str(job.stdout.read().decode('utf-8'))

        self.log.write(stdout)
        self.close_logging()

        return job.returncode
class PBS(HPC):
    """PBS workload manager: builds job scripts and submits them via qsub."""

    def launch_script(self, script, **kwargs):
        """Submit script to the PBS job queue and block until it finishes.

        qsub is called with the -Wblock=true option, so the call blocks
        until the job completes.  Returns True if the job is accepted by
        PBS, runs, and returns; returns False if the job cannot be
        submitted (the log file will contain the qsub error messages).

        Optional keyword arguments:
        cl_options - A list of command line options to pass to the qsub
                     command, similar to args used in Python's
                     subprocess.Popen: ['-r', 'n', '-M', 'user@example.com']
        """
        cmd = ['qsub', '-Wblock=true']

        cl_options = kwargs.get('cl_options', None)
        if cl_options:
            if not isinstance(cl_options, list):
                raise TypeError("cl_options must be a list")
            cmd.extend(cl_options)

        if not os.path.isfile(script):
            print("ERROR: HPC Could not find this PBS job script: ", script)
            return False

        cmd.append(script)

        # Derive the log name from the script path.  Fixed: the original
        # used script.lstrip('./'), which strips *characters* (every
        # leading '.' and '/'), mangling names like '..foo' and leaving
        # directory separators in names like 'dir/job.pbs'.
        name = os.path.basename(script)

        return HPC.launch_job(self, cmd, name) == 0

    def launch_job(self, executables, name, wallTime, queue, nNodes, ncpus, nMPI, **kwargs):
        """Create a PBS job script (by default 'script.pbs') and submit it
        to PBS using qsub, based on the arguments described below.  The job
        blocks until execution is complete.  Returns True if the job is
        submitted without errors and finishes, else False.

        executables - A list of executables to perform for this job (for
                      instance: source an environment file, then launch the
                      init_atmosphere core).  SMARTS creates a PBS job
                      script and places these executables in the order they
                      appear.  Example:
                          executables = ["ulimit -s unlimited",
                                         "source ~/setup_cheyenne",
                                         "mpiexec_mpt ./init_atmosphere"]
        name     - A string that contains the desired name for this job
        wallTime - The wall time in HH:MM:SS.
        queue    - The desired queue.
        nNodes   - The number of nodes
        ncpus    - The number of CPUs to use per node
        nMPI     - The number of MPI tasks per node.

        Optional Keyword Arguments:
        shell       - The shell used to launch the job script and the
                      executables.  Default is '#!/usr/bin/env bash'.
        pbs_options - A dictionary of additional PBS options, where each key
                      is the PBS option and its value is the desired value.
                      Options are added to the job script in the order they
                      appear.  Default is None.  For instance, to set an
                      account key: pbs_options = { 'A' : "A000001" }
        script_name - Optional name for the job script.  Default is
                      'script.pbs'.
        """
        shell = kwargs.get('shell', '#!/usr/bin/env bash')
        pbs_options = kwargs.get('pbs_options', None)
        script_name = kwargs.get('script_name', 'script.pbs')

        opts = [shell, '\n']

        if pbs_options:
            for key, value in pbs_options.items():
                # Accept keys with or without the leading dash.  Fixed: the
                # original tested `'-' not in key`, which misclassifies any
                # option name merely *containing* a dash.
                if key.startswith('-'):
                    opts += ['#PBS', key, value, '\n']
                else:
                    opts += ['#PBS', '-' + key, value, '\n']

        opts += ['#PBS', '-N', name, '\n']
        opts += ['#PBS', '-j', 'oe', '\n']
        opts += ['#PBS', '-q', queue, '\n']
        opts += ['#PBS', '-l', 'walltime=' + wallTime, '\n']
        opts += ['#PBS', '-l']
        opts += ['select=' + str(nNodes) + ':' + 'ncpus=' + str(ncpus) + ':' + 'mpiprocs=' + str(nMPI), '\n']
        opts += ['\n']

        if isinstance(executables, list):
            for line in executables:
                opts += [line, '\n']
        else:
            opts += [executables, '\n']

        # Write the tokens separated by single spaces; '\n' tokens end a
        # line.  Fixed: the original loop had an if/else with identical
        # branches and left the file handle open.
        with open(script_name, 'w') as script:
            for i, tok in enumerate(opts):
                script.write(tok)
                if tok != '\n' and i + 1 < len(opts) and opts[i + 1] != '\n':
                    script.write(' ')

        qsub_cmd = ['qsub', '-Wblock=true', script_name]

        return HPC.launch_job(self, qsub_cmd, script_name) == 0


def init_hpc(hpcType):
    """Return an instance of the HPC subclass named by hpcType.

    The lookup is case-insensitive.  Returns None when the name does not
    match any known HPC class.
    """
    scheduler = hpcType.upper()
    if scheduler == "PBS":
        return PBS(scheduler)
    return None
7 changes: 3 additions & 4 deletions smarts/testManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from multiprocessing import Process

from smarts.reporters.reporter import Result
from smarts import hpc


NOT_IMPLEMENTED_ERROR = "IS NOT YET IMPLEMENTED"
Expand Down Expand Up @@ -110,10 +111,9 @@ def __init__(self, env, testDir, srcDir, *args, **kwargs):
self.launch_names = None

# Based on if the env we have is an HPC or not, initalize the HPC
if env.hpc == True:
if env.hpc:
# Initalize HPC
raise NotImplementedError("HPC COMPATABLITY IS NOT YET IMPLEMENTED")
pass
self.hpc = hpc.init_hpc(env.hpc)
else:
self.hpc = None

Expand Down Expand Up @@ -352,7 +352,6 @@ def run_tests(self, tests, *args, **kwargs):
num_tests = len(tests)
finished = 0
avaliable_cpus = self.env.ncpus
self.hpc = None
run = True
requested_test_names = tests

Expand Down
Empty file added tests/pbs_test/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions tests/pbs_test/pbs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os
import shutil

class pbs_test:
    """SMARTS test: launch a PBS job via HPC.launch_job and verify that the
    job's side-effect file ('log') was produced with the expected contents.
    """
    test_name = "Test HPC.launch_job - PBS"
    test_description = "Launch a PBS job via HPC.launch_job"
    nCPUs = 1
    test_dependencies = None

    def run(self, env, result, srcDir, testDir, hpc):
        """Run the test.

        Sets result.result to "PASSED"/"FAILED" and result.msg to a short
        explanation; returns None.  `hpc` must compare equal to 'PBS'.
        """
        print("PBS_TEST: Starting pbs_test ...")

        if hpc != 'PBS':
            result.result = "FAILED"
            result.msg = "The HPC type was not 'PBS'"
            return

        # Copy the executable from the test directory into the test's run directory
        shutil.copy(os.path.join(testDir, 'pbs_test', 'sleep30.sh'), './sleep30.sh')

        executable = ['printf "starting job\n"',
                      "./sleep30.sh",
                      'printf "job finished\n"']
        name = "PBS_Test"
        wallTime = "00:00:35"
        queue = "share"
        nNodes = "1"
        ncpus = "1"
        procs = "1"
        options = env.env['PBS_OPTIONS']

        print("PBS_TEST: launching pbs job via hpc.launch_job ...")
        hpc_result = hpc.launch_job(executable, name, wallTime, queue, nNodes, ncpus, procs,
                                    pbs_options=options,
                                    script_name="my_script.pbs")
        print("PBS_TEST: HPC job finished... checking results")

        if not hpc_result:
            result.result = "FAILED"
            result.msg = "Failed to launch HPC job"
            return

        if not os.path.isfile('log'):
            result.result = "FAILED"
            result.msg = "Log file was not present after pbs run"
            return

        # Fixed: read via a context manager; the original leaked the open
        # file handle.
        with open('./log', 'r') as log:
            contents = log.read()
        if contents != 'success':
            result.result = "FAILED"
            result.msg = "File did not contain 'success'"
            return

        result.result = "PASSED"
        result.msg = "PBS HPC.launch_job was succesful"
        return
5 changes: 5 additions & 0 deletions tests/pbs_test/sleep30.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Sleep for 30 seconds, then append 'success' to ./log so the test harness
# can verify the PBS job actually ran to completion.

sleep 30
printf "success" >> log
exit 0
Empty file.
44 changes: 44 additions & 0 deletions tests/pbs_test_script/pbs_test_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
import shutil

class pbs_test_script:
    """SMARTS test: submit a pre-written PBS script via HPC.launch_script and
    verify that the job's side-effect file ('log') was produced.
    """
    test_name = "Launch Script Test - PBS"
    test_description = "PBS Test Script - Submit a PBS job via HPC.launch_script"
    nCPUs = 1
    test_dependencies = None

    def run(self, env, result, srcDir, testDir, hpc):
        """Run the test.

        Sets result.result to "PASSED"/"FAILED" and result.msg to a short
        explanation; returns None.  `hpc` must compare equal to 'PBS'.
        """
        print("PBS_TEST_SCRIPT: Starting test...")

        if hpc != 'PBS':
            result.result = "FAILED"
            result.msg = "The HPC type was not 'PBS'"
            return

        # Copy the executable from the test directory into the test's run directory
        shutil.copy(os.path.join(testDir, 'pbs_test_script', 'script.pbs'), './script.pbs')
        shutil.copy(os.path.join(testDir, 'pbs_test_script', 'sleep30.sh'), './sleep30.sh')

        print("PBS_TEST_SCRIPT: Launching PBS job via hpc.launch_script ...")
        # Fixed: the original rebound `hpc` to the boolean return value,
        # shadowing the HPC handle parameter.
        launched = hpc.launch_script('./script.pbs')
        print("PBS_TEST_SCRIPT: Job finished ... checking results")
        if not launched:
            result.result = "FAILED"
            result.msg = "Problem launching HPC job"
            return

        if not os.path.isfile('log'):
            result.result = "FAILED"
            result.msg = "Log file was not present after pbs run"
            return

        # Fixed: read via a context manager; the original leaked the open
        # file handle.
        with open('./log', 'r') as log:
            contents = log.read()
        if contents != 'success':
            result.result = "FAILED"
            result.msg = "File did not contain 'success'"
            return

        result.result = "PASSED"
        result.msg = "PBS script test was succesful"
        return
9 changes: 9 additions & 0 deletions tests/pbs_test_script/script.pbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
#PBS -N smarts_pbs_test_2
#PBS -A NMMM0013
#PBS -l walltime=00:00:40
#PBS -q economy
#PBS -l select=1:ncpus=1:mpiprocs=1

# Run the 30-second sleep job (copied into the run directory by the test).
./sleep30.sh
exit 0
5 changes: 5 additions & 0 deletions tests/pbs_test_script/sleep30.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Sleep for 30 seconds, then append 'success' to ./log so the test harness
# can verify the PBS job actually ran to completion.

sleep 30
printf "success" >> log
exit 0

0 comments on commit 0689827

Please sign in to comment.