Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes #1

Draft
wants to merge 4 commits into
base: local
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ Configuration

Some configuration is possible:

- ``PRISMS_JOBS_SOFTWARE``: (optional, default is detected automatically)

The job submission software can be chosen by setting the ``$PRISMS_JOBS_SOFTWARE``
environment variable to ``torque`` or ``slurm``.

- ``PRISMS_JOBS_DIR``: (optional, default=``$HOME/.prisms_jobs``)

The jobs database is stored at ``$PRISMS_JOBS_DIR/jobs.db``.
Expand Down
16 changes: 8 additions & 8 deletions prisms_jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import (absolute_import, division, print_function, unicode_literals)
from builtins import *

import imp
import importlib
import json
import os
import six
Expand Down Expand Up @@ -31,8 +31,13 @@ def detect_software():

* 'torque' - detected via 'qsub'
* 'slurm' - detected via 'sbatch'
* set manually with the PRISMS_JOBS_SOFTWARE environment variable
"""
if find_executable('qsub') is not None:
if os.environ.get('PRISMS_JOBS_SOFTWARE') == 'torque':
return 'torque'
elif os.environ.get('PRISMS_JOBS_SOFTWARE') == 'slurm':
return 'slurm'
elif find_executable('qsub') is not None:
return 'torque'
elif find_executable('sbatch') is not None:
return 'slurm'
Expand Down Expand Up @@ -67,12 +72,7 @@ def set_software(software_name=None):
import prisms_jobs.interface.slurm as software
else:
try:
f, filename, description = imp.find_module(software_name)
try:
software = imp.load_module(software_name, f, filename, description)
finally:
if f:
f.close()
software = importlib.import_module(software_name)
except:
raise Exception('Unrecognized \'software\': ' + software_name)
global __software
Expand Down
48 changes: 32 additions & 16 deletions prisms_jobs/interface/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def _squeue(jobid=None, username=getlogin(), full=False, sformat=None): #pyli

else:
# First, get jobids that belong to that username using
# squeue (-h strips the header)
sopt = ["squeue", "-h", "-u", username]
# squeue (-h strips the header, %A displays only jobid)
sopt = ["squeue", "-h", "-u", username, "-o", "%A"]

qsout = run(sopt)[0]

Expand Down Expand Up @@ -106,7 +106,6 @@ def sub_string(job):
if job.account is not None:
jobstr += "#SBATCH -A {0}\n".format(job.account)
jobstr += "#SBATCH -t {0}\n".format(job.walltime)
jobstr += "#SBATCH -n {0}\n".format(job.nodes*job.ppn)
if job.pmem is not None:
jobstr += "#SBATCH --mem-per-cpu={0}\n".format(job.pmem)
if job.qos is not None:
Expand All @@ -119,12 +118,17 @@ def sub_string(job):
jobstr += "#SBATCH --mail-type=END\n"
if 'a' in job.message:
jobstr += "#SBATCH --mail-type=FAIL\n"
# SLURM does assignment to no. of nodes automatically
# jobstr += "#SBATCH -N {0}\n".format(job.nodes)
if job.queue is not None:
jobstr += "#SBATCH -p {0}\n".format(job.queue)
if job.gpus is not None: # GPUs do not use ppn, partition, # nodes (NERSC settings...)
jobstr += "#SBATCH --gpus={0}\n".format(job.gpus)
else:
if job.queue is not None:
jobstr += "#SBATCH -p {0}\n".format(job.queue)
jobstr += "#SBATCH -N {0}\n".format(job.nodes)
jobstr += "#SBATCH --ntasks-per-node={0}\n".format(job.ppn)
if job.constraint is not None:
jobstr += "#SBATCH --constraint={0}\n".format(job.constraint)
if job.exclude is not None:
jobstr += "#SBATCH -x {0}\n".format(job.exclude)
jobstr += "{0}\n".format(job.command)

return jobstr
Expand Down Expand Up @@ -258,18 +262,30 @@ def job_status(jobid=None):


# Look for timing info
m = re.search(r"RunTime=\s*([0-9]*:[0-9]*:[0-9]*)\s", line) #pylint: disable=invalid-name
m = re.search(r"RunTime=\s*([0-9]*-)?([0-9]*:[0-9]*:[0-9]*)\s", line) #pylint: disable=invalid-name
if m:
if m.group(1) == "Unknown":
if m.group(1) == "Unknown" or m.group(2) == "Unknown":
continue
hrs, mns, scs = m.group(1).split(":")
runtime = datetime.timedelta(hours=int(hrs), minutes=int(mns), seconds=int(scs))
jobstatus["elapsedtime"] = runtime.seconds
if m.group(1) == None:
days = 0
else:
days = m.group(1).split('-')[0]
hrs, mns, scs = m.group(2).split(":")
runtime = datetime.timedelta(days=int(days), hours=int(hrs), minutes=int(mns), seconds=int(scs))
jobstatus["elapsedtime"] = int(runtime.total_seconds())
continue

m = re.match(r"\S*\s*TimeLimit=\s*([0-9]*:[0-9]*:[0-9]*)\s", line) #pylint: disable=invalid-name
if m:
walltime = datetime.timedelta(hours=int(hrs), minutes=int(mns), seconds=int(scs))
jobstatus["walltime"] = walltime.seconds
m = re.match(r"\S*\s*TimeLimit=\s*([0-9]*-)?([0-9]*:[0-9]*:[0-9]*)\s", line) #pylint: disable=invalid-name
if m:
if m.group(1) == "Unknown" or m.group(2) == "Unknown":
continue
if m.group(1) == None:
days = 0
else:
days = m.group(1).split('-')[0]
hrs, mns, scs = m.group(2).split(":")
walltime = datetime.timedelta(days=int(days), hours=int(hrs), minutes=int(mns), seconds=int(scs))
jobstatus["walltime"] = int(walltime.total_seconds())
continue

# Grab the job start time
Expand Down
16 changes: 15 additions & 1 deletion prisms_jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class Job(object): #pylint: disable=too-many-instance-attributes
pmem (str): Memory requested. Ex: ``"3800mb"``
qos (str): Ex: ``"flux"``
queue (str): Ex: ``"fluxoe"``
constraint (str): Constraint. Ex: ``"haswell"``
gpus (int): How many gpus to request (slurm only). Ex: 4
exclude (str): Nodes to exclude (slurm only). Ex: ``"node01,node02,node03"``

exetime (str): Time after which the job is eligible for execution. Ex: ``"1100"``

Expand Down Expand Up @@ -82,19 +85,21 @@ class Job(object): #pylint: disable=too-many-instance-attributes
pmem (str): Memory requested. Ex: ``"3800mb"``
qos (str): Ex: ``"flux"``
queue (str): Ex: ``"fluxoe"``
exclude (str): Nodes to exclude (slurm only). Ex: ``"node01,node02,node03"``
exetime (str): Time after which the job is eligible for execution. Ex: ``"1100"``
message (str): When to send email about the job. Ex: ``"abe"``
email (str): Where to send notifications. Ex: ``"[email protected]"``
priority (str): Priority ranges from (low) -1024 to (high) 1023. Ex: ``"-200"``
constraint (str): Constraint. Ex: ``"haswell"``
gpus (int): How many gpus to request (slurm only). Ex: 4
command (str): String with command to run by script. Ex: ``"echo \"hello\" > test.txt"``
auto (bool): Indicates an automatically re-submitting job. Ex: ``True``

"""


def __init__(self, name="STDIN", account=None, nodes=None, ppn=None, walltime=None, #pylint: disable=too-many-arguments, too-many-locals
pmem=None, qos=None, queue=None, exetime=None, message="a", email=None,
pmem=None, qos=None, queue=None, gpus=None, exclude=None, exetime=None, message="a", email=None,
priority="0", constraint=None, command=None, auto=False, substr=None):

if substr != None:
Expand Down Expand Up @@ -128,6 +133,15 @@ def __init__(self, name="STDIN", account=None, nodes=None, ppn=None, walltime=No
# queue string
self.queue = queue

# number of gpus to request
if gpus is not None:
self.gpus = int(gpus)
else:
self.gpus = None

# nodes to exclude
self.exclude = exclude

# time eligible for execution
# PBS -a exetime
# Declares the time after which the job is eligible for execution,
Expand Down
37 changes: 25 additions & 12 deletions prisms_jobs/jobdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def regexp(pattern, string):
return re.match(pattern, string) is not None

class JobDB(object): #pylint: disable=too-many-instance-attributes, too-many-public-methods
"""A primsms_jobs Job Database object
"""A prisms_jobs Job Database object

Usually this is called without arguments (prisms_jobs.JobDB()) to open or
create a database in the default location.
Expand Down Expand Up @@ -316,22 +316,35 @@ def update(self):
for key, jobstatus in iteritems(newstatus):
if jobstatus == "C":
self.curs.execute(
"UPDATE jobs SET jobstatus=?, elapsedtime=?, modifytime=? WHERE jobid=?",
("C", None, int(time.time()), key))
#elif jobstatus["qstatstr"] is None:
# self.curs.execute(
# "UPDATE jobs SET jobstatus=?, elapsedtime=?, modifytime=? WHERE jobid=?",
# (jobstatus["jobstatus"], jobstatus["elapsedtime"], int(time.time()), key))
else:
"UPDATE jobs SET jobstatus=?, elapsedtime=?, modifytime=?\
WHERE jobid=?",
("C", None, int(time.time()),
key))
# running jobs should have all fields updated because some may change,
# e.g. number of nodes from Q -> R may change
elif jobstatus["jobstatus"] == "R":
self.curs.execute(
"UPDATE jobs SET jobstatus=?, elapsedtime=?, starttime=?,\
completiontime=?, qstatstr=?, modifytime=? WHERE jobid=?",
completiontime=?, qstatstr=?, modifytime=?, nodes=?\
WHERE jobid=?",
(
jobstatus["jobstatus"], jobstatus["elapsedtime"],
jobstatus["starttime"], jobstatus["completiontime"],
jobstatus["qstatstr"], int(time.time()), key))

self.conn.commit()
jobstatus["qstatstr"], int(time.time()), jobstatus["nodes"],
key))
# all other statuses will have missing info, e.g. number of nodes
# therefore, all that must be done is to update the jobstatus,
# clear elapsedtime if it is there (e.g. running job -> check),
# and set the last modified time and qstatstring
else:
self.curs.execute(
"UPDATE jobs SET jobstatus=?, elapsedtime=?, modifytime=?,\
qstatstr=?\
WHERE jobid=?",
(jobstatus["jobstatus"], None, int(time.time()),
jobstatus["qstatstr"],
key))
self.conn.commit()

# update taskstatus for non-auto jobs
self.curs.execute(
Expand Down
7 changes: 5 additions & 2 deletions prisms_jobs/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def hours(walltime):
sys.exit()

def strftimedelta(seconds): #pylint: disable=redefined-outer-name
"""Convert seconds to D+:HH:MM:SS"""
"""Convert seconds to D-HH:MM:SS"""
seconds = int(seconds)

day_in_seconds = 24.0*3600.0
Expand All @@ -119,7 +119,10 @@ def strftimedelta(seconds): #pylint: disable=redefined-outer-name
minute = int(seconds/minute_in_seconds)
seconds -= minute*minute_in_seconds

return str(day) + ":" + ("%02d" % hour) + ":" + ("%02d" % minute) + ":" + ("%02d" % seconds)
if day == 0:
return ("%02d" % hour) + ":" + ("%02d" % minute) + ":" + ("%02d" % seconds)
else:
return str(day) + "-" + ("%02d" % hour) + ":" + ("%02d" % minute) + ":" + ("%02d" % seconds)

def exetime(deltatime):
"""Get the exetime string for the PBS '-a'option from a [[[DD:]MM:]HH:]SS string
Expand Down