Commit
Merge branch 'master' of github.com:dmwm/ProdCommon
belforte committed Jun 12, 2014
2 parents f7e8b5d + 3c3dd6b commit 45d2d71
Showing 4 changed files with 344 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/python/ProdCommon/BossLite/Scheduler/SchedulerARC.py
@@ -606,7 +606,7 @@ def getClusters(self):
elif not c["cluster"] and line.find("Name:") >= 0:
c["cluster"] = line.split(': ')[1]

- clusters.append(c)
+ if c["cluster"]: clusters.append(c)
return clusters


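The SchedulerARC fix above guards the append so getClusters() only returns entries whose Name: line was actually parsed; placeholder dictionaries with an empty cluster field are dropped. A minimal sketch of the guard, with the sample output lines purely illustrative:

clusters = []
c = {"cluster": ""}
for line in ["Alias: tier2-ce", "Name: arc-ce.example.org"]:  # illustrative output lines
    if not c["cluster"] and line.find("Name:") >= 0:
        c["cluster"] = line.split(': ')[1]
if c["cluster"]:  # only keep the entry once a cluster name was found
    clusters.append(c)
print(clusters)  # [{'cluster': 'arc-ce.example.org'}]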
1 change: 1 addition & 0 deletions src/python/ProdCommon/BossLite/Scheduler/SchedulerPbsv2.py
@@ -116,6 +116,7 @@ def submitJob ( self, job, task=None, requirements=''):
s.append('# This script generated by CRAB2 from http://cms.cern.ch')
s.append('#PBS -e %s:%stmp_%s' % (self.hostname, self.jobResDir, job['standardError']) )
s.append('#PBS -o %s:%stmp_%s' % (self.hostname, self.jobResDir, job['standardOutput']) )
+ s.append('#PBS -q %s' % (self.queue) )
s.append('#PBS -N CMS_CRAB2')
if self.resources:
resourceList = self.resources.split(',')
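The single Pbsv2 addition writes a #PBS -q directive into the generated submit script, so the job is routed to the queue configured for the scheduler instead of the site default. A minimal sketch of the resulting script header; the queue name and surrounding values are illustrative only:

queue = 'cms_local'  # illustrative queue name
header = []
header.append('#!/bin/sh')
header.append('# This script generated by CRAB2 from http://cms.cern.ch')
header.append('#PBS -q %s' % queue)  # new: route the job to the configured queue
header.append('#PBS -N CMS_CRAB2')
print('\n'.join(header))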
7 changes: 4 additions & 3 deletions src/python/ProdCommon/BossLite/Scheduler/SchedulerSge.py
@@ -252,9 +252,10 @@ def queryLocal(self, schedIdList, objType='node' ) :
"""
ret_map={}
#print schedIdList, service, objType
- r = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) .* (\S+)@(\w+)")
- rnohost = re.compile("(\d+) .* "+os.getlogin()+" \W+(\w+) ")
- cmd='qstat -u '+os.getlogin()
+ import pwd
+ r = re.compile("(\d+) .* "+pwd.getpwuid(os.geteuid()).pw_name+" \W+(\w+) .* (\S+)@(\w+)")
+ rnohost = re.compile("(\d+) .* "+pwd.getpwuid(os.geteuid()).pw_name+" \W+(\w+) ")
+ cmd='qstat -u '+pwd.getpwuid(os.geteuid()).pw_name
#print cmd
out, ret = self.ExecuteCommand(cmd)
#print "<"+out+">"
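The SchedulerSge change replaces os.getlogin(), which needs a controlling terminal and raises OSError when the scheduler runs detached (cron, batch services), with a passwd lookup keyed on the effective UID, which works in both cases. A minimal sketch of the same fallback; the helper name is illustrative only:

import os, pwd

def current_username():
    """Username for the effective UID; does not require a controlling tty."""
    try:
        return os.getlogin()  # works in an interactive shell
    except OSError:
        return pwd.getpwuid(os.geteuid()).pw_name  # fallback for detached processes

print('qstat -u ' + current_username())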
338 changes: 338 additions & 0 deletions src/python/ProdCommon/BossLite/Scheduler/SchedulerSlurm.py
@@ -0,0 +1,338 @@
#!/usr/bin/env python
"""
BossLite SLURM interface
Written by [email protected], based on modifications by [email protected]
Based on the Pbsv2 interface
"""

__revision__ = "$Id: "
__version__ = "$Revision: 1.4 $"

import re, os, time, uuid
import tempfile, os.path
import subprocess, re, socket
import shutil

from ProdCommon.BossLite.Scheduler.SchedulerInterface import SchedulerInterface
from ProdCommon.BossLite.Common.Exceptions import SchedulerError
from ProdCommon.BossLite.DbObjects.Job import Job
from ProdCommon.BossLite.DbObjects.Task import Task
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob

class SchedulerSlurm (SchedulerInterface) :
"""
Basic class to handle SLURM jobs
"""
def __init__( self, **args):
super(SchedulerSlurm, self).__init__(**args)
print("BossLite.SchedulerSlurm.__init__: args = ", args)
self.jobScriptDir = args['jobScriptDir']
self.jobResDir = args['jobResDir']
self.queue = args['queue']
self.workerNodeWorkDir = args.get('workernodebase', '')
if not self.workerNodeWorkDir:
self.workerNodeWorkDir='/scratch/' + os.environ['LOGNAME']
print("BossLite.SchedulerSlurm.__init__: workernodebase not set, using default = ", self.workerNodeWorkDir)

self.hostname = args.get('hostname', None)
if not self.hostname:
self.hostname = socket.gethostname()
self.resources = args.get('resources', '')
self.use_proxy = args.get('use_proxy', True)
self.forceTransferFiles= args.get('forcetransferfiles', 0)

self.res_dict = {}
self.proxy_location = os.environ.get( 'X509_USER_PROXY', \
'/tmp/x509up_u'+ repr(os.getuid()) )

self.status_map={'E':'R',
'H':'SS',
'Q':'SS',
'R':'R',
'S':'R',
'T':'R',
'W':'SS',
'PD':'SS',
'Done':'SD',
'C':'SD',
'CG':'SD'}
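# Grouping in the map above: queued/held/waiting codes (H, Q, W, PD) map to 'SS',
# running/suspended codes (E, R, S, T) map to 'R', and finished codes
# (Done, C, CG) map to 'SD'. The PBS letter codes sit alongside the SLURM
# ones because this map was carried over from the Pbsv2 interface.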

def jobDescription ( self, obj, requirements='', config='', service = '' ):
"""
retrieve scheduler specific job description
return it as a string
"""
raise NotImplementedError

def submit ( self, obj, requirements='', config='', service = '' ) :
"""
set up submission parameters and submit
return jobAttributes, bulkId, service
- jobAttributes is a map of the format
jobAttributes[ 'name' : 'schedulerId' ]
- bulkId is an eventual bulk submission identifier
- service is an endpoint to connect with (such as the WMS)
"""

if type(obj) == RunningJob or type(obj) == Job:
map, taskId, queue = self.submitJob(obj, requirements)
elif type(obj) == Task :
map, taskId, queue = self.submitTask (obj, requirements )

return map, taskId, queue

def submitTask ( self, task, requirements=''):

ret_map={}
for job in task.getJobs() :
map, taskId, queue = self.submitJob(job, task, requirements)
ret_map.update(map)

return ret_map, taskId, queue

def submitJob ( self, job, task=None, requirements=''):
""" Need to copy the inputsandbox to WN before submitting a job"""
# Write a temporary submit script
# NB: we assume an env var SLURM_JOBCOOKIE points to the exec dir on the batch host

inputFiles = task['globalSandbox'].split(',')
pbsScript = tempfile.NamedTemporaryFile()
epilogue = tempfile.NamedTemporaryFile( prefix = 'epilogue.' )
if not self.workerNodeWorkDir:
self.workerNodeWorkDir = os.path.join( os.getcwd(), 'CRAB-SLURM' )
if not os.path.exists( self.workerNodeWorkDir ):
os.mkdir( self.workerNodeWorkDir )

self.stageDir = os.path.join( os.getcwd(), 'CRAB-SLURM' )
if not os.path.exists( self.stageDir ):
os.mkdir( self.stageDir )

# Generate a UUID for transferring input files
randomPrefix = uuid.uuid4().hex

# Begin building SLURM script
s=[]
s.append('#!/bin/sh')
s.append('# This script generated by CRAB2 from http://cms.cern.ch')
s.append('#SBATCH --error %swrapper_%s' % (self.jobResDir, job['standardError']) )
s.append('#SBATCH --output %swrapper_%s' % (self.jobResDir, job['standardOutput']) )
s.append('#SBATCH --job-name CMS_CRAB2')
#if self.resources:
# resourceList = self.resources.split(',')
# for resource in resourceList:
# s.append('#SBATCH -l %s' % resource)

#s.append('ls -lah')
#s.append('pwd')
#s.append('set -x')
#s.append('#SBATCH -T %s' % os.path.abspath(epilogue.name))

# get files for stagein
fileList = []
inputFiles = task['globalSandbox'].split(',')

# Do we want the proxy?
if self.use_proxy:
if os.path.exists( self.proxy_location ):
inputFiles.append( self.proxy_location )
else:
raise SchedulerError('Proxy Error',"Proxy not found at %s" % self.proxy_location)


if self.queue:
s.append('#SBATCH --partition %s' % self.queue)

s.append('mkdir -p %s' % self.workerNodeWorkDir)

for file in inputFiles:
targetFile = os.path.abspath( os.path.join( self.workerNodeWorkDir,
"%s-%s" % (randomPrefix, os.path.basename( file ) ) ) )
stageFile = os.path.abspath( os.path.join( self.stageDir,
"%s-%s" % (randomPrefix, os.path.basename( file ) ) ) )
if self.forceTransferFiles:
raise Exception("forceTransferFiles not implemented")
s.append('#SBATCH -W stagein=%s@%s:%s' % (targetFile, self.hostname, file))
else:
s.append('cp %s %s' % ( os.path.abspath(file), targetFile ) )

#if fileList:
# s.append('#SLURM -W stagein=%s' % ','.join(fileList))

# Inform SLURM of what we want to stage out
fileList = []
for file in job['outputFiles']:
targetFile = os.path.abspath( os.path.join( self.workerNodeWorkDir,
"%s-%s" % (randomPrefix, os.path.basename( file ) ) ) )
stageFile = os.path.abspath( os.path.join( task['outputDirectory'],
file ) )
if self.forceTransferFiles:
raise Exception("forceTransferFiles not implemented")
s.append('#SBATCH -W stageout=%s@%s:%s' % \
(targetFile,
self.hostname,
stageFile) ) # get out of $HOME
else:
s.append('cp -f %s %s' % (targetFile, stageFile) )


s.append('set -x')
s.append('pwd')
s.append('ls -lah')
s.append('echo ***BEGINNING SLURMV2***')
s.append('CRAB2_OLD_DIRECTORY=`pwd`')
s.append('CRAB2_SLURM_WORKDIR=%s' % self.workerNodeWorkDir)
if self.workerNodeWorkDir:
s.append('cd %s' % self.workerNodeWorkDir)


s.append('CRAB2_WORKDIR=`pwd`/CRAB2-$SLURM_JOBCOOKIE$SLURM_JOBID')
s.append('if [ ! -d $CRAB2_WORKDIR ] ; then ')
s.append(' mkdir -p $CRAB2_WORKDIR')
s.append('fi')
s.append('cd $CRAB2_WORKDIR')

# move files up to $SLURM_JOBCOOKIE
inputFiles = task['globalSandbox'].split(',')

# Do we want the proxy?
if self.use_proxy:
if os.path.exists( self.proxy_location ):
inputFiles.append( self.proxy_location )
else:
raise SchedulerError('Proxy Error', "Proxy not found at %s" % self.proxy_location)

for file in inputFiles:
targetFile = "%s-%s" % (randomPrefix, os.path.basename( file ) )
s.append('mv $CRAB2_SLURM_WORKDIR/%s $CRAB2_WORKDIR/%s' % \
( targetFile,
os.path.basename( file ) ) )

# set proxy
if self.use_proxy:
s.append('export X509_USER_PROXY=$CRAB2_WORKDIR/%s' % os.path.basename( self.proxy_location ) )

s.append("./%s %s" % (job['executable'], job['arguments']) )

s.append('ls $CRAB2_WORKDIR')
# move output files to where SLURM can find them
for file in job['outputFiles']:
s.append('mv $CRAB2_WORKDIR/%s $CRAB2_SLURM_WORKDIR/%s-%s' % (file, randomPrefix, file ) )

fileList = []
for file in job['outputFiles']:
targetFile = os.path.abspath( os.path.join( self.workerNodeWorkDir,
"%s-%s" % (randomPrefix, os.path.basename( file ) ) ) )
stageFile = os.path.abspath( os.path.join( task['outputDirectory'],
file ) )
if not self.forceTransferFiles:
s.append('mv -f %s %s' % (targetFile, stageFile) )


s.append('cd $CRAB2_OLD_DIRECTORY')
s.append('rm -rf $CRAB2_WORKDIR')
pbsScript.write('\n'.join(s))
pbsScript.flush()
for line in s:
self.logging.debug(" CONFIG: %s" % line)

s = []
s.append('#!/bin/sh');
if self.workerNodeWorkDir:
s.append('cd ' + self.workerNodeWorkDir)
s.append('rm -fr $SLURM_JOBCOOKIE')
s.append('touch $HOME/done.$1')
epilogue.write( '\n'.join( s ) )
epilogue.flush()
os.chmod( epilogue.name, 0o700 )  # octal mode: rwx for owner only

p = subprocess.Popen("sbatch %s" % pbsScript.name, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(child_stdout, child_stderr) = p.communicate()
pbsScript.close()
epilogue.close()

if p.returncode != 0:
self.logging.error('Error in job submission')
self.logging.error(child_stderr)
raise SchedulerError('SLURM error', child_stderr)

try:
jobid = int(child_stdout.strip().split(' ')[3])
except:
self.logging.error("SLURM could not submit job: %s" % (child_stdout))
self.logging.error(child_stderr)
raise SchedulerError('SLURM error', child_stderr)

return {job['name']:jobid}, None, None

def query(self, obj, service='', objType='node') :
"""
query status and eventually other scheduler related information
It may use single 'node' scheduler id or bulk id for association
"""
if type(obj) != Task :
raise SchedulerError('wrong argument type', str( type(obj) ))

jobids=[]
for job in obj.jobs:
if not self.valid( job.runningJob ): continue
id=str(job.runningJob['schedulerId']).strip()
#p = subprocess.Popen( ['qstat', '-x', id], stdout=subprocess.PIPE,
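# squeue -h drops the header line; -o wraps the job id (%i), batch host (%B)
# and compact state code (%t) in pseudo-XML tags so the output can be sliced
# the same way the old 'qstat -x' XML was.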
p = subprocess.Popen( ['squeue', '-h', '-o','<jobid>%i</jobid><exec_host>%B</exec_host><job_state>%t</job_state>','-j', id], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
qstat_output, \
qstat_error = p.communicate()
qstat_return = p.returncode

if qstat_return:
#if qstat_return != 153: # 153 means the job isn't there
if qstat_return != 1: # squeue exits 1 when the job is no longer known
self.logging.error('Error in job query for '+id)
self.logging.error('SLURM stdout: \n %s' % qstat_output)
self.logging.error('SLURM stderr: \n %s' % qstat_error)
raise SchedulerError('SLURM error', '%s: %s' % (qstat_error, qstat_return) )

host=''
if len(qstat_output)==0:
pbs_stat='Done'
else:
if qstat_output.find('</exec_host>') >= 0:
host = qstat_output[ qstat_output.find('<exec_host>') + len('<exec_host>') :
qstat_output.find('</exec_host>') ]
if qstat_output.find('</job_state>') >= 0:
pbs_stat = qstat_output[ qstat_output.find('<job_state>') + len('<job_state>') :
qstat_output.find('</job_state>') ]

job.runningJob['statusScheduler']=pbs_stat
job.runningJob['status'] = self.status_map[pbs_stat]
job.runningJob['destination']=host

def kill(self, obj):

for job in obj.jobs :
if not self.valid( job.runningJob ): continue
id=str(job.runningJob['schedulerId']).strip()

p = subprocess.Popen( ['scancel', id], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
qdel_output, \
qdel_error = p.communicate()
qdel_return = p.returncode

if qdel_return != 0:
self.logging.error('Error in job kill for '+id)
self.logging.error('SLURM Error stdout: %s' % qdel_output)
raise SchedulerError('SLURM Error in kill', qdel_output)

def getOutput( self, obj, outdir='' ):
"""
retrieve output or just put it in the destination directory
does not return
"""
pass
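For reference, the submit path above extracts the numeric job id by splitting sbatch's stdout on spaces, which assumes the usual 'Submitted batch job <id>' message. A minimal sketch of a slightly more defensive parse under the same assumption; the function name is illustrative only:

import re

def parse_sbatch_jobid(stdout_text):
    """Extract the job id from typical sbatch output, e.g. 'Submitted batch job 12345'."""
    match = re.search(r'Submitted batch job (\d+)', stdout_text)
    if match is None:
        raise ValueError("unrecognised sbatch output: %r" % stdout_text)
    return int(match.group(1))

print(parse_sbatch_jobid("Submitted batch job 12345"))  # 12345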
