Merge pull request DataBiosphere#1892 from chapmanb/master
Batch: cache instead of block for statePollingWait
ejacox authored Sep 20, 2017
2 parents 8aff5d1 + 996ee0c commit e549103
Showing 3 changed files with 38 additions and 19 deletions.
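The commit's core idea: instead of sleeping for statePollingWait before every scheduler query, remember the last answer together with a timestamp and only talk to the scheduler again once that window has elapsed. A minimal, standalone sketch of the pattern (illustrative only; the class and names below are hypothetical, not part of Toil):

    from datetime import datetime

    class CachedStatePoller(object):
        """Hypothetical illustration of the time-window caching used in this commit."""
        def __init__(self, polling_wait, query_fn):
            self.polling_wait = polling_wait  # seconds, analogous to config.statePollingWait
            self.query_fn = query_fn          # the expensive scheduler call (e.g. a qstat sweep)
            self._cache = None
            self._timestamp = None

        def poll(self):
            # Inside the window: answer from the cache without touching the scheduler.
            if (self._timestamp and
                    (datetime.now() - self._timestamp).total_seconds() < self.polling_wait):
                return self._cache
            # Window elapsed (or first call): query the scheduler and refresh the cache.
            self._cache = self.query_fn()
            self._timestamp = datetime.now()
            return self._cache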
37 changes: 28 additions & 9 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -15,6 +15,7 @@
 from __future__ import absolute_import
 
 from builtins import str
+from datetime import datetime
 import os
 import shutil
 import logging
@@ -69,6 +70,8 @@ def __init__(self, newJobsQueue, updatedJobsQueue, killQueue, killedJobsQueue, b
         self.boss = boss
         self.allocatedCpus = dict()
         self.batchJobIDs = dict()
+        self._checkOnJobsCache = None
+        self._checkOnJobsTimestamp = None
 
     def getBatchSystemID(self, jobID):
         """
@@ -181,19 +184,25 @@ def killJobs(self):
         return True
 
     def checkOnJobs(self):
-        """
-        Check and update status of all running jobs.
-        """
-        activity = False
-        self.boss.sleepSeconds()
+        """Check and update status of all running jobs.
+        Respects statePollingWait and will return cached results if not within
+        time period to talk with the scheduler.
+        """
+        if (self._checkOnJobsTimestamp and
+                (datetime.now() - self._checkOnJobsTimestamp).total_seconds() < self.boss.config.statePollingWait):
+            return self._checkOnJobsCache
+
+        activity = False
         for jobID in list(self.runningJobs):
             batchJobID = self.getBatchSystemID(jobID)
             status = self.getJobExitCode(batchJobID)
             if status is not None:
                 activity = True
                 self.updatedJobsQueue.put((jobID, status))
                 self.forgetJob(jobID)
+        self._checkOnJobsCache = activity
+        self._checkOnJobsTimestamp = datetime.now()
         return activity
 
     def run(self):
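Net effect of the hunk above: checkOnJobs no longer blocks via self.boss.sleepSeconds(); any call arriving within statePollingWait seconds of the last sweep simply gets the cached activity flag back. Reusing the illustrative CachedStatePoller sketched earlier (hypothetical example, not Toil code):

    calls = []
    poller = CachedStatePoller(polling_wait=10,
                               query_fn=lambda: calls.append('query') or len(calls))

    first = poller.poll()   # first call: the scheduler stand-in is queried
    second = poller.poll()  # within 10 s: cached value returned, no second query
    assert first == second == 1 and calls == ['query']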
@@ -300,6 +309,8 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
         self.worker.start()
         self.localBatch = registry.batchSystemFactoryFor(registry.defaultBatchSystem())()(config, maxCores,
                                                                                           maxMemory, maxDisk)
+        self._getRunningBatchJobIDsTimestamp = None
+        self._getRunningBatchJobIDsCache = {}
 
     def __des__(self):
         # Closes the file handle associated with the results file.
@@ -355,8 +366,19 @@ def getIssuedBatchJobIDs(self):
         return list(self.localBatch.getIssuedBatchJobIDs()) + list(self.currentJobs)
 
     def getRunningBatchJobIDs(self):
+        """Retrieve running job IDs from local and batch scheduler.
+        Respects statePollingWait and will return cached results if not within
+        time period to talk with the scheduler.
+        """
         localIds = self.localBatch.getRunningBatchJobIDs()
-        batchIds = self.worker.getRunningJobIDs()
+        if (self._getRunningBatchJobIDsTimestamp and
+                (datetime.now() - self._getRunningBatchJobIDsTimestamp).total_seconds() < self.config.statePollingWait):
+            batchIds = self._getRunningBatchJobIDsCache
+        else:
+            batchIds = self.worker.getRunningJobIDs()
+            self._getRunningBatchJobIDsCache = batchIds
+            self._getRunningBatchJobIDsTimestamp = datetime.now()
         batchIds.update(localIds)
         return batchIds
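Note the asymmetry in the hunk above: only the grid scheduler's answer is cached, locally-run job IDs are merged in fresh on every call, and the timestamp is refreshed only when a real query happens, so a cache hit cannot keep extending its own lifetime. A condensed, hypothetical restatement of that logic (function and parameter names are placeholders):

    from datetime import datetime

    def merged_running_ids(local_ids, cached_ids, timestamp, polling_wait, query_scheduler):
        # Reuse the cached scheduler answer while the window is open, otherwise refresh it.
        if timestamp and (datetime.now() - timestamp).total_seconds() < polling_wait:
            batch_ids = dict(cached_ids)
        else:
            batch_ids = query_scheduler()          # e.g. {jobID: seconds running}
            cached_ids, timestamp = dict(batch_ids), datetime.now()
        batch_ids.update(local_ids)                # local jobs are always reported fresh
        return batch_ids, cached_ids, timestamp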

@@ -398,13 +420,10 @@ def getWaitDuration(self):
     def getRescueBatchJobFrequency(cls):
         return 30 * 60 # Half an hour
 
-    def sleepSeconds(self, sleeptime=None):
+    def sleepSeconds(self, sleeptime=1):
         """ Helper function to drop on all state-querying functions to avoid over-querying.
         """
-        sleeptime = sleeptime or self.config.statePollingWait
-        logger.debug('Querying job state, waiting for %s seconds', sleeptime)
         time.sleep(sleeptime)
-
         return sleeptime
 
     @abstractclassmethod
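With caching in place, sleepSeconds drops its dependence on config.statePollingWait and becomes a short fixed pause (1 second by default), so the worker's polling loop iterates roughly once a second while real scheduler queries stay throttled by the cache. A hypothetical sketch of that relationship (names are placeholders, not Toil APIs):

    import time

    def worker_loop(poller, should_stop, loop_pause=1):
        # Spin quickly, but let the poller's time-window cache decide when the
        # scheduler is actually contacted.
        while not should_stop():
            poller.poll()           # hits the scheduler at most once per polling window
            time.sleep(loop_pause)  # short fixed pause instead of blocking for statePollingWait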
17 changes: 8 additions & 9 deletions src/toil/batchSystems/torque.py
@@ -67,18 +67,18 @@ def _pbsVersion(self):
         """
     def getRunningJobIDs(self):
         times = {}
-
-        self.boss.sleepSeconds()
         currentjobs = dict((str(self.batchJobIDs[x][0].strip()), x) for x in self.runningJobs)
         logger.debug("getRunningJobIDs current jobs are: " + str(currentjobs))
-        # Limit qstat to current username to avoid clogging the batch system on heavily loaded clusters
-        #job_user = os.environ.get('USER')
-        #process = subprocess.Popen(['qstat', '-u', job_user], stdout=subprocess.PIPE)
-        # -x shows exit status in PBSPro, not XML output like OSS PBS
+        # Skip running qstat if we don't have any current jobs
+        if not currentjobs:
+            return times
+        # Only query for job IDs to avoid clogging the batch system on heavily loaded clusters
+        # PBS plain qstat will return every running job on the system.
+        jobids = sorted(currentjobs.keys())
         if self._version == "pro":
-            process = subprocess.Popen(['qstat', '-x'], stdout=subprocess.PIPE)
+            process = subprocess.Popen(['qstat', '-x'] + jobids, stdout=subprocess.PIPE)
         elif self._version == "oss":
-            process = subprocess.Popen(['qstat'], stdout=subprocess.PIPE)
+            process = subprocess.Popen(['qstat'] + jobids, stdout=subprocess.PIPE)
 
 
         stdout, stderr = process.communicate()
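The Torque worker now returns early when it is tracking no jobs and passes its own job IDs to qstat, so a plain qstat no longer has to enumerate every job on a busy cluster. A sketch of that invocation pattern (the helper name is hypothetical; like the diff, it assumes qstat accepts job IDs as positional arguments):

    import subprocess

    def qstat_for_jobs(jobids, pbs_pro=False):
        # Ask the scheduler only about the jobs we own; skip the call entirely if there are none.
        if not jobids:
            return ''
        cmd = ['qstat', '-x'] + list(jobids) if pbs_pro else ['qstat'] + list(jobids)
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        stdout, _ = process.communicate()
        return stdout.decode('utf-8', errors='replace')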
@@ -107,7 +107,6 @@ def getRunningJobIDs(self):
 
     def getUpdatedBatchJob(self, maxWait):
         try:
-            self.boss.sleepSeconds()
             logger.debug("getUpdatedBatchJob: Job updates")
             pbsJobID, retcode = self.updatedJobsQueue.get(timeout=maxWait)
             self.updatedJobsQueue.task_done()
3 changes: 2 additions & 1 deletion src/toil/common.py
@@ -422,7 +422,8 @@ def _addOptions(addGroupFn, config):
     addOptionFn("--deadlockWait", dest="deadlockWait", default=None,
                 help=("The minimum number of seconds to observe the cluster stuck running only the same service jobs before throwing a deadlock exception. default=%s" % config.deadlockWait))
     addOptionFn("--statePollingWait", dest="statePollingWait", default=1,
-                help=("The minimum number of seconds to wait before retrieving the current job state, in seconds"))
+                help=("Time, in seconds, to wait before doing a scheduler query for job state. "
+                      "Return cached results if within the waiting period."))
 
     #
     #Resource requirements
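Because --statePollingWait is registered with Toil's standard option machinery, a workflow can widen the window when the scheduler is slow or heavily loaded. A hypothetical sketch using Toil's default argument parser (the job-store path and batch system value are placeholders, and the raw parsed value may still be a string before Toil's own type handling):

    from toil.job import Job

    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args(['file:my-jobstore',
                                 '--batchSystem', 'torque',
                                 '--statePollingWait', '30'])
    print(options.statePollingWait)  # scheduler queries (and cached answers) at most every 30 s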
