diff --git a/base.py b/base.py
new file mode 100644
index 0000000..d5ef157
--- /dev/null
+++ b/base.py
@@ -0,0 +1,107 @@
+'''Base objects and functionality.'''
+
+import sys
+import shutil
+import os
+import time
+import glob
+
+class ProcessingObject(object):
+    """Base class with the properties shared by all processing
+    objects."""
+    overwrite = False
+    waitfornext = True
+    verbose = True
+    channelout = sys.stdout.writelines
+    update = False
+    parallel = False
+    inext = '.nii'
+    outext = '.nii'
+
+    def findStrInList(self,searchstr,targetlist):
+        '''Return the strings in targetlist that contain searchstr as a
+        substring. Raises an exception if there are no matches; callers
+        are expected to check that exactly one match came back.'''
+        hits = [x for x in targetlist if searchstr in x]
+        if not hits:
+            raise Exception('No matches for %s' % searchstr)
+        return hits
+
+    def findListItemInStr(self,searchstr,targetlist):
+        '''Return the items of targetlist that occur as substrings of
+        searchstr (the inverse of findStrInList). Raises an exception if
+        there are no matches; callers are expected to check that exactly
+        one match came back.'''
+        hits = [x for x in targetlist if x in searchstr]
+        if not hits:
+            raise Exception('No matches for %s' % searchstr)
+        return hits
+
+    def output(self,text):
+        '''Write text to the output channel (e.g. stdout), but only if
+        verbose.'''
+        if type(text) is not list:
+            text = [text]
+        if self.verbose:
+            self.channelout(('%s\n' % s for s in text))
+        return
+
+    def checkInputs(self,argdict):
+        '''Set each key in argdict as an attribute, but only if the
+        attribute already exists on the object.'''
+        for attribute in argdict:
+            if hasattr(self,attribute):
+                #self.output('Updating attribute: %s' % attribute)
+                setattr(self,attribute,argdict[attribute])
+            else:
+                raise Exception('Unknown argument: %s' % attribute)
+        return
+
+    def getOutput(self,invol,sd):
+        """The default getOutput method converts an invol to an outvol
+        based on various object settings (self.outprefix and addPrefix
+        are supplied by subclasses such as DataProcess). Inherited
+        objects can override this with methods that check attributes
+        internally instead."""
+        outpath,outfile = os.path.split(invol[0])
+        outfn,outext = os.path.splitext(outfile)
+        return self.addPrefix(os.path.join(outpath,outfn + self.outext),
+                              self.outprefix)
+
+    def checkOutput(self,outvol):
+        """Use the output from getOutput to decide whether the task has
+        already been done (ie, there is output). To be overridden
+        (together with getOutput) by processes that don't use volumes as
+        output."""
+        # Support wild card matching for the adventurous
+        if '*' in outvol:
+            return len(glob.glob(outvol)) >= 1
+        else:
+            return os.path.exists(outvol)
+
+    def setAllInputs(self,argdict):
+        '''Set each key in argdict as an attribute, whether or not it
+        already exists (unlike checkInputs).'''
+        for attribute in argdict:
+            setattr(self,attribute,argdict[attribute])
+        return
+
+    def mkAnaDir(self,anadir):
+        '''Make a directory while remaining conscious of existing dirs
+        and overwrite settings. Return True if a new dir was made.'''
+        # Check if we need to worry about overwriting
+        if os.path.exists(anadir):
+            if self.overwrite:
+                self.output('Creating %s after removing old dir' % anadir)
+                shutil.rmtree(anadir)
+            elif self.update:
+                self.output('%s already exists, continuing' % anadir)
+                return False
+            else:
+                raise Exception('%s already exists!' % anadir)
+        os.mkdir(anadir)
+        return True
+
+    def initialiseLog(self,logfile):
+        """Make a log file that stdout gets piped to. Appends to an
+        existing log unless overwrite is set."""
+        timestr = time.strftime('%y%m%d_%H%M')
+        if os.path.exists(logfile) and not self.overwrite:
+            F = open(logfile,'a')
+        else:
+            F = open(logfile,'w')
+        F.write('\n\nbatchbrain on %s...\n' % timestr)
+        F.close()
+        return
diff --git a/data.py b/data.py
new file mode 100644
index 0000000..8214fb2
--- /dev/null
+++ b/data.py
@@ -0,0 +1,509 @@
+'''Objects for storing data.'''
+import collections
+import os
+import glob
+import numpy
+import batchbrain.base
+import pdb
+import shutil
+import scipy.stats
+import csv
+
+class Task(batchbrain.base.ProcessingObject):
+    """Stores properties for a given task (ie, a function and a list of
+    arguments to call it with). In the future, this will likely be
+    useful for flow control (waitfornext etc)."""
+
+    def __init__(self,taskname,procfunc,postrunfun=None,**kwargs):
+        """Initialise a new task object."""
+        self.name = taskname
+        self.func = procfunc
+        # If no post-run hook was given, install a no-op
+        if postrunfun is None:
+            def postrunfun(out):
+                return None
+        self.postrunfun = postrunfun
+        self.checkInputs(kwargs)
+        self.jobs = []
+        return
+
+    def updateJobs(self,args):
+        self.jobs.append(args)
+        return
+
+    def postRunUpdate(self,outputs):
+        """Deal with outputs from some processing."""
+        for o in outputs:
+            self.postrunfun(o)
+        return
+
+class AcquisitionData(batchbrain.base.ProcessingObject):
+    """Storage object for a single acquisition (the smallest object in
+    the hierarchy, unless we go to VolumeData)."""
+
+    def __init__(self,acqdir,parameters=None,**kwargs):
+        # Check for DICOM import status
+        self.checkInputs(kwargs)
+        madedir = self.mkAnaDir(acqdir)
+        self.imported = madedir==False
+        self.path = acqdir
+        self.name = os.path.split(acqdir)[1]
+        # Store things like e.g. motion data
+        self.parameters = parameters
+        if self.parameters is None:
+            self.parameters = {}
+        return
+
+    def findFiles(self,prefix,ext):
+        """Find files in the acquisition dir with glob."""
+        searchstr = os.path.join(self.path,
+                                 '%s*%s' % (prefix,ext))
+        return glob.glob(searchstr)
+    def summariseTranslation(self):
+        '''Calculate a summary statistic for how much a subject moved.
+        Not sure how to do this ultimately. For now, use the summed
+        scan-to-scan translation distance.'''
+        # Get the scan-by-scan difference in xyz
+        transdiff = numpy.diff(self.parameters['translation'],axis=0)
+        # Compute euclidean distance of diffs
+        transdist = numpy.sqrt(transdiff[:,0]**2 + transdiff[:,1]**2 +
+                               transdiff[:,2]**2)
+        self.parameters['translation_totdist'] = numpy.sum(transdist)
+        return self.parameters['translation_totdist']
+
+class SequenceData(batchbrain.base.ProcessingObject):
+    """Storage object for a sequence."""
+
+    def __init__(self,**kwargs):
+        '''Initialise a SequenceData object, setup data structures.'''
+        self.acquisitions = collections.OrderedDict()
+        self.checkInputs(kwargs)
+
+        # Shortcut
+        self.acq = self.acquisitions
+        return
+
+    def chooseLeastMover(self):
+        '''Return the acquisition with the smallest summary
+        translation.'''
+        score = []
+        potacqs = self.acquisitions.keys()
+        for acq in potacqs:
+            score.append(self.acq[acq].summariseTranslation())
+        # Pick the min
+        ind = score.index(min(score))
+        return self.acq[potacqs[ind]]
+
+    def resolveAcquisition(self,acqrule):
+        '''Return a list of acquisitions according to a rule.'''
+        potacqs = self.acquisitions.keys()
+        if isinstance(acqrule,int):
+            return [self.acq[potacqs[acqrule]]]
+        if acqrule == 'all':
+            return [self.acq[x] for x in self.acq]
+        if acqrule == 'first':
+            return [self.acq[potacqs[0]]]
+        if acqrule == 'last':
+            return [self.acq[potacqs[-1]]]
+        if acqrule == 'leasttranslation':
+            return [self.chooseLeastMover()]
+        if acqrule == 'leastrotation':
+            raise Exception('No support yet for rotation!')
+        raise Exception('Unknown acqrule: %s' % acqrule)
+
+class GroupData(batchbrain.base.ProcessingObject):
+    """Storage object for a group analysis."""
+
+    def __init__(self,groupdir,**kwargs):
+        """Make a group analysis directory and set up internal data
+        attributes."""
+        self.path = groupdir
+        self.name = os.path.split(groupdir)[1]
+        self.subdata = collections.OrderedDict()
+        self.checkInputs(kwargs)
+        self.mkAnaDir(groupdir)
+        return
+
+    def addSubjectToGroup(self,sub,targetsequence,inprefix='', \
+            inext='.nii',acqrule='last',outprefix='',outext='.nii'):
+        """Include a new subject in the group by copying one volume into
+        the group dir."""
+        # Find the acquisition we want
+        acq = sub.seq[targetsequence].resolveAcquisition(acqrule)
+        if len(acq) > 1:
+            raise Exception('No support for multivol per sub group analysis')
+        acq = acq[0]
+        # Find the volume
+        invol = acq.findFiles(inprefix,inext)
+        if len(invol) != 1:
+            raise Exception('Expected 1 invol, got:\n%s' % invol)
+        invol = invol[0]
+        # Get outvol. The getOutput method would be ideal here but can't
+        # be made general enough
+        fpath,ffile = os.path.split(invol)
+        ffn,fext = os.path.splitext(ffile)
+        outvol = os.path.join(self.path,
+                              outprefix+sub.name+'_'+ffn+outext)
+        alreadydone = self.checkOutput(outvol)
+        if alreadydone:
+            if self.overwrite:
+                pass
+            elif self.update:
+                self.output('%s already exists, skipping' % outvol)
+                return
+            else:
+                raise Exception('%s already exists.' % outvol)
+        # Copy volume across
+        shutil.copy(invol,outvol)
+        return
+
+class SubjectData(batchbrain.base.ProcessingObject):
+    """Storage object for individual subjects. Used for accessing paths
+    and subject info (e.g. demographics)."""
+
+    def __init__(self,subjdir,demographics=None,**kwargs):
+        """Make a subject directory and set up internal data
+        attributes."""
+        # Initialise other bits
+        self.path = subjdir
+        self.name = os.path.split(self.path)[1]
+        self.sequences = collections.OrderedDict()
+        self.demographics = demographics
+        self.checkInputs(kwargs)
+        self.mkAnaDir(subjdir)
+        # TODO: update subject list in super. Not sure how to do this.
+
+        # Shortcut
+        self.seq = self.sequences
+        return
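+# A hedged sketch of how the populated hierarchy is meant to be
+# navigated (class and method names are real; the study object, subject
+# code, sequence name and prefix below are hypothetical):
+#
+#   sub = study.sub['CBU090001']             # SubjectData
+#   seq = sub.seq['ep2d_diff']               # SequenceData
+#   acqs = seq.resolveAcquisition('last')    # list of AcquisitionData
+#   vols = acqs[0].findFiles('ec_','.nii')   # paths to volumes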
+class StudyData(batchbrain.base.ProcessingObject):
+    """Master object that keeps track of the data in the study."""
+
+    def __init__(self,anadir,projectcodes=None,**kwargs):
+        '''Initialise a new dataset, create main study directory.'''
+        # Check if any interesting flags came in (e.g., overruling
+        # overwrite setting)
+        self.checkInputs(kwargs)
+        self.output('Initialising StudyData object')
+        # Set up main data directory
+        self.mkAnaDir(anadir)
+        self.anadir = anadir
+        self.subjects = collections.OrderedDict()
+        self.projectcodes = projectcodes
+        self.groups = {}
+
+        # Keep track of functions and inputs to map - by order of addition
+        self.tasks = collections.OrderedDict()
+
+        # shortcut
+        self.sub = self.subjects
+        return
+
+    def addJobs(self,tn,procfunc,arguments,postrunfun=None):
+        """Add a given prepared process to the tasklist."""
+        if not self.tasks.has_key(tn):
+            # Initialise new task
+            self.tasks[tn] = Task(tn,procfunc,update=self.update,
+                    overwrite=self.overwrite,verbose=self.verbose,
+                    postrunfun=postrunfun)
+        else:
+            # Ensure the user isn't doing something odd
+            if not self.tasks[tn].func == procfunc:
+                raise Exception('procfunc mismatch: old %s new %s' % (
+                    self.tasks[tn].func,procfunc))
+        self.tasks[tn].updateJobs(arguments)
+        return
+
+    def printSummary(self):
+        '''Print a breakdown of acquisition counts by subject and
+        sequence.'''
+        # Collect sequence names and per-subject acquisition counts
+        countdict = {}
+        sequences = []
+        for subname,sub in self.subjects.iteritems():
+            countdict[subname] = {}
+            for seqname,seq in sub.seq.iteritems():
+                if seqname not in sequences:
+                    sequences.append(seqname)
+                countdict[subname][seqname] = len(seq.acq)
+        self.output('STUDY SUMMARY:')
+        self.output('%d subjects' % len(self.subjects))
+        self.output('%d sequences' % len(sequences))
+        self.output('SUBJECT BY SEQUENCE BREAKDOWN:')
+        # The longest label sets the column width
+        totlen = len(max(sequences+self.subjects.keys(),key=len))
+        cn = len(sequences)+1
+        collabs = [s.center(totlen) for s in ['']+sequences]
+        # First the label row
+        self.output('%s\t'*cn % tuple(collabs))
+        for sub in self.subjects:
+            # first the label
+            row = sub.ljust(totlen)
+            # Then each sequence
+            for seq in sequences:
+                num = countdict[sub].get(seq,0)
+                row += '\t' + str(num).center(totlen)
+            self.output(row)
+        return
+
+    def addSubject(self,sub,demographics=None):
+        '''Add a single subject to the object and initialise its
+        directory.'''
+        subdir = os.path.join(self.anadir,sub)
+        if demographics is None:
+            demographics = {}
+        self.subjects[sub] = SubjectData(subdir,demographics=demographics,
+                                         overwrite=self.overwrite,
+                                         update=self.update,
+                                         verbose=self.verbose)
+        return
+
+    def getPaths(self,sub,seq,acq,prefix='',ext='.nii'):
+        '''Retrieve a list of paths to volumes.'''
+        # Get acq objects in a list
+        acqs = self.sub[sub].seq[seq].resolveAcquisition(acq)
+        vols = []
+        for a in acqs:
+            vols += a.findFiles(prefix,ext)
+        return vols
+
+    def pathToAcquisition(self,inpath):
+        """Deduce the likely acq object from a file path."""
+        # Figure out a subject
+        sub = self.findListItemInStr(inpath,self.subjects.keys())
+        if len(sub) > 1:
+            raise Exception('Multiple possible subjects:\n%s' % sub)
+        sub = sub[0]
+        seq = self.findListItemInStr(inpath,self.sub[sub].seq.keys())
+        if len(seq) > 1:
+            raise Exception('Multiple possible sequences:\n%s' % seq)
+        seq = seq[0]
+        acq = self.findListItemInStr(inpath,self.sub[sub].seq[seq].acq.keys())
+        if len(acq) > 1:
+            raise Exception('Multiple possible acquisitions:\n%s' % acq)
+        acq = acq[0]
+        return self.sub[sub].seq[seq].acq[acq]
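+    # How queued work flows through the study object, as a hedged
+    # sketch (eddy_correct and simpleRunner are defined in processes.py
+    # and runners.py; the sequence name is hypothetical):
+    #
+    #   proc = processes.eddy_correct('ep2d_diff',inprefix='')
+    #   proc(study)                  # queues shell commands via addJobs
+    #   runners.simpleRunner(study)  # maps Task.func over Task.jobs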
+    def getSubjectsByDemographics(self,key,onlyvals=None):
+        """Return a dict of lists of SubjectData objects, keyed by the
+        unique entries found under key in each subject's
+        demographics."""
+        outdict = {}
+        for sub in self.subjects.values():
+            if not sub.demographics.has_key(key):
+                raise Exception('All subjects must have key: %s' % key)
+            outk = sub.demographics[key]
+            if onlyvals is not None and not outk in onlyvals:
+                continue
+            if outdict.has_key(outk):
+                outdict[outk].append(sub)
+            else:
+                outdict[outk] = [sub]
+        return outdict
+
+    def compareSubjectsByDemographics(self,dvkey,ivkey,onlyivs=None):
+        """Use a between-samples T (2 groups) or F (more groups) test to
+        compare subjects on scores in dvkey, sorted into groups by
+        ivkey."""
+        groups = self.getSubjectsByDemographics(ivkey)
+        gdata = {}
+        # Restrict to only certain iv levels. Useful for post hoc tests.
+        if onlyivs is not None:
+            for k in groups.keys():
+                if not k in onlyivs:
+                    del groups[k]
+        # Make a list for each group
+        for g in groups:
+            gdata[g] = []
+            for sub in groups[g]:
+                if sub.demographics[dvkey] == '':
+                    self.output('No demographic %s for %s (group %s), skipping' %(
+                        dvkey,sub.name,g))
+                    continue
+                gdata[g].append(sub.demographics[dvkey])
+        ngroups = len(gdata.keys())
+        if ngroups < 2:
+            raise Exception('Need more than 1 group!')
+        elif ngroups == 2:
+            compfun = scipy.stats.ttest_ind
+            test = 'T'
+        else:
+            compfun = scipy.stats.f_oneway
+            test = 'F'
+        inference = dict(zip([test,'p'],compfun(*gdata.values())))
+        for g in gdata.keys():
+            inference[g+'raw'] = gdata[g]
+            inference[g+'mean'] = numpy.mean(gdata[g])
+            inference[g+'std'] = numpy.std(gdata[g])
+            inference[g+'n'] = len(gdata[g])
+            inference[g+'sterr'] = numpy.std(gdata[g]) / \
+                    numpy.sqrt(len(gdata[g]))
+        return inference
+
+    def getSubjectDemographics(self,key,subjectnames=None):
+        """Return a dict where each subject is a key, and the value is
+        the entry in demographics[key]."""
+        if subjectnames is None:
+            subjectnames = self.subjects.keys()
+        outdict = {}
+        for subname in subjectnames:
+            subobj = self.subjects[subname]
+            if not subobj.demographics.has_key(key):
+                raise Exception('All subjects must have key: %s' % key)
+            outdict[subname] = subobj.demographics[key]
+        return outdict
+
+    def parametersToDemographics(self,seqname,acqrule='all'):
+        """Update each subject's demographics dict with the keys from
+        the parameters dict of each resolved acq."""
+        for s in self.sub.values():
+            seq = s.seq[seqname]
+            acqs = seq.resolveAcquisition(acqrule)
+            for a in acqs:
+                for k_in,v_in in a.parameters.iteritems():
+                    # Ugly conditional to avoid inconsistent column labels
+                    if acqrule == 'leasttranslation':
+                        k_out = '%s_leasttran_%s' % (seqname,k_in)
+                    else:
+                        k_out = '%s_%s' % (a.name,k_in)
+                    s.demographics[k_out] = v_in
+        return
+
+    def exportDemographics(self,outfile):
+        """Write a CSV table with demographics for all subjects."""
+        F = open(outfile,'wb')
+        W = csv.writer(F,dialect='excel')
+        # First, make a master dict keyed by demographic label
+        mdict = {}
+        for subname,s in self.subjects.iteritems():
+            for k,v in s.demographics.iteritems():
+                # Skip data that isn't appropriate for a flat table
+                if type(v) is list or type(v) is numpy.ndarray:
+                    continue
+                # Remove empty keys
+                if k == '':
+                    continue
+                if not mdict.has_key(k):
+                    mdict[k] = {}
+                mdict[k][subname] = str(v)
+        # Now we know the labels
+        collabs = mdict.keys()
+        collabs.sort()
+        # Begin writing out
+        W.writerow(['bb_subname']+collabs)
+        for subname in self.subjects.keys():
+            row = [subname]
+            for k in collabs:
+                try:
+                    d = mdict[k][subname]
+                except KeyError:
+                    d = ''
+                row.append(d)
+            W.writerow(row)
+        del W
+        F.close()
+        return
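+    # Hedged sketch of the inference dict returned by
+    # compareSubjectsByDemographics for two hypothetical groups
+    # 'patient' and 'control' (key layout follows the code above):
+    #
+    #   inf = study.compareSubjectsByDemographics('age','group')
+    #   inf['T'], inf['p']                    # statistic and p value
+    #   inf['patientmean'], inf['patientstd'],
+    #   inf['patientn'], inf['patientsterr'],
+    #   inf['patientraw']                     # likewise for 'control'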
+    def addGroup(self,groupdir):
+        '''Initialise a group analysis.'''
+        if self.groups.has_key(groupdir):
+            if self.overwrite:
+                pass
+            elif self.update:
+                self.output('%s already exists, skipping...' % groupdir)
+                return
+            else:
+                raise Exception('%s already exists!' % groupdir)
+        self.groups[groupdir] = GroupData(os.path.join(self.anadir,
+                groupdir), overwrite=self.overwrite, update=self.update,
+                verbose=self.verbose)
+        return
+
+# An alternative class might be CopyData, which simply grabs niftis from
+# somewhere
+class CBUMRI(StudyData):
+    """StudyData-derived object with functionality for CBU's
+    mridata system."""
+
+    def __init__(self,anadir,projectcodes=None,**kwargs):
+        # Initialise with standard StudyData
+        StudyData.__init__(self,anadir,projectcodes=projectcodes,\
+                **kwargs)
+
+        # sub-class specific information
+        self.rootdir='/mridata/cbu'
+        self.ncharignorefn=11
+        self.indsseriesnfn=[7,10]
+        self.indprojcodeneg=-7
+
+        # Check if anything else came through
+        self.checkInputs(kwargs)
+        return
+
+    def importCBUData(self,series,nslices=None):
+        """Iterate over subjects, converting DICOMs to NIFTI and copying
+        to anadir. Also initialise SequenceData and AcquisitionData
+        objects as necessary."""
+        for sub in self.subjects.keys():
+            # Find data
+            # (need the underscore because some subject codes are LONGER
+            # than they are meant to be)
+            subdir = glob.glob(os.path.join(self.rootdir,sub+'_*'))
+
+            # Catch bad data
+            if not len(subdir):
+                raise Exception('No subject dir found: %s' % sub)
+            if len(subdir) > 1:
+                if self.projectcodes is None:
+                    raise Exception('Duplicate subject directories:\n%s' % subdir)
+                # Work out if the remaining dirs belong to the project -
+                # attempt to disambiguate by project code
+                check = numpy.array([s[self.indprojcodeneg:] in
+                        self.projectcodes for s in subdir],dtype='bool')
+                subdir = list(numpy.array(subdir)[check])
+                if not len(subdir):
+                    raise Exception('No subject dirs match project codes:\n%s'
+                            % subdir)
+
+            # Now we have a list of subdir, usually len==1, sometimes more
+            # Collect all series for this subject
+            self.output('Identifying series in\n%s' % subdir)
+            # Ascending project numbers should correspond to order of acq.
+            subdir.sort()
+            seriesdirs = []
+            for sd in subdir:
+                dirs = glob.glob(os.path.join(
+                    self.rootdir,sd,'*','*'))
+                # Ensure in order of acquisition
+                dirs.sort()
+                seriesdirs += dirs
+            # Now exact match to end of fn
+            goodseries = [i for i in seriesdirs if i[-len(series):]==series]
+
+            # Initialise empty list of acquisitions
+            if self.sub[sub].seq.has_key(series):
+                if self.overwrite:
+                    pass
+                elif not self.update:
+                    raise Exception('%s and %s already exists!' % (sub,series))
+            else:
+                self.sub[sub].seq[series] = SequenceData(update=self.update,
+                        overwrite=self.overwrite,verbose=self.verbose)
+
+            for ind,gs in enumerate(goodseries):
+                # Check for completeness
+                if nslices is not None:
+                    ndcm = os.listdir(gs)
+                    if len(ndcm) != nslices:
+                        self.output('%s is incomplete, skipping...' % gs)
+                        continue
+                fn = '%s_acq%03d' % (series,ind+1)
+                seriesoutdir = os.path.join(self.sub[sub].path,fn)
+                self.sub[sub].seq[series].acq[fn] = AcquisitionData(
+                        seriesoutdir,update=self.update,
+                        overwrite=self.overwrite,verbose=self.verbose)
+                # Don't reimport dicoms for existing series
+                if self.sub[sub].seq[series].acq[fn].imported:
+                    continue
+                self.output('converting %s' % gs)
+                # TEMP HACK CODE
+                cmd = 'dcm2nii -o %s %s %s' % (seriesoutdir,
+                        '-a N -c N -d N -e N -g N -i N -f N -p Y',
+                        gs)
+                os.system(cmd)
+                #mricron.dcm2nii(gs,seriesoutdir)
+            self.output('finished conversion for %s' % sub)
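+# Hedged usage sketch for the CBU import path (CBUMRI, addSubject and
+# importCBUData are defined above; the analysis dir, project code,
+# subject code, series name and slice count are hypothetical):
+#
+#   study = CBUMRI('/imaging/myproject/ana',projectcodes=['MR09001'])
+#   study.addSubject('CBU090001')
+#   study.importCBUData('ep2d_diff',nslices=60)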
diff --git a/dti_motion b/dti_motion
new file mode 100755
index 0000000..661cf23
--- /dev/null
+++ b/dti_motion
@@ -0,0 +1,48 @@
+#!/bin/sh
+
+# Script written by Mark Jenkinson of FMRIB; posted in reply to
+# Charlotte's question on the FSL forum 11/01/2011
+
+if [ $# -lt 1 ] ; then
+  echo "Usage: `basename $0` <eddy_correct .ecclog file>"
+  exit 1;
+fi
+
+logfile=$1;
+basenm=`basename $logfile .ecclog`;
+
+nums=`grep -n 'Final' $logfile | sed 's/:.*//'`;
+
+touch grot_ts.txt
+touch grot.mat
+
+firsttime=yes;
+m=1;
+for n in $nums ; do
+    echo "Timepoint $m"
+    n1=`echo $n + 1 | bc` ;
+    n2=`echo $n + 5 | bc` ;
+    sed -n "$n1,${n2}p" $logfile > grot.mat ;
+    if [ $firsttime = yes ] ; then firsttime=no; cp grot.mat grot.refmat ; cp grot.mat grot.oldmat ; fi
+    absval=`$FSLDIR/bin/rmsdiff grot.mat grot.refmat $basenm`;
+    relval=`$FSLDIR/bin/rmsdiff grot.mat grot.oldmat $basenm`;
+    cp grot.mat grot.oldmat
+    echo $absval $relval >> ec_disp.txt ;
+    $FSLDIR/bin/avscale --allparams grot.mat $basenm | grep 'Rotation Angles' | sed 's/.* = //' >> ec_rot.txt ;
+    $FSLDIR/bin/avscale --allparams grot.mat $basenm | grep 'Translations' | sed 's/.* = //' >> ec_trans.txt ;
+    m=`echo $m + 1 | bc`;
+done
+
+echo "absolute" > grot_labels.txt
+echo "relative" >> grot_labels.txt
+
+$FSLDIR/bin/fsl_tsplot -i ec_disp.txt -t 'Eddy Current estimated mean displacement (mm)' -l grot_labels.txt -o ec_disp.png
+
+echo "x" > grot_labels.txt
+echo "y" >> grot_labels.txt
+echo "z" >> grot_labels.txt
+
+$FSLDIR/bin/fsl_tsplot -i ec_rot.txt -t 'Eddy Current estimated rotations (radians)' -l grot_labels.txt -o ec_rot.png
+$FSLDIR/bin/fsl_tsplot -i ec_trans.txt -t 'Eddy Current estimated translations (mm)' -l grot_labels.txt -o ec_trans.png
+
+# clean up temp files
+/bin/rm grot_labels.txt grot.oldmat grot.refmat grot.mat
diff --git a/processes.py b/processes.py
new file mode 100644
index 0000000..49497e5
--- /dev/null
+++ b/processes.py
@@ -0,0 +1,409 @@
+'''Objects for processing - the intermediate step where the files in a
+StudyData-derived object are joined with relevant analyses and added to
+the tasks field in StudyData.'''
+
+import os
+import glob
+import batchbrain.base
+import pdb
+import numpy
+import csv
+
+class DataProcess(batchbrain.base.ProcessingObject):
+    """Master object for things to do with processing imported data."""
+    postrunfun = None
+
+    def addPrefix(self,vol,pref):
+        filepath,fname = os.path.split(vol)
+        if pref is None:
+            return vol
+        return os.path.join(filepath,'%s%s' % (pref,fname))
+
+    def processCommand(self,cmd):
+        """Send a command-line string to the shell.
Easily parallelised.""" + os.system(cmd) + return + + def ensureListofLen(self,inp,wantedlen): + objinp = getattr(self,inp) + if type(objinp) is list: + if len(objinp) == wantedlen: + return + if len(objinp) != 1: + raise Exception('Lists must be len %d or 1' % wantedlen) + else: + objinp = [objinp] + setattr(self,inp,objinp * wantedlen) + return + + def multiProcess(self,dt): + '''Method for generating multi-in 1-out command line processes. + Note that this _should_ work also with 1-in 1-out data.''' + newjobs = 0 + # First, figure out how many 'multi' means + # Get the len of any lists + listcands = ['inprefix','targetsequence', + 'inext','acqrule'] + lens = [len(getattr(self,x)) for x in listcands \ + if type(getattr(self,x)) is list] + if lens: + maxlen = max(lens) + else: + maxlen = 1 + # Now make each listcands item a list if it isn't already + for x in listcands: + self.ensureListofLen(x,maxlen) + + for sub in dt.subjects.keys(): + # list of lists + inputlist = [] + for ind in xrange(maxlen): + invols = dt.getPaths(sub,self.targetsequence[ind], + self.acqrule[ind],self.inprefix[ind], + self.inext[ind]) + inputlist.append(invols) + + maxlistlen = len(max(inputlist,key=len)) + # We may have to check that we always get the same number of + # volumes here (e.g., if different sequences), but for now, + # let's assume all is well... + for ind in xrange(maxlistlen): + # Pull out invol list + inputs = [x[ind] for x in inputlist] + # Output checking is necessary to avoid re-running on every + # iteration. Checks should be either based on checking dir + # contents (current solution) OR checking attributes in + # StudyData object. + # Figure out what to call outvol + output = self.getOutput(inputs,dt) + alreadydone = self.checkOutput(output) + # TODO actually make the outvol maybe. Also need to store + # some kind of reference to outfield to set pars. + if alreadydone and not self.overwrite: + self.output('%s already exists, skipping.' % output) + continue + cmd = self.process(inputs,output) + dt.addJobs(self.__class__.__name__,self.runner,cmd, + postrunfun=self.postrunfun) + newjobs+=1 + self.output('Done. Added %d new jobs to tasks.' % newjobs) + return + +class eddy_correct(DataProcess): + + def __init__(self,targetsequence,inprefix=None,outprefix='ec_', \ + refno=0,acqrule='all',**kwargs): + self.processname = 'FSL Eddy Correct' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.refno = refno + self.runner = self.processCommand + self.acqrule = acqrule + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,outvol): + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + + return 'eddy_correct %s %s %d >> %s' % (invol[0],outvol,self.refno, + logfile) + +class bet(DataProcess): + '''Brain extraction with BET from FSL. 
This variant generates a brain + mask only.''' + + def __init__(self,targetsequence,inprefix=None,outprefix='bet_', \ + f=0.5,acqrule='all',**kwargs): + self.processname = 'FSL BET Brain Extraction' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.f = f + self.runner = self.processCommand + self.acqrule = acqrule + # the log file name ought to be the class name for transparency + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,output): + # Note that many other arguments are possible but not yet + # implemented + # Get base name + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + maincmd = 'bet %s %s -m -n -f %f >> %s' % (invol[0],output, + self.f,logfile) + # BET has an unfortunate tendency to enforce a certain filename + # (output_mask.nii) - so rename output + ind = output.index(self.outext) + mvcmd = 'mv %s_mask%s %s' % (output[:ind],self.outext,output) + return maincmd + ';' + mvcmd + +class slicer(DataProcess): + '''Interface to FSL slicer utility. Write out diagnostic axials.''' + + def __init__(self,targetsequence,inprefix=None,outprefix='slicer_',\ + nskipaxial=3,imwidth=400,acqrule='all',**kwargs): + '''Identify 2 images (first is base, second overlay) by differences + in targetsequence AND/OR inprefix, inex, acqrule.''' + self.processname = 'FSL Slicer Image Output' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.nskipaxial = nskipaxial + self.imwidth = imwidth + # NB slicer does NOT support other formats - don't change this! + self.outext = '.ppm' + self.acqrule = acqrule + self.runner = self.processCommand + # the log file name ought to be the class name for transparency + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,output): + '''FSL slicer method. input should be a list (first base, second + overlay), output string.''' + # Get base name + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + return 'slicer %s %s -S %d %d %s >> %s' % (invol[0],invol[1], + self.nskipaxial,self.imwidth,output,logfile) + +class dtifit(DataProcess): + '''Interface to FSL dtifit.''' + + def __init__(self,targetsequence,inprefix=None,outprefix='dtifit_',\ + outext='*',acqrule='all',**kwargs): + '''Identify 2 images (first is base, second overlay) by differences + in targetsequence AND/OR inprefix, inex, acqrule.''' + self.processname = 'FSL DTIFit' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.acqrule = acqrule + self.runner = self.processCommand + self.outext = outext + # the log file name ought to be the class name for transparency + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,output): + '''FSL DTIFit method. 
Input should be a list (first DTI, second + mask), output string basename.''' + # Get base name + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + # Find bvecs/bvals + bvec = glob.glob(os.path.join(path,'*.bvec')) + if len(bvec) != 1: + raise Exception('Expected 1 bvec only, got: \n%s' % bvec) + bvec = bvec[0] + bval = glob.glob(os.path.join(path,'*.bval')) + if len(bval) != 1: + raise Exception('Expected 1 bval only, got: \n%s' % bval) + bval = bval[0] + return 'dtifit -k %s -m %s -r %s -b %s -o %s >> %s' % (invol[0], + invol[1],bvec,bval,output.replace('*',''),logfile) + +class dti_motion(DataProcess): + '''Interface to FSL motion diagnostic bash script by Mark Jenkinson. NB + dti_motion bash script MUST be on your path!''' + + def __init__(self,targetsequence,inprefix='ec_',outprefix='dtimotion_',\ + outext='*',acqrule='all',**kwargs): + self.processname = 'FSL dti_motion' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.acqrule = acqrule + self.runner = self.processCommand + self.outext = outext + # Generated files that we rename + self.outfilelist = ['ec_disp.png','ec_disp.txt','ec_rot.png', + 'ec_rot.txt','ec_trans.png','ec_trans.txt','grot_ts.txt'] + # the log file name ought to be the class name for transparency + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,output): + '''FSL motion estimate (dti_motion).''' + # Get base name + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + # Find ecclog + ecclog = glob.glob(os.path.join(path,'*.ecclog')) + if len(ecclog) != 1: + raise Exception('Expected 1 ecclog only, got: \n%s' % ecclog) + ecclog = ecclog[0] + # This is the command - but NB must be executed from subject dir to + # avoid disastrous consequences + cdcmd = 'cd %s;' % path + inpath,infile = os.path.split(ecclog) + infn,inext = os.path.splitext(infile) + # Now with piping of log output + basecmd = 'dti_motion %s >> %s;' % (infile,logfile) + # Now one irritating thing about this script is that we can't + # control output file naming, so need to rename after calling + mvcmd = '' + for file in self.outfilelist: + fn,ext = os.path.splitext(file) + mvcmd += 'mv %s %s;' % (file,self.outprefix+infn+'_'+fn+ext) + return cdcmd+basecmd+mvcmd + +class dti_motion_toparams(DataProcess): + '''Place a stored list of motion parameters in the parameters field for + each acquisition.''' + + def __init__(self,targetsequence,inprefix='dti_motion_', \ + inext=['*ec_trans.txt','*ec_rot.txt'],acqrule='all',**kwargs): + self.processname = 'BB dti_motion to parameters' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.inext = inext + # No output generated. 
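+        # (This process writes acq.parameters rather than creating
+        # files, so getOutput and checkOutput are overridden below to
+        # operate on the acq object itself.)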
+        self.outprefix = ''
+        self.outext = ''
+        self.acqrule = acqrule
+        # NB things can go badly awry here if you mix up your inputs
+        self.parstoset = {'translation': self.inext[0], 'rotation':
+                self.inext[1]}
+        # No need to define self.runner or self.postrunfun since the
+        # methods below overwrite the defaults
+
+        # the log file name ought to be the class name for transparency
+        self.logfn = self.outprefix + self.__class__.__name__ + '.log'
+        self.checkInputs(kwargs)
+        return
+
+    def __call__(self,dt):
+        self.multiProcess(dt)
+        return
+
+    def getOutput(self,invol,sd):
+        '''Use the input volume to deduce acq details. Overrides the
+        default DataProcess method that operates on files in the acq
+        dir.'''
+        # Figure out which acq object is relevant
+        # Will be VERY interesting to see if changes to this object
+        # actually carry over to the main sd object
+        return sd.pathToAcquisition(invol[0])
+
+    def checkOutput(self,output):
+        '''Check if the output (an acq object) contains the desired
+        parameters. Overrides the default DataProcess method that
+        operates on files in the acq dir.'''
+        allouts = [output.parameters.has_key(x) for x in self.parstoset]
+        # Only score as done if all pars were set
+        return all(allouts)
+
+    def process(self,infiles,output):
+        '''Make a dict of parameter file names and their paths. Prepare
+        a single variable to be called together with runner (below).'''
+        pathdict = {}
+        # Find each parameter's associated file
+        for f,v in self.parstoset.iteritems():
+            # Maybe a bit ambitious
+            hits = self.findStrInList(v.replace('*',''),
+                    infiles)
+            if len(hits) != 1:
+                raise Exception('parstoset/inext mismatch for %s, got:\n%s' \
+                        % (f,hits))
+            pathdict[f] = hits[0]
+        return [pathdict,output]
+
+    def runner(self,inputs):
+        '''Get the data out of inputs, store it in a dict of parameter
+        arrays.'''
+        # Unpack (deal) input list
+        pathdict,acq = inputs
+        resdict = {}
+        for measure,filepath in pathdict.iteritems():
+            # Load the data
+            F = open(filepath,'r')
+            R = csv.reader(F,delimiter=' ')
+            # An annoying aspect of this is that a 4th garbage empty
+            # string column is included for unknown reasons, so keep
+            # only the first 3 columns
+            resdict[measure] = numpy.array([i[0:3] for i in R],dtype='float')
+            F.close()
+        return [resdict,acq]
+
+    def postrunfun(self,outputs):
+        '''Add the resdict entries to the acq object.'''
+        # Unpack
+        resdict,acq = outputs
+        for k,v in resdict.iteritems():
+            # Update acq object
+            acq.parameters[k] = v
+            # But is this in place?
+            # Otherwise we can probably recover with pathToAcquisition
+        return
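+# The queueing contract this class relies on, as a hedged sketch
+# (method names are real; the flow is inferred from
+# DataProcess.multiProcess and Task.postRunUpdate):
+#
+#   job = proc.process(infiles,acqobj)  # queued via addJobs: [pathdict, acq]
+#   out = proc.runner(job)              # at run time:        [resdict, acq]
+#   proc.postrunfun(out)                # after running:      acq.parameters updated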
Unusual in that you only provide + group object.''' + self.processname = 'FSL TBSS Preprocessing 1' + self.targetsequence = targetsequence + self.inprefix = inprefix + self.outprefix = outprefix + self.f = f + self.runner = self.processCommand + self.acqrule = acqrule + # the log file name ought to be the class name for transparency + self.logfn = self.outprefix + self.__class__.__name__ + '.log' + self.checkInputs(kwargs) + return + + def __call__(self,dt): + self.multiProcess(dt) + return + + def process(self,invol,output): + # Note that many other arguments are possible but not yet + # implemented + # Get base name + path,filename = os.path.split(invol[0]) + logfile = os.path.join(path,self.logfn) + self.initialiseLog(logfile) + maincmd = 'bet %s %s -m -n -f %f >> %s' % (invol[0],output, + self.f,logfile) + # BET has an unfortunate tendency to enforce a certain filename + # (output_mask.nii) - so rename output + ind = output.index(self.outext) + mvcmd = 'mv %s_mask%s %s' % (output[:ind],self.outext,output) + return maincmd + ';' + mvcmd + diff --git a/runners.py b/runners.py new file mode 100644 index 0000000..ab58ddd --- /dev/null +++ b/runners.py @@ -0,0 +1,109 @@ +'''Map functions/objects that take the particular input from StudyData.tasks and +runs with it.''' +import sys +import os +import random + +def simpleRunner(sd): + '''Simplest possible task runner: go through in serial order on current + machine.''' + nsteps = len(sd.tasks) + if nsteps == 0: + sd.output('No outstanding processing steps in tasks') + return + sd.output('Running %d processing steps...' % len(sd.tasks)) + for tn,to in sd.tasks.iteritems(): + sd.output('Running %s with %d jobs' % (tn,len(to.jobs))) + outputs = map(to.func,to.jobs) + to.postRunUpdate(outputs) + # Clear out the task list + sd.output('Done.') + sd.tasks.clear() + return + +class IPClusterRunner(): + '''CBU-specific code for starting a IPython cluster and running + jobs.''' + + def __init__(self,nengines=None,targetmachines=None): + '''Initialise the cluster, either by re-connecting to a current + cluster or starting a new one.''' + import IPython.kernel + # Pull out the usual CBU machines + if targetmachines is None: + self.availablemachines = getCBUMachines() + else: + self.availablemachines = targetmachines + # The machines get started without the appropriate path settings, + # so need to bring the current machine's over + self.localpath = os.environ['PATH'] + + # In the absence of true load balance, we can at least randomise + random.shuffle(self.availablemachines) + self.mlist = self.availablemachines.copy() + + self.pophandles = [] + + # See if we already have a controller + try: + rc = IPython.parallel.Client() + except TimeOutError: + # Otherwise give me one + self.pophandles.append(self.startMachine( + self.randomEngine(),iptype='ipcontroller')) + rc = IPython.parallel.Client() + nrunning = len(rc.ids) + + if nengines is None: + if nrunning > 0: + # Assume you already have the engines you want running + return + else: + raise Exception('No engines available. 
+class IPClusterRunner(object):
+    '''CBU-specific code for starting an IPython cluster and running
+    jobs.'''
+
+    def __init__(self,nengines=None,targetmachines=None):
+        '''Initialise the cluster, either by re-connecting to a current
+        cluster or starting a new one.'''
+        import IPython.parallel
+        # Pull out the usual CBU machines
+        if targetmachines is None:
+            self.availablemachines = self.getCBUMachines()
+        else:
+            self.availablemachines = targetmachines
+        # The machines get started without the appropriate path
+        # settings, so need to bring the current machine's over
+        self.localpath = os.environ['PATH']
+
+        # In the absence of true load balance, we can at least randomise
+        random.shuffle(self.availablemachines)
+        self.mlist = list(self.availablemachines)
+
+        self.pophandles = []
+
+        # See if we already have a controller
+        try:
+            rc = IPython.parallel.Client()
+        except Exception:
+            # Otherwise give me one
+            self.pophandles.append(self.startMachine(
+                self.randomEngine(),iptype='ipcontroller'))
+            rc = IPython.parallel.Client()
+        nrunning = len(rc.ids)
+
+        if nengines is None:
+            if nrunning > 0:
+                # Assume you already have the engines you want running
+                return
+            else:
+                raise Exception('No engines available. '
+                        'Must specify nengines>0')
+
+        # Time to start some engines
+        while len(self.pophandles)+1 < nengines:
+            self.pophandles.append(self.startMachine(
+                self.randomEngine(),iptype='ipengine'))
+            # Need to slow down here so the ipcontroller can keep up
+            time.sleep(1)
+        return
+        #### TODO TODO - map function from IP, make consistent with
+        #### sequential run
+
+    def __call__(self,sd):
+        '''Do a parallel map on each processing step in sequence.'''
+        nsteps = len(sd.tasks)
+        if nsteps == 0:
+            sd.output('No outstanding processing steps in tasks')
+            return
+        sd.output('Running %d processing steps...' % nsteps)
+        for tn,to in sd.tasks.iteritems():
+            # TODO: self.map is not implemented yet (see above)
+            self.map(to.func,to.jobs)
+        sd.tasks.clear()
+        return
+
+    def randomEngine(self):
+        try:
+            return self.mlist.pop()
+        except IndexError:
+            self.mlist = list(self.availablemachines)
+            random.shuffle(self.mlist)
+            return self.mlist.pop()
+
+    def getCBUMachines(self):
+        sys.path.append('/imaging/local/spm/loadshare')
+        import loadsharesettings
+        machines = loadsharesettings.machines()
+        # Only machines 43 and above are 64bit
+        return [i for i in machines if int(i[1:])>=43]
+
+    def startMachine(self,id,iptype='ipengine'):
+        """Start an ipengine or ipcontroller over ssh in an xterm.
+        Return the Popen handle for the xterm process."""
+        import subprocess
+        xcmd = 'xterm -e ssh %s -x "' % id
+        pathcmd = 'setenv PATH %s;' % self.localpath
+        ipcmd = '%s"' % iptype
+        fullcmd = xcmd + pathcmd + ipcmd
+        return subprocess.Popen(fullcmd,shell=True)
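Example end-to-end usage, as a hedged sketch (class and function names are from the modules above; the paths, project code, subject code, sequence name and slice count are hypothetical):

    import batchbrain.data as data
    import batchbrain.processes as processes
    import batchbrain.runners as runners

    # Import DICOMs from the CBU mridata tree and convert to NIFTI
    study = data.CBUMRI('/imaging/myproject/ana',
                        projectcodes=['MR09001'], update=True)
    study.addSubject('CBU090001')
    study.importCBUData('ep2d_diff', nslices=60)

    # Queue eddy-current correction, then run everything serially
    ec = processes.eddy_correct('ep2d_diff', inprefix='')
    ec(study)
    runners.simpleRunner(study)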