Merge pull request DataBiosphere#1944 from BD2KGenomics/issues/1940-job-attributes

Add convenience Job attributes (resolves DataBiosphere#1940)
ejacox authored Nov 3, 2017
2 parents f5b7bce + 83f83d3 commit dd221e2
Showing 7 changed files with 102 additions and 41 deletions.
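
In short, this commit replaces direct ``job.fileStore.logToMaster(...)`` and ``job.fileStore.getLocalTempDir()`` calls with two conveniences on :class:`toil.job.Job`: a memoized ``tempDir`` property and a ``log`` method. A minimal sketch of the new usage, following the diffs below (the job-store path, message, and function name are illustrative):

    import os
    from toil.job import Job

    def exampleJobFn(job, message):
        # job.log() wraps job.fileStore.logToMaster()
        job.log("I have a message: {}".format(message))
        # job.tempDir lazily creates a local temp directory via
        # fileStore.getLocalTempDir() and caches the path for reuse
        with open(os.path.join(job.tempDir, "example.txt"), "w") as fd:
            fd.write(message)

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
        options.logLevel = "INFO"
        Job.Runner.startToil(Job.wrapJobFn(exampleJobFn, "hello"), options)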
50 changes: 21 additions & 29 deletions docs/developingWorkflows/developing.rst
@@ -56,7 +56,7 @@ gigabytes of memory, 2 cores and 3 gigabytes of local disk to complete the work.

 The :func:`toil.job.Job.run` method is the function the user overrides to get
 work done. Here it just logs a message using
-:func:`toil.fileStore.FileStore.logToMaster`, which will be registered in the log
+:func:`toil.job.Job.log`, which will be registered in the log
 output of the leader process of the workflow.


@@ -106,8 +106,8 @@ For example::
             self.message = message

         def run(self, fileStore):
-            fileStore.logToMaster("Hello, world!, I have a message: %s"
-                                  % self.message)
+            self.log("Hello, world!, I have a message: {}".format(self.message))

     if __name__=="__main__":
         options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
         options.logLevel = "INFO"
@@ -235,17 +235,15 @@ argument in a class, this allows access to the methods of the wrapping job, see
     from toil.job import Job

     def helloWorld(job, message):
-        job.fileStore.logToMaster("Hello world, "
-                                  "I have a message: %s" % message) # This uses a logging function
-                                                                    # of the toil.fileStore.FileStore class
+        job.log("Hello world, I have a message: {}".format(message))

     if __name__=="__main__":
         options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
         options.logLevel = "INFO"
         print Job.Runner.startToil(Job.wrapJobFn(helloWorld, "woot"), options)

-Here ``helloWorld()`` is a job function. It accesses the
-:class:`toil.fileStore.FileStore` attribute of the job to log a message that will
+Here ``helloWorld()`` is a job function. It uses :func:`toil.job.Job.log`
+to log a message that will
 be printed to the output console. Here the only subtle difference to note is
 the line::

@@ -277,9 +275,7 @@ earlier ``helloWorld()`` job function::
     from toil.job import Job

     def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
-        job.fileStore.logToMaster("Hello world, "
-                                  "I have a message: %s" % message) # This uses a logging function
-                                                                    # of the toil.fileStore.FileStore class
+        job.log("Hello world, I have a message: {}".format(message))

     j1 = Job.wrapJobFn(helloWorld, "first")
     j2 = Job.wrapJobFn(helloWorld, "second or third")
@@ -304,9 +300,7 @@ example::
     from toil.job import Job

     def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
-        job.fileStore.logToMaster("Hello world, "
-                                  "I have a message: %s" % message) # This uses a logging function
-                                                                    # of the toil.fileStore.FileStore class
+        job.log("Hello world, I have a message: {}".format(message))

     j1 = Job.wrapJobFn(helloWorld, "first")
     j2 = j1.addChildJobFn(helloWorld, "second or third")
@@ -330,9 +324,7 @@ specified as a DAG as follows::
     from toil.job import Job

     def helloWorld(job, message, memory="2G", cores=2, disk="3G"):
-        job.fileStore.logToMaster("Hello world, "
-                                  "I have a message: %s" % message) # This uses a logging function
-                                                                    # of the toil.fileStore.FileStore class
+        job.log("Hello world, I have a message: {}".format(message))

     j1 = Job.wrapJobFn(helloWorld, "first")
     j2 = j1.addChildJobFn(helloWorld, "second or third")
@@ -362,7 +354,7 @@ Toil also allows jobs to be created dynamically within jobs. For example::
             job.addChildJobFn(binaryStringFn, depth-1, message + "0")
             job.addChildJobFn(binaryStringFn, depth-1, message + "1")
         else:
-            job.fileStore.logToMaster("Binary string: %s" % message)
+            job.log("Binary string: {}".format(message))

     if __name__=="__main__":
         options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
@@ -391,7 +383,7 @@ the following example::
     from toil.job import Job

     def fn(job, i):
-        job.fileStore.logToMaster("i is: %s" % i, level=100)
+        job.log("i is: %s" % i, level=100)
         return i+1

     j1 = Job.wrapJobFn(fn, 1)
@@ -546,9 +538,9 @@ node and that will be cleaned up, regardless of failure, when the job finishes::

     class LocalFileStoreJob(Job):
         def run(self, fileStore):
-            scratchDir = fileStore.getLocalTempDir() # Create a temporary
-            # directory safely within the allocated disk space
-            # reserved for the job.
+            scratchDir = self.tempDir
+            # self.tempDir will always contain the name of a directory within
+            # the allocated disk space reserved for the job.

             scratchFile = fileStore.getLocalTempFile() # Similarly
             # create a temporary file.
@@ -565,7 +557,7 @@ Job functions can also access the file store for the job. The equivalent of the
 ``LocalFileStoreJob`` class is::

     def localFileStoreJobFn(job):
-        scratchDir = job.fileStore.getLocalTempDir()
+        scratchDir = job.tempDir
         scratchFile = job.fileStore.getLocalTempFile()

 Note that the ``fileStore`` attribute is accessed as an attribute of the
@@ -580,9 +572,8 @@ example::
     import os

     def globalFileStoreJobFn(job):
-        job.fileStore.logToMaster("The following example exercises all the"
-                                  " methods provided by the"
-                                  " toil.fileStore.FileStore class")
+        job.log("The following example exercises all the methods provided"
+                " by the toil.fileStore.FileStore class")

         scratchFile = job.fileStore.getLocalTempFile() # Create a local
         # temporary file.
@@ -606,7 +597,7 @@ example::
         scratchFile2 = job.fileStore.readGlobalFile(fileID)

         # Read the second file to a desired location: scratchFile3.
-        scratchFile3 = os.path.join(job.fileStore.getLocalTempDir(), "foo.txt")
+        scratchFile3 = os.path.join(job.tempDir, "foo.txt")
         job.fileStore.readGlobalFile(fileID2, userPath=scratchFile3)

         # Read the second file again using a stream.
@@ -730,7 +721,7 @@ An example of a basic ``dockerCall`` is below:

     dockerCall(job=job,
                tool='quay.io/ucsc_cgl/bwa',
-               workDir=job.fileStore.getLocalTempDir(),
+               workDir=job.tempDir,
                parameters=['index', '/data/reference.fa'])

``dockerCall`` can also be added to workflows like any other job function:
@@ -739,7 +730,7 @@ An example of a basic ``dockerCall`` is below:

     align = Job.wrapJobFn(dockerCall,
                           tool='quay.io/ucsc_cgl/bwa',
-                          workDir=job.fileStore.getLocalTempDir(),
+                          workDir=job.tempDir,
                           parameters=['index', '/data/reference.fa'])

     if __name__=="__main__":
@@ -760,6 +751,7 @@ set by passing in the optional keyword argument, 'entrypoint'. Example:

     entrypoint=["/bin/bash","-c"]

+
 dockerCall currently supports the 75 keyword arguments found in the Python
 `Docker API`_, under the 'run' command.

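One caveat worth noting: in the wrapped example above, ``job`` is not actually in scope where ``workDir=job.tempDir`` is evaluated, so in practice ``dockerCall`` is invoked from inside a job function. A sketch under that assumption (``alignFn`` is an illustrative name; the image and parameters are those from the docs):

    from toil.job import Job
    from toil.lib.docker import dockerCall

    def alignFn(job):
        # the work directory must come from the running job's file store
        dockerCall(job=job,
                   tool='quay.io/ucsc_cgl/bwa',
                   workDir=job.tempDir,
                   parameters=['index', '/data/reference.fa'])

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
        Job.Runner.startToil(Job.wrapJobFn(alignFn), options)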
2 changes: 1 addition & 1 deletion docs/gettingStarted/quickStart.rst
@@ -183,7 +183,7 @@ Next let's look at the job that begins the actual workflow, ``setup``.
 .. literalinclude:: ../../src/toil/test/sort/sort.py
    :pyobject: setup

-``setup`` really only does two things. First it writes to the logs using :func:`Job.FileStore.logToMaster` and then
+``setup`` really only does two things. First it writes to the logs using :func:`Job.log` and then
 calls :func:`addChildJobFn`. Child jobs run directly after the current job. This function turns the 'job function'
 ``down`` into an actual job and passes in the inputs including an optional resource requirement, ``memory``. The job
 doesn't actually get run until the call to :func:`Job.rv`. Once the job ``down`` finishes, its output is returned here.
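The child-job/promise pattern described here, as a minimal sketch (``parent`` and ``childFn`` are illustrative names; the child runs after the parent and its result is delivered through the promise):

    from toil.job import Job

    def parent(job):
        # rv() is a promise for childFn's return value; childFn itself
        # runs only after parent's run() has completed
        return job.addChildJobFn(childFn, 3, memory="1G").rv()

    def childFn(job, x):
        return x * 2

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
        print Job.Runner.startToil(Job.wrapJobFn(parent), options)  # expect 6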
2 changes: 1 addition & 1 deletion docs/running/cli.rst
@@ -131,7 +131,7 @@ defined before launching the Job, i.e:
 Running Workflows with Services
 -------------------------------

-Toil supports jobs, or clusters of jobs, that run as *services* (see :ref:`service-dev-ref` ) to other
+Toil supports jobs, or clusters of jobs, that run as *services* (see :ref:`serviceDev`) to other
 *accessor* jobs. Example services include server databases or Apache Spark
 Clusters. As service jobs exist to provide services to accessor jobs their
 runtime is dependent on the concurrent running of their accessor jobs. The dependencies
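As a sketch of the service pattern referenced here (``DemoService`` and the connection string are illustrative; the ``Job.Service`` interface of ``start``/``stop``/``check``, with ``addService`` returning a promise of ``start``'s value, is assumed to match the service documentation):

    from toil.job import Job

    class DemoService(Job.Service):
        def start(self, job):
            # launch the service and return a handle for accessor jobs
            return "host:port"

        def check(self):
            # report that the service is still healthy
            return True

        def stop(self, job):
            # shut the service down once all accessors have finished
            pass

    def accessorFn(job, connection):
        job.log("using service at {}".format(connection))

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("./toilWorkflowRun")
        root = Job()
        connection = root.addService(DemoService())
        root.addChildJobFn(accessorFn, connection)
        Job.Runner.startToil(root, options)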
19 changes: 19 additions & 0 deletions src/toil/job.py
@@ -305,6 +305,7 @@ def __init__(self, memory=None, cores=None, disk=None, preemptable=None, unitNam
         self._rvs = collections.defaultdict(list)
         self._promiseJobStore = None
         self._fileStore = None
+        self._tempDir = None

     def run(self, fileStore):
         """
@@ -457,6 +458,24 @@ def addFollowOnJobFn(self, fn, *args, **kwargs):
         else:
             return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))

+    @property
+    def tempDir(self):
+        """
+        Shortcut to calling :func:`job.fileStore.getLocalTempDir`. The temp dir is created on
+        first access and the same path is returned on subsequent calls.
+        :return: Path to tempDir. See `job.fileStore.getLocalTempDir`
+        :rtype: str
+        """
+        if self._tempDir is None:
+            self._tempDir = self._fileStore.getLocalTempDir()
+        return self._tempDir
+
+    def log(self, text, level=logging.INFO):
+        """
+        Convenience wrapper for :func:`fileStore.logToMaster`.
+        """
+        self._fileStore.logToMaster(text, level)
+
     @staticmethod
     def wrapFn(fn, *args, **kwargs):
         """
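Per the additions above, ``tempDir`` memoizes the first ``getLocalTempDir()`` call and ``log`` forwards to ``logToMaster`` with a default level of ``logging.INFO``. A small sketch of the resulting contract inside a job's ``run`` method (the class name and assertion are for illustration only):

    import logging
    from toil.job import Job

    class TempDirContractJob(Job):
        def run(self, fileStore):
            # repeated accesses within one invocation return the same path;
            # without memoization each call would create a fresh directory
            assert self.tempDir == self.tempDir
            # log() defaults to logging.INFO; other levels can be passed
            self.log("scratch dir is {}".format(self.tempDir))
            self.log("something important", level=logging.CRITICAL)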
14 changes: 7 additions & 7 deletions src/toil/test/sort/sort.py
@@ -37,7 +37,7 @@ def setup(job, inputFile, N, downCheckpoints, options):
     Sets up the sort.
     Returns the FileID of the sorted file
     """
-    job.fileStore.logToMaster("Starting the merge sort")
+    job.log("Starting the merge sort")
     return job.addChildJobFn(down,
                              inputFile, N,
                              downCheckpoints,
@@ -58,8 +58,8 @@ def down(job, inputFileStoreID, N, downCheckpoints, options, memory=sortMemory):
     length = os.path.getsize(inputFile)
     if length > N:
         # We will subdivide the file
-        job.fileStore.logToMaster("Splitting file: %s of size: %s"
-                                  % (inputFileStoreID, length), level=logging.CRITICAL)
+        job.log("Splitting file: %s of size: %s"
+                % (inputFileStoreID, length), level=logging.CRITICAL)
         # Split the file into two copies
         midPoint = getMidPoint(inputFile, 0, length)
         t1 = job.fileStore.getLocalTempFile()
@@ -77,8 +77,8 @@ def down(job, inputFileStoreID, N, downCheckpoints, options, memory=sortMemory):
                              checkpoint=downCheckpoints, options=options, memory=options.mergeMemory).rv(), options=options, memory=options.sortMemory).rv()
     else:
         # We can sort this bit of the file
-        job.fileStore.logToMaster("Sorting file: %s of size: %s"
-                                  % (inputFileStoreID, length), level=logging.CRITICAL)
+        job.log("Sorting file: %s of size: %s"
+                % (inputFileStoreID, length), level=logging.CRITICAL)
         # Sort the copy and write back to the fileStore
         shutil.copyfile(inputFile, inputFile + '.sort')
         sort(inputFile + '.sort')
@@ -93,8 +93,8 @@ def up(job, inputFileID1, inputFileID2, options, memory=sortMemory):
         with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
             with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
                 merge(inputFileHandle1, inputFileHandle2, fileHandle)
-        job.fileStore.logToMaster("Merging %s and %s to %s"
-                                  % (inputFileID1, inputFileID2, outputFileStoreID))
+        job.log("Merging %s and %s to %s"
+                % (inputFileID1, inputFileID2, outputFileStoreID))
     # Clean up the input files - these deletes will occur after the completion is successful.
     job.fileStore.deleteGlobalFile(inputFileID1)
     job.fileStore.deleteGlobalFile(inputFileID2)
6 changes: 3 additions & 3 deletions src/toil/test/src/fileStoreTest.py
@@ -733,7 +733,7 @@ def testAsyncWriteWithCaching(self):
         A = Job.wrapJobFn(self._forceModifyCacheLockFile, newTotalMB=1024, disk='1G')
         B = Job.wrapJobFn(self._doubleWriteFileToJobStore, fileMB=850, disk='900M')
         # Set it to > 2GB such that the cleanup jobs don't die.
-        C = Job.wrapJobFn(self._readFromJobStoreWithoutAsssertions, fsID=B.rv(), disk='1G')
+        C = Job.wrapJobFn(self._readFromJobStoreWithoutAssertions, fsID=B.rv(), disk='1G')
        D = Job.wrapJobFn(self._forceModifyCacheLockFile, newTotalMB=5000, disk='1G')
        A.addChild(B)
        B.addChild(C)
@@ -763,13 +763,13 @@ def newHarbingerFileRead(self):

         job.fileStore.writeGlobalFile(testFile.name)
         fsID = job.fileStore.writeGlobalFile(testFile.name)
-        hidden.AbstractCachingFileStoreTest._readFromJobStoreWithoutAsssertions(job, fsID)
+        hidden.AbstractCachingFileStoreTest._readFromJobStoreWithoutAssertions(job, fsID)
         # Make this take longer so we can test asynchronous writes across jobs/workers.
         job.fileStore.HarbingerFile.read = newHarbingerFileRead
         return job.fileStore.writeGlobalFile(testFile.name)

     @staticmethod
-    def _readFromJobStoreWithoutAsssertions(job, fsID):
+    def _readFromJobStoreWithoutAssertions(job, fsID):
         """
         Reads a file from the job store. That will be all, thank you.
50 changes: 50 additions & 0 deletions src/toil/test/src/jobTest.py
@@ -323,6 +323,18 @@ def createWorkflow():

         self.runNewCheckpointIsLeafVertexTest(createWorkflow)

+    def testTempDir(self):
+        """
+        Test that job.tempDir works as expected, making use of job.log for logging.
+        """
+        message = "I love rachael price"
+
+        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
+        with Toil(options) as workflow:
+            j = sillyTestJob(message)
+            j.addChildJobFn(seriousTestJob, message)
+            workflow.start(j)
+
     def runNewCheckpointIsLeafVertexTest(self, createWorkflowFn):
         """
         Test verification that a checkpoint job is a leaf vertex using both
@@ -693,6 +705,44 @@ def child(job):
     pass


+class sillyTestJob(Job):
+    """
+    All this job does is write a message to a temp file in the tempDir
+    (which is cleaned up when the job finishes).
+    """
+    def __init__(self, message):
+        Job.__init__(self)
+        self.message = message
+
+    @staticmethod
+    def sillify(message):
+        """
+        Turns "this serious help message" into "shis serious selp sessage"
+        """
+        return ' '.join(['s' + word if word[0] in 'aeiou' else 's' + word[1:] for word in message.split()])
+
+    def run(self, fileStore):
+        file1 = os.path.join(self.tempDir, 'sillyFile.txt')
+        self.log('first filename is {}'.format(file1))
+        file2 = os.path.join(self.tempDir, 'sillyFile.txt')
+        self.log('second filename is {}'.format(file2))
+        # make sure we get the same tempDir every time
+        assert file1 == file2
+
+        # write to the tempDir to be sure that everything works
+        with open(file1, 'w') as fd:
+            fd.write(self.sillify(self.message))
+
+
+def seriousTestJob(job, message):
+    # testing job.tempDir for function jobs
+    with open(os.path.join(job.tempDir, 'seriousFile.txt'), 'w') as fd:
+        fd.write("The unadulterated message is: ")
+        fd.write(message)
+    # and logging
+    job.log("message has been written")
+
+
 def checkRequirements(job):
     # ensure default resource requirements are being set correctly
     assert job.cores is not None
