diff --git a/narps_open/core/common.py b/narps_open/core/common.py index e40d4e9a..c40f2907 100644 --- a/narps_open/core/common.py +++ b/narps_open/core/common.py @@ -63,3 +63,24 @@ def list_intersection(list_1: list, list_2: list) -> list: - list, the intersection of list_1 and list_2 """ return [e for e in list_1 if e in list_2] + +def list_to_file(input_list: list, file_name: str = 'elements.tsv') -> str: + """ + Create a tsv file containing elements of the input list. + This function is meant to be used in a Nipype Function Node. + + Parameters : + - input_list: list + + Returns: + - output_file: path to the created file + """ + from os.path import abspath + output_file = abspath(file_name) + + # Write un element per line + with open(output_file, 'w') as writer: + for element in input_list: + writer.write(f'{element}\n') + + return output_file diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index 6c5239ca..6bbd2889 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -8,7 +8,7 @@ # List all the available pipelines and the corresponding class for each implemented_pipelines = { - '08MQ': None, + '08MQ': 'PipelineTeam08MQ', '0C7Q': None, '0ED6': None, '0H5E': None, @@ -43,7 +43,7 @@ 'B23O': None, 'B5I6': None, 'C22U': None, - 'C88N': None, + 'C88N': 'PipelineTeamC88N', 'DC61': None, 'E3B6': None, 'E6R3': None, diff --git a/narps_open/pipelines/team_08MQ.py b/narps_open/pipelines/team_08MQ.py new file mode 100644 index 00000000..9766c3ce --- /dev/null +++ b/narps_open/pipelines/team_08MQ.py @@ -0,0 +1,1050 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team 08MQ using Nipype """ + +from os.path import join +from itertools import product + +from nipype import Node, Workflow, MapNode +from nipype.interfaces.utility import IdentityInterface, Function, Merge, Split, Select +from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.fsl import ( + # General usage + FSLCommand, ImageStats, + # Preprocessing + FAST, BET, ErodeImage, PrepareFieldmap, MCFLIRT, SliceTimer, + Threshold, Info, SUSAN, FLIRT, ApplyXFM, ConvertXFM, + # Analyses + Level1Design, FEATModel, L2Model, FILMGLS, + FLAMEO, Randomise, MultipleRegressDesign + ) +from nipype.interfaces.fsl.utils import Merge as MergeImages +from nipype.interfaces.fsl.maths import MultiImageMaths +from nipype.algorithms.confounds import CompCor +from nipype.algorithms.modelgen import SpecifyModel +from nipype.interfaces.ants import Registration, WarpTimeSeriesImageMultiTransform + +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import ( + remove_file, list_intersection, elements_in_string, clean_list, list_to_file + ) + +# Setup FSL +FSLCommand.set_default_output_type('NIFTI_GZ') + +class PipelineTeam08MQ(Pipeline): + """ A class that defines the pipeline of team 08MQ """ + + def __init__(self): + super().__init__() + self.fwhm = 6.0 + self.team_id = '08MQ' + self.contrast_list = ['1', '2', '3'] + self.run_level_contasts = [ + ('positive_effect_gain', 'T', ['gain', 'loss'], [1, 0]), + ('positive_effect_loss', 'T', ['gain', 'loss'], [0, 1]), + ('negative_effect_loss', 'T', ['gain', 'loss'], [0, -1]) + ] + + def get_preprocessing(self): + """ Return a Nipype workflow describing the preprocessing part of the pipeline """ + + # IdentityInterface node - allows to iterate over subjects and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('run_id', self.run_list), + ('subject_id', self.subject_list), + ] + + # SelectFiles node - to select necessary files + file_templates = { + 'anat': join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz'), + 'func': join( + 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz' + ), + 'sbref': join( + 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_sbref.nii.gz' + ), + 'magnitude': join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude1.nii.gz'), + 'phasediff': join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz') + } + select_files = Node(SelectFiles(file_templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # FAST Node - Bias field correction on anatomical images + bias_field_correction = Node(FAST(), name = 'bias_field_correction') + bias_field_correction.inputs.img_type = 1 # T1 image + bias_field_correction.inputs.output_biascorrected = True + + # BET Node - Brain extraction for anatomical images + brain_extraction_anat = Node(BET(), name = 'brain_extraction_anat') + brain_extraction_anat.inputs.frac = 0.5 + + # FAST Node - Segmentation of anatomical images + segmentation_anat = Node(FAST(), name = 'segmentation_anat') + segmentation_anat.inputs.no_bias = True # Bias field was already removed + segmentation_anat.inputs.segments = False # Only output partial volume estimation + segmentation_anat.inputs.probability_maps = False # Only output partial volume estimation + + # Split Node - Split probability maps as they output from the segmentation node + # outputs.out1 is CSF + # outputs.out2 is grey matter + # outputs.out3 is white matter + split_segmentation_maps = Node(Split(), name = 'split_segmentation_maps') + split_segmentation_maps.inputs.splits = [1, 1, 1] + split_segmentation_maps.inputs.squeeze = True # Unfold one-element splits removing the list + + # ANTs Node - Normalization of anatomical images to T1 MNI152 space + # https://github.com/ANTsX/ANTs/wiki/Anatomy-of-an-antsRegistration-call + normalization_anat = Node(Registration(), name = 'normalization_anat') + normalization_anat.inputs.fixed_image = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + normalization_anat.inputs.collapse_output_transforms = True + normalization_anat.inputs.convergence_threshold = [1e-06] + normalization_anat.inputs.convergence_window_size = [10] + normalization_anat.inputs.dimension = 3 + normalization_anat.inputs.initial_moving_transform_com = True + normalization_anat.inputs.radius_or_number_of_bins = [32, 32, 4] + normalization_anat.inputs.sampling_percentage = [0.25, 0.25, 1] + normalization_anat.inputs.sampling_strategy = ['Regular', 'Regular', 'None'] + normalization_anat.inputs.transforms = ['Rigid', 'Affine', 'SyN'] + normalization_anat.inputs.metric = ['MI', 'MI', 'CC'] + normalization_anat.inputs.transform_parameters = [(0.1,), (0.1,), (0.1, 3.0, 0.0)] + normalization_anat.inputs.metric_weight = [1.0]*3 + normalization_anat.inputs.shrink_factors = [[8, 4, 2, 1]]*3 + normalization_anat.inputs.smoothing_sigmas = [[3, 2, 1, 0]]*3 + normalization_anat.inputs.sigma_units = ['vox']*3 + normalization_anat.inputs.number_of_iterations = [ + [1000, 500, 250, 100], + [1000, 500, 250, 100], + [100, 70, 50, 20] + ] + normalization_anat.inputs.use_histogram_matching = True + normalization_anat.inputs.winsorize_lower_quantile = 0.005 + normalization_anat.inputs.winsorize_upper_quantile = 0.995 + + # Threshold Node - create white-matter mask + threshold_white_matter = Node(Threshold(), name = 'threshold_white_matter') + threshold_white_matter.inputs.thresh = 1 + + # Threshold Node - create CSF mask + threshold_csf = Node(Threshold(), name = 'threshold_csf') + threshold_csf.inputs.thresh = 1 + + # ErodeImage Node - Erode white-matter mask + erode_white_matter = Node(ErodeImage(), name = 'erode_white_matter') + erode_white_matter.inputs.kernel_shape = 'sphere' + erode_white_matter.inputs.kernel_size = 2.0 #mm + + # ErodeImage Node - Erode CSF mask + erode_csf = Node(ErodeImage(), name = 'erode_csf') + erode_csf.inputs.kernel_shape = 'sphere' + erode_csf.inputs.kernel_size = 1.5 #mm + + # BET Node - Brain extraction of magnitude images + brain_extraction_magnitude = Node(BET(), name = 'brain_extraction_magnitude') + brain_extraction_magnitude.inputs.frac = 0.5 + + # PrepareFieldmap Node - Convert phase and magnitude to fieldmap images + convert_to_fieldmap = Node(PrepareFieldmap(), name = 'convert_to_fieldmap') + + # BET Node - Brain extraction for high contrast functional images + brain_extraction_sbref = Node(BET(), name = 'brain_extraction_sbref') + brain_extraction_sbref.inputs.frac = 0.3 + brain_extraction_sbref.inputs.mask = True + brain_extraction_sbref.inputs.functional = False # 3D data + + # FLIRT Node - Align high contrast functional images to anatomical + # (i.e.: single-band reference images a.k.a. sbref) + coregistration_sbref = Node(FLIRT(), name = 'coregistration_sbref') + coregistration_sbref.inputs.interp = 'trilinear' + coregistration_sbref.inputs.cost = 'bbr' # boundary-based registration + + # ConvertXFM Node - Inverse coregistration transform, to get anat to func transform + inverse_func_to_anat = Node(ConvertXFM(), name = 'inverse_func_to_anat') + inverse_func_to_anat.inputs.invert_xfm = True + + # BET Node - Brain extraction for functional images + brain_extraction_func = Node(BET(), name = 'brain_extraction_func') + brain_extraction_func.inputs.frac = 0.3 + brain_extraction_func.inputs.mask = True + brain_extraction_func.inputs.functional = True + + # MCFLIRT Node - Motion correction of functional images + motion_correction = Node(MCFLIRT(), name = 'motion_correction') + motion_correction.inputs.cost = 'normcorr' + motion_correction.inputs.interpolation = 'spline' # should be 'trilinear' + motion_correction.inputs.save_plots = True # Save transformation parameters + + # Function Nodes get_slice_timings - Create a file with acquisition timing for each slide + slice_timings = Node(Function( + function = list_to_file, + input_names = ['input_list', 'file_name'], + output_names = ['output_file'] + ), name = 'slice_timings') + slice_timings.inputs.input_list = TaskInformation()['SliceTiming'] + slice_timings.inputs.file_name = 'slice_timings.tsv' + + # SliceTimer Node - Slice time correction + slice_time_correction = Node(SliceTimer(), name = 'slice_time_correction') + slice_time_correction.inputs.time_repetition = TaskInformation()['RepetitionTime'] + + # ImageStats Node - Compute median of voxel values to derive SUSAN's brightness_threshold + # -k option adds a mask + # -p computes the 50th percentile (= median) + # we do not need to filter on not-zero values (option -P) because a mask is passed + # Warning : these options must be passed in the right order + # (i.e.: apply mask then compute stat) + compute_median = Node(ImageStats(), name = 'compute_median') + compute_median.inputs.op_string = '-k %s -p 50' + + # SUSAN Node - smoothing of functional images + # we set brightness_threshold to .75x median of the input file, as performed by fMRIprep + smoothing = Node(SUSAN(), name = 'smoothing') + smoothing.inputs.fwhm = self.fwhm + compute_brightness_threshold = lambda x : .75 * x + + # ApplyXFM Node - Alignment of white matter to functional space + alignment_white_matter = Node(ApplyXFM(), name = 'alignment_white_matter') + alignment_white_matter.inputs.apply_xfm = True + alignment_white_matter.inputs.no_resample = True + + # ApplyXFM Node - Alignment of CSF to functional space + alignment_csf = Node(ApplyXFM(), name = 'alignment_csf') + alignment_csf.inputs.apply_xfm = True + alignment_csf.inputs.no_resample = True + + # FLIRT Node - Alignment of functional data to anatomical space + # To save disk space we force isotropic resampling with 2.0 mm voxel dimension + # instead of 1.0 mm as reference file would suggest. + # We have to use FLIRT instead of ApplyXFM because there is a bug with + # apply_isoxfm and the latter. + alignment_func_to_anat = Node(FLIRT(), name = 'alignment_func_to_anat') + alignment_func_to_anat.inputs.apply_isoxfm = 2.0 + alignment_func_to_anat.inputs.no_resample = True + + # ApplyTransforms Node - Alignment of functional brain mask to anatomical space + alignment_func_mask_to_anat = Node(ApplyXFM(), name = 'alignment_func_mask_to_anat') + alignment_func_mask_to_anat.inputs.apply_xfm = True + alignment_func_mask_to_anat.inputs.no_resample = True + + # Select Node - Change the order of transforms coming from ANTs Registration + reverse_transform_order = Node(Select(), name = 'reverse_transform_order') + reverse_transform_order.inputs.index = [1, 0] + + # ApplyWarp Node - Alignment of functional data to MNI space + alignment_func_to_mni = Node(WarpTimeSeriesImageMultiTransform(), + name = 'alignment_func_to_mni') + alignment_func_to_mni.inputs.reference_image = \ + Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + + # ApplyWarp Node - Alignment of functional data to MNI space + alignment_func_mask_to_mni = Node(WarpTimeSeriesImageMultiTransform(), + name = 'alignment_func_mask_to_mni') + alignment_func_mask_to_mni.inputs.reference_image = \ + Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + + # Merge Node - Merge the two masks (WM and CSF) in one input for the next node + merge_masks = Node(Merge(2), name = 'merge_masks') + + # CompCor Node - Compute anatomical confounds (regressors of no interest in the model) + # from the WM and CSF masks + compute_confounds = Node(CompCor(), name = 'compute_confounds') + compute_confounds.inputs.num_components = 4 + compute_confounds.inputs.merge_method = 'union' + compute_confounds.inputs.repetition_time = TaskInformation()['RepetitionTime'] + + # Merge Node - Merge file names to be removed after datasink node is performed + merge_removable_files = Node(Merge(8), name = 'merge_removable_files') + merge_removable_files.inputs.ravel_inputs = True + + # Function Nodes remove_files - Remove sizeable files once they aren't needed + remove_after_datasink = MapNode(Function( + function = remove_file, + input_names = ['_', 'file_name'], + output_names = [] + ), name = 'remove_after_datasink', iterfield = 'file_name') + remove_func = MapNode(Function( + function = remove_file, + input_names = ['_', 'file_name'], + output_names = [] + ), name = 'remove_func', iterfield = 'file_name') + + preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing') + preprocessing.config['execution']['stop_on_first_crash'] = 'true' + preprocessing.connect([ + # Inputs + (information_source, select_files, [ + ('subject_id', 'subject_id'), ('run_id', 'run_id') + ]), + + # Anatomical images + (select_files, bias_field_correction, [('anat', 'in_files')]), + (bias_field_correction, brain_extraction_anat, [('restored_image', 'in_file')]), + (brain_extraction_anat, segmentation_anat, [('out_file', 'in_files')]), + (brain_extraction_anat, normalization_anat, [('out_file', 'moving_image')]), + (segmentation_anat, split_segmentation_maps, [('partial_volume_files', 'inlist')]), + (split_segmentation_maps, threshold_white_matter, [('out3', 'in_file')]), + (split_segmentation_maps, threshold_csf, [('out1', 'in_file')]), + (threshold_white_matter, erode_white_matter, [('out_file', 'in_file')]), + (threshold_csf, erode_csf, [('out_file', 'in_file')]), + (erode_white_matter, alignment_white_matter, [('out_file', 'in_file')]), + (inverse_func_to_anat, alignment_white_matter, [('out_file', 'in_matrix_file')]), + (select_files, alignment_white_matter, [('sbref', 'reference')]), + (erode_csf, alignment_csf, [('out_file', 'in_file')]), + (inverse_func_to_anat, alignment_csf, [('out_file', 'in_matrix_file')]), + (select_files, alignment_csf, [('sbref', 'reference')]), + (alignment_csf, merge_masks, [('out_file', 'in1')]), + (alignment_white_matter, merge_masks, [('out_file', 'in2')]), + + # Field maps + (select_files, brain_extraction_magnitude, [('magnitude', 'in_file')]), + (brain_extraction_magnitude, convert_to_fieldmap, [('out_file', 'in_magnitude')]), + (select_files, convert_to_fieldmap, [('phasediff', 'in_phase')]), + + # High contrast functional volume + (select_files, brain_extraction_sbref, [('sbref', 'in_file')]), + (brain_extraction_sbref, coregistration_sbref, [('out_file', 'in_file')]), + (brain_extraction_anat, coregistration_sbref, [('out_file', 'reference')]), + (split_segmentation_maps, coregistration_sbref, [('out3', 'wm_seg')]), + (convert_to_fieldmap, coregistration_sbref, [('out_fieldmap', 'fieldmap')]), + (coregistration_sbref, inverse_func_to_anat, [('out_matrix_file', 'in_file')]), + + # Functional images + (select_files, brain_extraction_func, [('func', 'in_file')]), + (brain_extraction_func, motion_correction, [('out_file', 'in_file')]), + (select_files, motion_correction, [('sbref', 'ref_file')]), + (slice_timings, slice_time_correction, [('output_file', 'custom_timings')]), + (motion_correction, slice_time_correction, [('out_file', 'in_file')]), + (slice_time_correction, smoothing, [('slice_time_corrected_file', 'in_file')]), + (slice_time_correction, compute_median, [('slice_time_corrected_file', 'in_file')]), + (brain_extraction_func, compute_median, [('mask_file', 'mask_file')]), + (compute_median, smoothing, [ + (('out_stat', compute_brightness_threshold), 'brightness_threshold') + ]), + (smoothing, alignment_func_to_anat, [('smoothed_file', 'in_file')]), + (coregistration_sbref, alignment_func_to_anat, [ + ('out_matrix_file', 'in_matrix_file') + ]), + (brain_extraction_anat, alignment_func_to_anat, [('out_file', 'reference')]), + (brain_extraction_func, alignment_func_mask_to_anat, [('mask_file', 'in_file')]), + (coregistration_sbref, alignment_func_mask_to_anat, [ + ('out_matrix_file', 'in_matrix_file') + ]), + (brain_extraction_anat, alignment_func_mask_to_anat, [('out_file', 'reference')]), + (alignment_func_to_anat, alignment_func_to_mni, [('out_file', 'input_image')]), + (alignment_func_mask_to_anat, alignment_func_mask_to_mni, [ + ('out_file', 'input_image') + ]), + (normalization_anat, reverse_transform_order, [('forward_transforms', 'inlist')]), + (reverse_transform_order, alignment_func_to_mni, [('out', 'transformation_series')]), + (reverse_transform_order, alignment_func_mask_to_mni, [ + ('out', 'transformation_series') + ]), + (merge_masks, compute_confounds, [('out', 'mask_files')]), #Masks are in the func space + (slice_time_correction, compute_confounds, [ + ('slice_time_corrected_file', 'realigned_file') + ]), + + # Outputs of preprocessing + (motion_correction, data_sink, [('par_file', 'preprocessing.@par_file')]), + (compute_confounds, data_sink, [ + ('components_file', 'preprocessing.@components_file')]), + (alignment_func_to_mni, data_sink, [('output_image', 'preprocessing.@output_image')]), + (alignment_func_mask_to_mni, data_sink, [ + ('output_image', 'preprocessing.@output_mask')]), + + # File removals + (alignment_func_to_anat, remove_func, [('out_file', 'file_name')]), + (alignment_func_to_mni, remove_func, [('output_image', '_')]), + + (motion_correction, merge_removable_files, [('out_file', 'in1')]), + (slice_time_correction, merge_removable_files, [('slice_time_corrected_file', 'in2')]), + (smoothing, merge_removable_files, [('smoothed_file', 'in3')]), + (alignment_func_to_mni, merge_removable_files, [('output_image', 'in4')]), + (brain_extraction_func, merge_removable_files, [('out_file', 'in5')]), + (brain_extraction_anat, merge_removable_files, [('out_file', 'in6')]), + (bias_field_correction, merge_removable_files, [('restored_image', 'in7')]), + (normalization_anat, merge_removable_files, [('forward_transforms', 'in8')]), + (merge_removable_files, remove_after_datasink, [('out', 'file_name')]), + (data_sink, remove_after_datasink, [('out_file', '_')]) + ]) + + return preprocessing + + def get_preprocessing_outputs(self): + """ Return a list of the files generated by the preprocessing """ + + parameters = { + 'subject_id': self.subject_list, + 'run_id': self.run_list, + 'file': [ + 'components_file.txt', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_brain_mcf.nii.gz.par', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_brain_mcf_st_smooth_flirt_wtsimt.nii.gz', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_brain_mask_flirt_wtsimt.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'preprocessing', + '_run_id_{run_id}_subject_id_{subject_id}', + '{file}' + ) + + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + def get_subject_information(event_file): + """ + Extract information from an event file, to setup the model. 4 regressors are extracted : + - event: a regressor with 4 second ON duration + - gain : a parametric modulation of events corresponding to gain magnitude. Mean centred. + - loss : a parametric modulation of events corresponding to loss magnitude. Mean centred. + - response : a regressor with 1 for accept and -1 for reject. Mean centred. + + Parameters : + - event_file : str, event file corresponding to the run and the subject to analyze + + Returns : + - subject_info : list of Bunch containing event information + """ + from nipype.interfaces.base import Bunch + + condition_names = ['event', 'gain', 'loss', 'response'] + onsets = {} + durations = {} + amplitudes = {} + + # Create dictionary items with empty lists + for condition in condition_names: + onsets.update({condition : []}) + durations.update({condition : []}) + amplitudes.update({condition : []}) + + # Parse information in the event_file + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + onsets['event'].append(float(info[0])) + durations['event'].append(float(info[1])) + amplitudes['event'].append(1.0) + onsets['gain'].append(float(info[0])) + durations['gain'].append(float(info[1])) + amplitudes['gain'].append(float(info[2])) + onsets['loss'].append(float(info[0])) + durations['loss'].append(float(info[1])) + amplitudes['loss'].append(float(info[3])) + onsets['response'].append(float(info[0])) + durations['response'].append(float(info[1])) + if 'accept' in info[5]: + amplitudes['response'].append(1.0) + elif 'reject' in info[5]: + amplitudes['response'].append(-1.0) + else: + amplitudes['response'].append(0.0) + + return [ + Bunch( + conditions = condition_names, + onsets = [onsets[k] for k in condition_names], + durations = [durations[k] for k in condition_names], + amplitudes = [amplitudes[k] for k in condition_names], + regressor_names = None, + regressors = None) + ] + + def get_run_level_analysis(self): + """ Return a Nipype workflow describing the run level analysis part of the pipeline + + Returns: + - run_level_analysis : nipype.WorkFlow + """ + + # IdentityInterface node - allows to iterate over subjects and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('run_id', self.run_list), + ('subject_id', self.subject_list), + ] + + # SelectFiles node - to select necessary files + templates = { + # Functional MRI - computed by preprocessing + 'func' : join(self.directories.output_dir, 'preprocessing', + '_run_id_{run_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_brain_mcf_st_smooth_flirt_wtsimt.nii.gz' + ), + # Event file - from the original dataset + 'event' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv' + ), + # Motion parameters - computed by preprocessing's motion_correction Node + 'motion' : join(self.directories.output_dir, 'preprocessing', + '_run_id_{run_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_brain_mcf.nii.gz.par', + ) + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Function Node get_subject_information - Get subject information from event files + subject_information = Node(Function( + function = self.get_subject_information, + input_names = ['event_file'], + output_names = ['subject_info'] + ), name = 'subject_information') + + # SpecifyModel Node - Generates a model + specify_model = Node(SpecifyModel(), name = 'specify_model') + specify_model.inputs.high_pass_filter_cutoff = 90 + specify_model.inputs.input_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + specify_model.inputs.parameter_source = 'FSL' # Source of motion parameters. + + # Level1Design Node - Generate files for first level computation + model_design = Node(Level1Design(), 'model_design') + model_design.inputs.bases = { + 'dgamma':{'derivs' : True} # Canonical double gamma HRF plus temporal derivative + } + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.model_serial_correlations = True + model_design.inputs.contrasts = self.run_level_contasts + + # FEATModel Node - Generate first level model + model_generation = Node(FEATModel(), name = 'model_generation') + + # FILMGLS Node - Estimate first level model + model_estimate = Node(FILMGLS(), name = 'model_estimate') + + # Create l1 analysis workflow and connect its nodes + run_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'run_level_analysis' + ) + run_level_analysis.connect([ + (information_source, select_files, [ + ('subject_id', 'subject_id'), ('run_id', 'run_id') + ]), + (select_files, subject_information, [('event', 'event_file')]), + (subject_information, specify_model, [('subject_info', 'subject_info')]), + (select_files, specify_model, [('motion', 'realignment_parameters')]), + (select_files, specify_model, [('func', 'functional_runs')]), + (specify_model, model_design, [('session_info', 'session_info')]), + (model_design, model_generation, [ + ('ev_files', 'ev_files'), + ('fsf_files', 'fsf_file')]), + (select_files, model_estimate, [('func', 'in_file')]), + (model_generation, model_estimate, [ + ('con_file', 'tcon_file'), + ('design_file', 'design_file')]), + (model_estimate, data_sink, [('results_dir', 'run_level_analysis.@results')]), + (model_generation, data_sink, [ + ('design_file', 'run_level_analysis.@design_file'), + ('design_image', 'run_level_analysis.@design_img')]), + ]) + + return run_level_analysis + + def get_run_level_outputs(self): + """ Return a list of the files generated by the run level analysis """ + + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'file' : [ + 'run0.mat', + 'run0.png' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'run_level_analysis', '_run_id_{run_id}_subject_id_{subject_id}','{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'contrast_id' : self.contrast_list, + 'file' : [ + join('results', 'cope{contrast_id}.nii.gz'), + join('results', 'tstat{contrast_id}.nii.gz'), + join('results', 'varcope{contrast_id}.nii.gz'), + join('results', 'zstat{contrast_id}.nii.gz'), + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'run_level_analysis', '_run_id_{run_id}_subject_id_{subject_id}','{file}' + ) + + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_subject_level_analysis(self): + """ Return a Nipype workflow describing the subject level analysis part of the pipeline """ + + # IdentityInterface node - allows to iterate over subjects and contrasts + information_source = Node(IdentityInterface( + fields = ['subject_id', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('contrast_id', self.contrast_list) + ] + + # SelectFiles Node - select necessary files + templates = { + 'copes' : join(self.directories.output_dir, 'run_level_analysis', + '_run_id_*_subject_id_{subject_id}', 'results', 'cope{contrast_id}.nii.gz'), + 'varcopes' : join(self.directories.output_dir, 'run_level_analysis', + '_run_id_*_subject_id_{subject_id}', 'results', 'varcope{contrast_id}.nii.gz'), + 'masks' : join(self.directories.output_dir, 'preprocessing', + '_run_id_*_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-*_bold_brain_mask_flirt_wtsimt.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # L2Model Node - Generate subject specific second level model + generate_model = Node(L2Model(), name = 'generate_model') + generate_model.inputs.num_copes = len(self.run_list) + + # Merge Node - Merge copes files for each subject + merge_copes = Node(MergeImages(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + + # Merge Node - Merge varcopes files for each subject + merge_varcopes = Node(MergeImages(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.run_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.run_list) - 1) + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'fe' # Fixed effect + + # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis') + subject_level_analysis.connect([ + (information_source, select_files, [ + ('subject_id', 'subject_id'), + ('contrast_id', 'contrast_id')]), + (select_files, merge_copes, [('copes', 'in_files')]), + (select_files, merge_varcopes, [('varcopes', 'in_files')]), + (select_files, split_masks, [('masks', 'inlist')]), + (split_masks, mask_intersection, [('out1', 'in_file')]), + (split_masks, mask_intersection, [('out2', 'operand_files')]), + (merge_copes, estimate_model, [('merged_file', 'cope_file')]), + (merge_varcopes, estimate_model, [('merged_file', 'var_cope_file')]), + (mask_intersection, estimate_model, [('out_file', 'mask_file')]), + (generate_model, estimate_model, [ + ('design_mat', 'design_file'), + ('design_con', 't_con_file'), + ('design_grp', 'cov_split_file')]), + (mask_intersection, data_sink, [('out_file', 'subject_level_analysis.@mask')]), + (estimate_model, data_sink, [ + ('zstats', 'subject_level_analysis.@stats'), + ('tstats', 'subject_level_analysis.@tstats'), + ('copes', 'subject_level_analysis.@copes'), + ('var_copes', 'subject_level_analysis.@varcopes')])]) + + return subject_level_analysis + + def get_subject_level_outputs(self): + """ Return a list of the files generated by the subject level analysis """ + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + 'file' : ['cope1.nii.gz', 'tstat1.nii.gz', 'varcope1.nii.gz', 'zstat1.nii.gz'] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}','{file}' + ) + + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + def get_one_sample_t_test_regressors(subject_list: list) -> dict: + """ + Create dictionary of regressors for one sample t-test group analysis. + + Parameters: + - subject_list: ids of subject in the group for which to do the analysis + + Returns: + - dict containing named lists of regressors. + """ + + return dict(group_mean = [1 for _ in subject_list]) + + def get_two_sample_t_test_regressors( + equal_range_ids: list, + equal_indifference_ids: list, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for two sample t-test group analysis. + + Parameters: + - equal_range_ids: ids of subjects in equal range group + - equal_indifference_ids: ids of subjects in equal indifference group + - subject_list: ids of subject for which to do the analysis + + Returns: + - regressors, dict: containing named lists of regressors. + - groups, list: group identifiers to distinguish groups in FSL analysis. + """ + + # Create 2 lists containing n_sub values which are + # * 1 if the participant is on the group + # * 0 otherwise + equal_range_regressors = [1 if i in equal_range_ids else 0 for i in subject_list] + equal_indifference_regressors = [ + 1 if i in equal_indifference_ids else 0 for i in subject_list + ] + + # Create regressors output : a dict with the two list + regressors = dict( + equalRange = equal_range_regressors, + equalIndifference = equal_indifference_regressors + ) + + # Create groups outputs : a list with 1 for equalRange subjects and 2 for equalIndifference + groups = [1 if i == 1 else 2 for i in equal_range_regressors] + + return regressors, groups + + def get_group_level_analysis(self): + """ Return all workflows for the group level analysis. """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level_analysis: nipype.WorkFlow + """ + # Infosource Node - iterate over the contrasts generated by the subject level analysis + information_source = Node( + IdentityInterface( + fields = ['contrast_id'] + ), + name = 'information_source', + ) + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles Node - select necessary files + templates = { + 'copes' : join(self.directories.output_dir, 'subject_level_analysis', + '_contrast_id_{contrast_id}_subject_id_*', 'cope1.nii.gz'), + 'varcopes' : join(self.directories.output_dir, 'subject_level_analysis', + '_contrast_id_{contrast_id}_subject_id_*', 'varcope1.nii.gz'), + 'masks' : join(self.directories.output_dir, 'subject_level_analysis', + '_contrast_id_{contrast_id}_subject_id_*', + 'sub-*_task-MGT_run-*_bold_brain_mask_flirt_wtsimt_maths.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + select_files.inputs.force_list = True + + # Datasink Node - save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Function Node elements_in_string + # Get contrast of parameter estimates (cope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_copes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_copes', iterfield = 'input_str' + ) + + # Function Node elements_in_string + # Get variance of the estimated copes (varcope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_varcopes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_varcopes', iterfield = 'input_str' + ) + + # Merge Node - Merge cope files + merge_copes = Node(MergeImages(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + + # Merge Node - Merge cope files + merge_varcopes = Node(MergeImages(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.subject_list) - 1] + split_masks.inputs.squeeze = True + + # MultiImageMaths Node - Create a group mask by + # computing the intersection of all subject masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.subject_list) - 1) + + # MultipleRegressDesign Node - Specify model + specify_model = Node(MultipleRegressDesign(), name = 'specify_model') + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'ols' # Ordinary least squares + + # Randomise Node - + randomise = Node(Randomise(), name = 'randomise') + randomise.inputs.num_perm = 10000 + randomise.inputs.tfce = True + randomise.inputs.vox_p_values = True + randomise.inputs.c_thresh = 0.05 + randomise.inputs.tfce_E = 0.01 + + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Declare the workflow + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}' + ) + group_level_analysis.connect([ + (information_source, select_files, [('contrast_id', 'contrast_id')]), + (select_files, get_copes, [('copes', 'input_str')]), + (select_files, get_varcopes, [('varcopes', 'input_str')]), + (select_files, split_masks, [('masks', 'inlist')]), + (split_masks, mask_intersection, [('out1', 'in_file')]), + (split_masks, mask_intersection, [('out2', 'operand_files')]), + (get_copes, merge_copes, [(('out_list', clean_list), 'in_files')]), + (get_varcopes, merge_varcopes,[(('out_list', clean_list), 'in_files')]), + (merge_copes, estimate_model, [('merged_file', 'cope_file')]), + (merge_varcopes, estimate_model, [('merged_file', 'var_cope_file')]), + (mask_intersection, estimate_model, [('out_file', 'mask_file')]), + (specify_model, estimate_model, [ + ('design_mat', 'design_file'), + ('design_con', 't_con_file'), + ('design_grp', 'cov_split_file') + ]), + (merge_copes, randomise, [('merged_file', 'in_file')]), + (mask_intersection, randomise, [('out_file', 'mask')]), + (specify_model, randomise, [ + ('design_mat', 'design_mat'), + ('design_con', 'tcon') + ]), + (randomise, data_sink, [ + ('t_corrected_p_files', + f'group_level_analysis_{method}_nsub_{nb_subjects}.@tcorpfile'), + ('tstat_files', f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstat') + ]), + (estimate_model, data_sink, [ + ('zstats', f'group_level_analysis_{method}_nsub_{nb_subjects}.@zstats'), + ('tstats', f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstats') + ]) + ]) + + if method in ('equalRange', 'equalIndifference'): + + # Setup a one sample t-test + specify_model.inputs.contrasts = [ + ('Group', 'T', ['group_mean'], [1]), + ('Group', 'T', ['group_mean'], [-1]) + ] + + # Function Node get_group_subjects - Get subjects in the group and in the subject_list + get_group_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_group_subjects' + ) + get_group_subjects.inputs.list_1 = get_group(method) + get_group_subjects.inputs.list_2 = self.subject_list + + # Function Node get_one_sample_t_test_regressors + # Get regressors in the equalRange and equalIndifference method case + regressors_one_sample = Node( + Function( + function = self.get_one_sample_t_test_regressors, + input_names = ['subject_list'], + output_names = ['regressors'] + ), + name = 'regressors_one_sample', + ) + + # Add missing connections + group_level_analysis.connect([ + (get_group_subjects, get_copes, [('out_list', 'elements')]), + (get_group_subjects, get_varcopes, [('out_list', 'elements')]), + (get_group_subjects, regressors_one_sample, [('out_list', 'subject_list')]), + (regressors_one_sample, specify_model, [('regressors', 'regressors')]) + ]) + + elif method == 'groupComp': + + # Select copes and varcopes corresponding to the selected subjects + # Indeed the SelectFiles node asks for all (*) subjects available + get_copes.inputs.elements = self.subject_list + get_varcopes.inputs.elements = self.subject_list + + # Setup a two sample t-test + specify_model.inputs.contrasts = [( + 'Eq range vs Eq indiff in loss', + 'T', + ['equalRange', 'equalIndifference'], + [1, -1] + )] + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node get_two_sample_t_test_regressors + # Get regressors in the groupComp method case + regressors_two_sample = Node( + Function( + function = self.get_two_sample_t_test_regressors, + input_names = [ + 'equal_range_ids', + 'equal_indifference_ids', + 'subject_list', + ], + output_names = ['regressors', 'groups'] + ), + name = 'regressors_two_sample', + ) + regressors_two_sample.inputs.subject_list = self.subject_list + + # Add missing connections + group_level_analysis.connect([ + (get_equal_range_subjects, regressors_two_sample, [ + ('out_list', 'equal_range_ids') + ]), + (get_equal_indifference_subjects, regressors_two_sample, [ + ('out_list', 'equal_indifference_ids') + ]), + (regressors_two_sample, specify_model, [ + ('regressors', 'regressors'), + ('groups', 'groups')]) + ]) + + return group_level_analysis + + def get_hypotheses_outputs(self): + """ Return the names of the files used by the team to answer the hypotheses of NARPS. """ + + nb_sub = len(self.subject_list) + files = [ + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/narps_open/pipelines/team_2T6S.py b/narps_open/pipelines/team_2T6S.py index 6c42201a..22f444a5 100755 --- a/narps_open/pipelines/team_2T6S.py +++ b/narps_open/pipelines/team_2T6S.py @@ -502,6 +502,7 @@ def get_group_level_analysis_sub_workflow(self, method): height_threshold = 0.001, height_threshold_type = 'p-value', force_activation = True), name = 'threshold', iterfield = ['stat_image', 'contrast_index']) + threshold.synchronize = True l2_analysis = Workflow( base_dir = self.directories.working_dir, @@ -512,7 +513,8 @@ def get_group_level_analysis_sub_workflow(self, method): (selectfiles_groupanalysis, sub_contrasts, [ ('contrast', 'file_list'), ('participants', 'participants_file')]), - (estimate_model, estimate_contrast, [('spm_mat_file', 'spm_mat_file'), + (estimate_model, estimate_contrast, [ + ('spm_mat_file', 'spm_mat_file'), ('residual_image', 'residual_image'), ('beta_images', 'beta_images')]), (estimate_contrast, threshold, [('spm_mat_file', 'spm_mat_file'), @@ -528,11 +530,9 @@ def get_group_level_analysis_sub_workflow(self, method): if method in ('equalRange', 'equalIndifference'): contrasts = [('Group', 'T', ['mean'], [1]), ('Group', 'T', ['mean'], [-1])] - threshold.inputs.contrast_index = [1, 2] - threshold.synchronize = True - ## Specify design matrix + # Specify design matrix one_sample_t_test_design = Node(OneSampleTTestDesign(), name = 'one_sample_t_test_design') @@ -543,11 +543,9 @@ def get_group_level_analysis_sub_workflow(self, method): elif method == 'groupComp': contrasts = [ ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [-1, 1])] - threshold.inputs.contrast_index = [1] - threshold.synchronize = True - # Node for the design matrix + # Specify design matrix two_sample_t_test_design = Node(TwoSampleTTestDesign(), name = 'two_sample_t_test_design') diff --git a/narps_open/pipelines/team_C88N.py b/narps_open/pipelines/team_C88N.py index 723b4fa7..a6cc9fea 100755 --- a/narps_open/pipelines/team_C88N.py +++ b/narps_open/pipelines/team_C88N.py @@ -1,574 +1,645 @@ -from nipype.interfaces.spm import (Smooth, OneSampleTTestDesign, EstimateModel, EstimateContrast, Level1Design, - TwoSampleTTestDesign) -from nipype.interfaces.spm import Threshold -from nipype.algorithms.modelgen import SpecifySPMModel +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS' team C88N using Nipype """ + +from os.path import join +from itertools import product + +from nipype import Workflow, Node, MapNode from nipype.interfaces.utility import IdentityInterface, Function from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.spm import ( + Smooth, + OneSampleTTestDesign, EstimateModel, EstimateContrast, + Level1Design, TwoSampleTTestDesign, Threshold + ) +from nipype.algorithms.modelgen import SpecifySPMModel from nipype.algorithms.misc import Gunzip -from nipype import Workflow, Node, MapNode -from nipype.interfaces.base import Bunch - -from os.path import join as opj -import os -import json - -def get_subject_infos_gain(event_files): - ''' - Create Bunchs for specifySPMModel. - Here, the team wanted to concatenate runs and used RT (response time) for duration except for NoResponse trials - for which the duration was set to 4. - Gain and loss amounts were used as parametric regressors. - - Parameters : - - event_files : list of files containing events information for each run - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - from os.path import join as opj - from nipype.interfaces.base import Bunch - - cond_names = ['trial'] - onset = {} - duration = {} - weights_gain = {} - weights_loss = {} - runs = ['01', '02', '03', '04'] - - for r in range(len(runs)): # Loop over number of runs. - onset.update({s + '_run' + str(r+1) : [] for s in cond_names}) # creates dictionary items with empty lists - duration.update({s + '_run' + str(r+1) : [] for s in cond_names}) - weights_gain.update({'gain_run' + str(r+1) : []}) - weights_loss.update({'loss_run' + str(r+1) : []}) - - # subject_id = '001' - # file = sub-001_func_sub-001_task-MGT_run-01_events.tsv - for r, f_events in enumerate(event_files): - with open(f_events, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - for cond in cond_names: - val = cond + '_run' + str(r+1) # trial_run1 - val_gain = 'gain_run' + str(r+1) # gain_run1 - val_loss = 'loss_run' + str(r+1) # loss_run1 - onset[val].append(float(info[0])) # onsets for trial_run1 - duration[val].append(float(0)) # durations for trial : 0 - weights_gain[val_gain].append(float(info[2])) # weights gain for trial_run1 - weights_loss[val_loss].append(float(info[3])) # weights loss for trial_run1 - - # Bunching is done per run, i.e. trial_run1, trial_run2, etc. - # But names must not have '_run1' etc because we concatenate runs - subject_info = [] - for r in range(len(runs)): - - cond = [c + '_run' + str(r+1) for c in cond_names] - gain = 'gain_run' + str(r+1) - loss = 'loss_run' + str(r+1) - - subject_info.insert(r, - Bunch(conditions=cond_names, - onsets=[onset[k] for k in cond], - durations=[duration[k] for k in cond], - amplitudes=None, - tmod=None, - pmod=[Bunch(name=['loss', 'gain'], - poly=[1, 1], - param=[weights_loss[loss], - weights_gain[gain]])], - regressor_names=None, - regressors=None)) - - return subject_info - -def get_subject_infos_loss(event_files): - ''' - Create Bunchs for specifySPMModel. - Here, the team wanted to concatenate runs and used RT (response time) for duration except for NoResponse trials - for which the duration was set to 4. - Gain and loss amounts were used as parametric regressors. - - Parameters : - - event_files : list of files containing events information for each run - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - from os.path import join as opj - from nipype.interfaces.base import Bunch - - cond_names = ['trial'] - onset = {} - duration = {} - weights_gain = {} - weights_loss = {} - runs = ['01', '02', '03', '04'] - - for r in range(len(runs)): # Loop over number of runs. - onset.update({s + '_run' + str(r+1) : [] for s in cond_names}) # creates dictionary items with empty lists - duration.update({s + '_run' + str(r+1) : [] for s in cond_names}) - weights_gain.update({'gain_run' + str(r+1) : []}) - weights_loss.update({'loss_run' + str(r+1) : []}) - - # subject_id = '001' - # file = sub-001_func_sub-001_task-MGT_run-01_events.tsv - for r, f_events in enumerate(event_files): - with open(f_events, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - for cond in cond_names: - val = cond + '_run' + str(r+1) # trial_run1 - val_gain = 'gain_run' + str(r+1) # gain_run1 - val_loss = 'loss_run' + str(r+1) # loss_run1 - onset[val].append(float(info[0])) # onsets for trial_run1 - duration[val].append(float(0)) # durations for trial : 0 - weights_gain[val_gain].append(float(info[2])) # weights gain for trial_run1 - weights_loss[val_loss].append(float(info[3])) # weights loss for trial_run1 - - # Bunching is done per run, i.e. trial_run1, trial_run2, etc. - # But names must not have '_run1' etc because we concatenate runs - subject_info = [] - for r in range(len(runs)): - - cond = [c + '_run' + str(r+1) for c in cond_names] - gain = 'gain_run' + str(r+1) - loss = 'loss_run' + str(r+1) - - subject_info.insert(r, - Bunch(conditions=cond_names, - onsets=[onset[k] for k in cond], - durations=[duration[k] for k in cond], - amplitudes=None, - tmod=None, - pmod=[Bunch(name=['gain', 'loss'], - poly=[1, 1], - param=[weights_gain[gain], weights_loss[loss]])], - regressor_names=None, - regressors=None)) - - return subject_info - -def get_contrasts_gain(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - # list of condition names - conditions = ['trialxloss^1', 'trialxgain^1'] - - # create contrasts - effect_gain = ('effect_of_gain', 'T', conditions, [0, 1]) - - # contrast list - contrasts = [effect_gain] - - return contrasts - -def get_contrasts_loss(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - # list of condition names - conditions = ['trialxgain^1', 'trialxloss^1'] - - # create contrasts - positive_effect_loss = ('positive_effect_of_loss', 'T', conditions, [0, 1]) - - negative_effect_loss = ('negative_effect_of_loss', 'T', conditions, [0, -1]) - - # contrast list - contrasts = [positive_effect_loss, negative_effect_loss] - - return contrasts - -def rm_gunzip_files(files, subject_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - gunzip_dir = opj(result_dir, working_dir, 'l1_analysis', f"_subject_id_{subject_id}", 'gunzip_func') - - try: - shutil.rmtree(gunzip_dir) - except OSError as e: - print(e) - else: - print("The directory is deleted successfully") - - return files - -def rm_smoothed_files(files, subject_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - smooth_dir = opj(result_dir, working_dir, 'l1_analysis', f"_subject_id_{subject_id}", 'smooth') - - try: - shutil.rmtree(smooth_dir) - except OSError as e: - print(e) - else: - print("The directory is deleted successfully") - - return files - -def get_l1_analysis(subject_list, TR, fwhm, run_list, exp_dir, result_dir, working_dir, output_dir): - """ - Returns the first level analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the analysis - - run_list: list of str, list of runs for which you want to do the analysis - - fwhm: float, fwhm for smoothing step - - TR: float, time repetition used during acquisition - - Returns: - - l1_analysis : Nipype WorkFlow - """ - # Infosource Node - To iterate on subjects - infosource = Node(IdentityInterface(fields = ['subject_id']), name = 'infosource') - infosource.iterables = [('subject_id', subject_list)] - - # Templates to select files node - func_file = opj('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_preproc.nii.gz') - - event_file = opj('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-*_events.tsv') - - template = {'event' : event_file, 'func' : func_file} - - # SelectFiles node - to select necessary files - selectfiles = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles') - - # DataSink Node - store the wanted results in the wanted repository - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - # GUNZIP NODE : SPM do not use .nii.gz files - gunzip_func = MapNode(Gunzip(), name = 'gunzip_func', iterfield = ['in_file']) - - ## Smoothing node - smooth = Node(Smooth(fwhm = fwhm), name = 'smooth') - - # Get Subject Info - get subject specific condition information - subject_infos_gain = Node(Function(input_names=['event_files'], - output_names=['subject_info'], - function=get_subject_infos_gain), - name='subject_infos_gain') - - subject_infos_loss = Node(Function(input_names=['event_files'], - output_names=['subject_info'], - function=get_subject_infos_loss), - name='subject_infos_loss') - - - # SpecifyModel - Generates SPM-specific Model - specify_model_gain = Node(SpecifySPMModel(concatenate_runs = True, input_units = 'secs', output_units = 'secs', - time_repetition = TR, high_pass_filter_cutoff = 128), - name='specify_model_gain') - - specify_model_loss = Node(SpecifySPMModel(concatenate_runs = True, input_units = 'secs', output_units = 'secs', - time_repetition = TR, high_pass_filter_cutoff = 128), - name='specify_model_loss') - - # Level1Design - Generates an SPM design matrix - l1_design_gain = Node(Level1Design(bases = {'hrf': {'derivs': [0, 0]}}, timing_units = 'secs', - interscan_interval = TR), name='l1_design_gain') - - l1_design_loss = Node(Level1Design(bases = {'hrf': {'derivs': [0, 0]}}, timing_units = 'secs', - interscan_interval = TR), name='l1_design_loss') - - # EstimateModel - estimate the parameters of the model - l1_estimate_gain = Node(EstimateModel(estimation_method={'Classical': 1}), - name="l1_estimate_gain") - - l1_estimate_loss = Node(EstimateModel(estimation_method={'Classical': 1}), - name="l1_estimate_loss") - - # Node contrasts to get contrasts - contrasts_gain = Node(Function(function=get_contrasts_gain, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts_gain') - - contrasts_loss = Node(Function(function=get_contrasts_loss, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts_loss') - - # EstimateContrast - estimates contrasts - contrast_estimate_gain = Node(EstimateContrast(), name="contrast_estimate_gain") - - contrast_estimate_loss = Node(EstimateContrast(), name="contrast_estimate_loss") - - remove_gunzip_files = Node(Function(input_names = ['files', 'subject_id', 'result_dir', 'working_dir'], - output_names = ['files'], - function = rm_gunzip_files), name = 'remove_gunzip_files') - - remove_gunzip_files.inputs.result_dir = result_dir - remove_gunzip_files.inputs.working_dir = working_dir - - remove_smoothed_files = Node(Function(input_names = ['files', 'subject_id', 'result_dir', 'working_dir'], - output_names = ['files'], - function = rm_smoothed_files), name = 'remove_smoothed_files') - - remove_smoothed_files.inputs.result_dir = result_dir - remove_smoothed_files.inputs.working_dir = working_dir - - # Create l1 analysis workflow and connect its nodes - l1_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l1_analysis") - - l1_analysis.connect([(infosource, selectfiles, [('subject_id', 'subject_id')]), - (selectfiles, subject_infos_gain, [('event','event_files')]), - (selectfiles, subject_infos_loss, [('event','event_files')]), - (infosource, contrasts_gain, [('subject_id', 'subject_id')]), - (infosource, contrasts_loss, [('subject_id', 'subject_id')]), - (infosource, remove_gunzip_files, [('subject_id', 'subject_id')]), - (infosource, remove_smoothed_files, [('subject_id', 'subject_id')]), - (subject_infos_gain, specify_model_gain, [('subject_info', 'subject_info')]), - (subject_infos_loss, specify_model_loss, [('subject_info', 'subject_info')]), - (contrasts_gain, contrast_estimate_gain, [('contrasts', 'contrasts')]), - (contrasts_loss, contrast_estimate_loss, [('contrasts', 'contrasts')]), - (selectfiles, gunzip_func, [('func', 'in_file')]), - (gunzip_func, smooth, [('out_file', 'in_files')]), - (smooth, remove_gunzip_files, [('smoothed_files', 'files')]), - (remove_gunzip_files, specify_model_gain, [('files', 'functional_runs')]), - (remove_gunzip_files, specify_model_loss, [('files', 'functional_runs')]), - (specify_model_gain, l1_design_gain, [('session_info', 'session_info')]), - (specify_model_loss, l1_design_loss, [('session_info', 'session_info')]), - (l1_design_gain, l1_estimate_gain, [('spm_mat_file', 'spm_mat_file')]), - (l1_design_loss, l1_estimate_loss, [('spm_mat_file', 'spm_mat_file')]), - (l1_estimate_gain, contrast_estimate_gain, [('spm_mat_file', 'spm_mat_file'), - ('beta_images', 'beta_images'), - ('residual_image', 'residual_image')]), - (l1_estimate_loss, contrast_estimate_loss, [('spm_mat_file', 'spm_mat_file'), - ('beta_images', 'beta_images'), - ('residual_image', 'residual_image')]), - (contrast_estimate_gain, datasink, [('con_images', 'l1_analysis_gain.@con_images'), - ('spmT_images', 'l1_analysis_gain.@spmT_images'), - ('spm_mat_file', 'l1_analysis_gain.@spm_mat_file')]), - (contrast_estimate_loss, datasink, [('con_images', 'l1_analysis_loss.@con_images'), - ('spmT_images', 'l1_analysis_loss.@spmT_images'), - ('spm_mat_file', 'l1_analysis_loss.@spm_mat_file')]), - (contrast_estimate_gain, remove_smoothed_files, [('spmT_images', 'files')]) - ]) - - return l1_analysis - - -def get_subset_contrasts(file_list, method, subject_list, participants_file): - ''' - Parameters : - - file_list : original file list selected by selectfiles node - - subject_list : list of subject IDs that are in the wanted group for the analysis - - participants_file: str, file containing participants characteristics - - method: str, one of "equalRange", "equalIndifference" or "groupComp" - - This function return the file list containing only the files belonging to subject in the wanted group. - ''' - equalIndifference_id = [] - equalRange_id = [] - equalIndifference_files = [] - equalRange_files = [] - - with open(participants_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - if info[0][-3:] in subject_list and info[1] == "equalIndifference": - equalIndifference_id.append(info[0][-3:]) - elif info[0][-3:] in subject_list and info[1] == "equalRange": - equalRange_id.append(info[0][-3:]) - - for file in file_list: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - equalIndifference_files.append(file) - elif sub_id[-2][-3:] in equalRange_id: - equalRange_files.append(file) - - return equalIndifference_id, equalRange_id, equalIndifference_files, equalRange_files - - -def get_l2_analysis(subject_list, n_sub, model_list, contrast_list, method, exp_dir, result_dir, working_dir, output_dir): - """ - Returns the 2nd level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - model_list: list of str, list of models to use for the analysis - - contrast_list: list of str, list of contrasts to analyze - - n_sub: float, number of subjects used to do the analysis - - method: one of "equalRange", "equalIndifference" or "groupComp" - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource - a function free node to iterate over the list of subject names - infosource_groupanalysis = Node(IdentityInterface(fields=['model_type','contrast_id', 'subjects'], - subjects = subject_list), - name="infosource_groupanalysis") - - infosource_groupanalysis.iterables = [('model_type', model_list), ('contrast_id', contrast_list)] - - # SelectFiles - contrast_file = opj(result_dir, output_dir, "l1_analysis_{model_type}", "_subject_id_*", "con_000{contrast_id}.nii") - - participants_file = opj(exp_dir, 'participants.tsv') - - templates = {'contrast' : contrast_file, 'participants' : participants_file} - - selectfiles_groupanalysis = Node(SelectFiles(templates, base_directory=result_dir, force_list= True), - name="selectfiles_groupanalysis") - - # Datasink node : to save important files - datasink_groupanalysis = Node(DataSink(base_directory = result_dir, container = output_dir), - name = 'datasink_groupanalysis') - - # Node to select subset of contrasts - sub_contrasts = Node(Function(input_names = ['file_list', 'method', 'subject_list', 'participants_file'], - output_names = ['equalIndifference_id', 'equalRange_id', 'equalIndifference_files', 'equalRange_files'], - function = get_subset_contrasts), - name = 'sub_contrasts') - - sub_contrasts.inputs.method = method - - ## Estimate model - estimate_model = Node(EstimateModel(estimation_method={'Classical':1}), name = "estimate_model") - - ## Estimate contrasts - estimate_contrast = Node(EstimateContrast(group_contrast=True), - name = "estimate_contrast") - - ## Create thresholded maps - threshold = MapNode(Threshold(contrast_index=1, - use_topo_fdr=True, - use_fwe_correction=False, - extent_threshold=0, - height_threshold=0.001, - height_threshold_type='p-value'), name = "threshold", - iterfield = ["stat_image", "contrast_index"]) - - - l2_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = 'l2_analysis') - - l2_analysis.connect([(infosource_groupanalysis, selectfiles_groupanalysis, [('contrast_id', 'contrast_id'), - ('model_type', 'model_type')]), - (infosource_groupanalysis, sub_contrasts, [('subjects', 'subject_list')]), - (selectfiles_groupanalysis, sub_contrasts, [('contrast', 'file_list'), ('participants', 'participants_file')]), - (estimate_model, estimate_contrast, [('spm_mat_file', 'spm_mat_file'), - ('residual_image', 'residual_image'), - ('beta_images', 'beta_images')]), - (estimate_contrast, threshold, [('spm_mat_file', 'spm_mat_file'), - ('spmT_images', 'stat_image')]), - (estimate_model, datasink_groupanalysis, [('mask_image', f"l2_analysis_{method}_nsub_{n_sub}.@mask")]), - (estimate_contrast, datasink_groupanalysis, [('spm_mat_file', f"l2_analysis_{method}_nsub_{n_sub}.@spm_mat"), - ('spmT_images', f"l2_analysis_{method}_nsub_{n_sub}.@T"), - ('con_images', f"l2_analysis_{method}_nsub_{n_sub}.@con")]), - (threshold, datasink_groupanalysis, [('thresholded_map', f"l2_analysis_{method}_nsub_{n_sub}.@thresh")])]) - - if method=='equalRange' or method=='equalIndifference': - contrasts = [('Group', 'T', ['mean'], [1]), ('Group', 'T', ['mean'], [-1])] - ## Specify design matrix - one_sample_t_test_design = Node(OneSampleTTestDesign(), name = "one_sample_t_test_design") - - l2_analysis.connect([(sub_contrasts, one_sample_t_test_design, [(f"{method}_files", 'in_files')]), - (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')])]) - - threshold.inputs.contrast_index = [1, 2] - threshold.synchronize = True - - elif method == 'groupComp': - contrasts = [('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1])] - # Node for the design matrix - two_sample_t_test_design = Node(TwoSampleTTestDesign(), name = 'two_sample_t_test_design') - l2_analysis.connect([(sub_contrasts, two_sample_t_test_design, [('equalRange_files', "group1_files"), - ('equalIndifference_files', 'group2_files')]), - (two_sample_t_test_design, estimate_model, [("spm_mat_file", "spm_mat_file")])]) - - threshold.inputs.contrast_index = [1] +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import remove_file, list_intersection, elements_in_string, clean_list + +class PipelineTeamC88N(Pipeline): + """ A class that defines the pipeline of team C88N. """ + + def __init__(self): + super().__init__() + self.fwhm = 8.0 + self.team_id = 'C88N' + self.model_list = ['gain', 'loss'] + self.subject_level_contrasts_gain = [ + ['effect_of_gain', 'T', ['trialxloss^1', 'trialxgain^1'], [0, 1]] + ] + self.subject_level_contrasts_loss = [ + ['positive_effect_of_loss', 'T', ['trialxgain^1', 'trialxloss^1'], [0, 1]], + ['negative_effect_of_loss', 'T', ['trialxgain^1', 'trialxloss^1'], [0, -1]] + ] + + def get_preprocessing(self): + """ No preprocessing has been done by team C88N """ + return None + + def get_run_level_analysis(self): + """ No run level analysis has been done by team C88N """ + return None + + # @staticmethod # Starting python 3.10, staticmethod should be used here + # Otherwise it produces a TypeError: 'staticmethod' object is not callable + def get_subject_information(event_files: list, model: str): + """ Create Bunchs for SpecifySPMModel. + + Parameters : + - event_files: list of str, list of events files (one per run) for the subject + - model: str, either 'gain' or 'loss' + + Returns : + - subject_information : list of Bunch for 1st level analysis. + """ + from nipype.interfaces.base import Bunch + + subject_information = [] + + # Create on Bunch per run + for event_file in event_files: + + # Create empty lists + onsets = [] + durations = [] + weights_gain = [] + weights_loss = [] + + # Parse event file + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + if 'NoResp' not in info[5]: + onsets.append(float(info[0])) + durations.append(0.0) + weights_gain.append(float(info[2])) + weights_loss.append(float(info[3])) + + # Create Bunch + if model == 'gain': + parametric_modulation_bunch = Bunch( + name = ['loss', 'gain'], + poly = [1, 1], + param = [weights_loss, weights_gain] + ) + elif model == 'loss': + parametric_modulation_bunch = Bunch( + name = ['gain', 'loss'], + poly = [1, 1], + param = [weights_gain, weights_loss] + ) + else: + raise AttributeError + + subject_information.append( + Bunch( + conditions = ['trial'], + onsets = [onsets], + durations = [durations], + amplitudes = None, + tmod = None, + pmod = [parametric_modulation_bunch], + regressor_names = None, + regressors = None) + ) + + return subject_information + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Infosource Node - To iterate on subjects + infosource = Node(IdentityInterface( + fields = ['subject_id']), + name = 'infosource') + infosource.iterables = [('subject_id', self.subject_list)] + + # Templates to select files + template = { + # Functional MRI + 'func' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_preproc.nii.gz'), + + # Event file + 'event' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-*_events.tsv') + } + + # SelectFiles - to select necessary files + select_files = Node(SelectFiles(template), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # DataSink - store the wanted results in the wanted repository + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Gunzip - gunzip files because SPM do not use .nii.gz files + gunzip_func = MapNode(Gunzip(), + name = 'gunzip_func', + iterfield = ['in_file']) + + # Smoothing - smoothing node + smoothing = Node(Smooth(), name = 'smoothing') + smoothing.inputs.fwhm = self.fwhm + + # Function node get_subject_information - get subject specific condition information + subject_infos_gain = Node(Function( + function = self.get_subject_information, + input_names = ['event_files', 'model'], + output_names = ['subject_info'] + ), + name = 'subject_infos_gain') + subject_infos_gain.inputs.model = 'gain' + + subject_infos_loss = Node(Function( + function = self.get_subject_information, + input_names = ['event_files', 'model'], + output_names = ['subject_info'] + ), + name = 'subject_infos_loss') + subject_infos_loss.inputs.model = 'loss' + + # SpecifyModel - Generates SPM-specific Model + specify_model_gain = Node(SpecifySPMModel(), name = 'specify_model_gain') + specify_model_gain.inputs.concatenate_runs = True + specify_model_gain.inputs.input_units = 'secs' + specify_model_gain.inputs.output_units = 'secs' + specify_model_gain.inputs.time_repetition = TaskInformation()['RepetitionTime'] + specify_model_gain.inputs.high_pass_filter_cutoff = 128 + + specify_model_loss = Node(SpecifySPMModel(), name = 'specify_model_loss') + specify_model_loss.inputs.concatenate_runs = True + specify_model_loss.inputs.input_units = 'secs' + specify_model_loss.inputs.output_units = 'secs' + specify_model_loss.inputs.time_repetition = TaskInformation()['RepetitionTime'] + specify_model_loss.inputs.high_pass_filter_cutoff = 128 + + # Level1Design - Generates an SPM design matrix + model_design_gain = Node(Level1Design(), name = 'model_design_gain') + model_design_gain.inputs.bases = {'hrf': {'derivs': [0, 0]}} + model_design_gain.inputs.timing_units = 'secs' + model_design_gain.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + + model_design_loss = Node(Level1Design(), name = 'model_design_loss') + model_design_loss.inputs.bases = {'hrf': {'derivs': [0, 0]}} + model_design_loss.inputs.timing_units = 'secs' + model_design_loss.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + + # EstimateModel - estimate the parameters of the model + model_estimate_gain = Node(EstimateModel(), name = 'model_estimate_gain') + model_estimate_gain.inputs.estimation_method = {'Classical': 1} + + model_estimate_loss = Node(EstimateModel(), name = 'model_estimate_loss') + model_estimate_loss.inputs.estimation_method = {'Classical': 1} + + # EstimateContrast - estimates contrasts + contrast_estimate_gain = Node(EstimateContrast(), name = 'contrast_estimate_gain') + contrast_estimate_gain.inputs.contrasts = self.subject_level_contrasts_gain + + contrast_estimate_loss = Node(EstimateContrast(), name = 'contrast_estimate_loss') + contrast_estimate_loss.inputs.contrasts = self.subject_level_contrasts_loss + + # Function node remove_gunzip_files - remove output of the gunzip node + remove_gunzip_files = MapNode(Function( + function = remove_file, + input_names = ['_', 'file_name'], + output_names = []), + name = 'remove_gunzip_files', iterfield = 'file_name') + + # Function node remove_smoothed_files - remove output of the smoothing node + remove_smoothed_files = MapNode(Function( + function = remove_file, + input_names = ['_', 'file_name'], + output_names = []), + name = 'remove_smoothed_files', iterfield = 'file_name') + + # Create l1 analysis workflow and connect its nodes + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, name = 'subject_level_analysis' + ) + subject_level_analysis.connect([ + (infosource, select_files, [('subject_id', 'subject_id')]), + (select_files, subject_infos_gain, [('event','event_files')]), + (select_files, subject_infos_loss, [('event','event_files')]), + (subject_infos_gain, specify_model_gain, [('subject_info', 'subject_info')]), + (subject_infos_loss, specify_model_loss, [('subject_info', 'subject_info')]), + (select_files, gunzip_func, [('func', 'in_file')]), + (gunzip_func, smoothing, [('out_file', 'in_files')]), + (gunzip_func, remove_gunzip_files, [('out_file', 'file_name')]), + (smoothing, remove_gunzip_files, [('smoothed_files', '_')]), + (smoothing, specify_model_gain, [('smoothed_files', 'functional_runs')]), + (smoothing, specify_model_loss, [('smoothed_files', 'functional_runs')]), + (smoothing, remove_smoothed_files, [('smoothed_files', 'file_name')]), + (specify_model_gain, model_design_gain, [('session_info', 'session_info')]), + (specify_model_loss, model_design_loss, [('session_info', 'session_info')]), + (model_design_gain, model_estimate_gain, [('spm_mat_file', 'spm_mat_file')]), + (model_design_loss, model_estimate_loss, [('spm_mat_file', 'spm_mat_file')]), + (model_estimate_gain, contrast_estimate_gain, [ + ('spm_mat_file', 'spm_mat_file'), + ('beta_images', 'beta_images'), + ('residual_image', 'residual_image')]), + (model_estimate_loss, contrast_estimate_loss, [ + ('spm_mat_file', 'spm_mat_file'), + ('beta_images', 'beta_images'), + ('residual_image', 'residual_image')]), + (contrast_estimate_gain, data_sink, [ + ('con_images', 'subject_level_analysis_gain.@con_images'), + ('spmT_images', 'subject_level_analysis_gain.@spmT_images'), + ('spm_mat_file', 'subject_level_analysis_gain.@spm_mat_file')]), + (contrast_estimate_loss, data_sink, [ + ('con_images', 'subject_level_analysis_loss.@con_images'), + ('spmT_images', 'subject_level_analysis_loss.@spmT_images'), + ('spm_mat_file', 'subject_level_analysis_loss.@spm_mat_file')]), + (contrast_estimate_gain, remove_smoothed_files, [('spmT_images', '_')]) + ]) + + return subject_level_analysis + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + # Handle gain files + templates = [join( + self.directories.output_dir, + 'subject_level_analysis_gain', '_subject_id_{subject_id}', 'con_0001.nii')] + templates += [join( + self.directories.output_dir, + 'subject_level_analysis_gain', '_subject_id_{subject_id}', 'SPM.mat')] + templates += [join( + self.directories.output_dir, + 'subject_level_analysis_gain', '_subject_id_{subject_id}', 'spmT_0001.nii')] + + # Handle loss files + contrast_list = ['0001', '0002'] + templates += [join( + self.directories.output_dir, + 'subject_level_analysis_loss', '_subject_id_{subject_id}', f'con_{contrast_id}.nii')\ + for contrast_id in contrast_list] + templates += [join( + self.directories.output_dir, + 'subject_level_analysis_loss', '_subject_id_{subject_id}', 'SPM.mat')] + templates += [join( + self.directories.output_dir, + 'subject_level_analysis_loss', '_subject_id_{subject_id}', f'spmT_{contrast_id}.nii')\ + for contrast_id in contrast_list] + + # Format with subject_ids + return_list = [] + for template in templates: + return_list += [template.format(subject_id = s) for s in self.subject_list] + + return return_list + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + return_list = [] + + self.model_list = ['gain', 'loss'] + self.contrast_list = ['0001'] + return_list.append(self.get_group_level_analysis_sub_workflow('equalRange')) + return_list.append(self.get_group_level_analysis_sub_workflow('equalIndifference')) + + self.model_list = ['loss'] + self.contrast_list = ['0001'] + return_list.append(self.get_group_level_analysis_sub_workflow('groupComp')) + + self.model_list = ['loss'] + self.contrast_list = ['0002'] + return_list.append(self.get_group_level_analysis_sub_workflow('equalRange')) + return_list.append(self.get_group_level_analysis_sub_workflow('equalIndifference')) + + return return_list + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level_analysis: nipype.WorkFlow + """ + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Infosource - iterate over the list of contrasts + information_source = Node(IdentityInterface( + fields = ['model_type', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('model_type', self.model_list), + ('contrast_id', self.contrast_list) + ] + + # SelectFiles Node + templates = { + # Contrast files for all participants + 'contrasts' : join(self.directories.output_dir, + 'subject_level_analysis_{model_type}', '_subject_id_*', 'con_{contrast_id}.nii' + ) + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + select_files.inputs.force_list = True + + # Datasink - save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Create a function to complete the subject ids out from the get_equal_*_subjects nodes + # If not complete, subject id '001' in search patterns + # would match all contrast files with 'con_0001.nii'. + complete_subject_ids = lambda l : [f'_subject_id_{a}' for a in l] + + # Function Node elements_in_string + # Get contrast files for required subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_contrasts = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_contrasts', iterfield = 'input_str' + ) + + # Estimate model + estimate_model = Node(EstimateModel(), name = 'estimate_model') + estimate_model.inputs.estimation_method = {'Classical':1} + + # Estimate contrasts + estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast') + estimate_contrast.inputs.group_contrast = True + + # Create thresholded maps + threshold = MapNode(Threshold(), name = 'threshold', + iterfield = ['stat_image', 'contrast_index']) + threshold.inputs.contrast_index = 1 + threshold.inputs.use_topo_fdr = True + threshold.inputs.use_fwe_correction = False + threshold.inputs.extent_threshold = 0 + threshold.inputs.height_threshold = 0.001 + threshold.inputs.height_threshold_type = 'p-value' threshold.synchronize = True - estimate_contrast.inputs.contrasts = contrasts - - return l2_analysis - - -def reorganize_results(result_dir, output_dir, n_sub, team_ID): - """ - Reorganize the results to analyze them. - - Parameters: - - result_dir: str, directory where results will be stored - - output_dir: str, name of the sub-directory for final results - - n_sub: float, number of subject used for the analysis - - team_ID: str, ID of the team to reorganize results - """ - from os.path import join as opj - import os - import shutil - import gzip - - h1 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1_model_type_gain') - h2 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1_model_type_gain') - h3 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1_model_type_gain') - h4 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1_model_type_gain') - h5 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1_model_type_loss') - h6 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1_model_type_loss') - h7 = opj(result_dir, output_dir, f"l2_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1_model_type_loss') - h8 = opj(result_dir, output_dir, f"l2_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1_model_type_loss') - h9 = opj(result_dir, output_dir, f"l2_analysis_groupComp_nsub_{n_sub}", '_contrast_id_1_model_type_loss') - - h = [h1, h2, h3, h4, h5, h6, h7, h8, h9] - h_gain = [h1, h2, h3, h4] - h_neg_loss = [h5, h6] - h_pos_loss = [h7, h8, h9] - - repro_unthresh = [opj(filename, "spmT_0002.nii") if i in [4,5] else opj(filename, "spmT_0001.nii") for i, filename in enumerate(h)] - - repro_thresh = [opj(filename, '_threshold1', "spmT_0002_thr.nii") if i in [4,5] else opj(filename, '_threshold0', "spmT_0001_thr.nii") for i, filename in enumerate(h)] - - if not os.path.isdir(opj(result_dir, "NARPS-reproduction")): - os.mkdir(opj(result_dir, "NARPS-reproduction")) - - for i, filename in enumerate(repro_unthresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_unthresholded.nii") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii") - shutil.copyfile(f_in, f_out) - - print(f"Results files of team {team_ID} reorganized.") \ No newline at end of file + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + group_level_analysis.connect([ + (information_source, select_files, [ + ('contrast_id', 'contrast_id'), + ('model_type', 'model_type')]), + (select_files, get_contrasts, [('contrasts', 'input_str')]), + (estimate_model, estimate_contrast, [ + ('spm_mat_file', 'spm_mat_file'), + ('residual_image', 'residual_image'), + ('beta_images', 'beta_images')]), + (estimate_contrast, threshold, [ + ('spm_mat_file', 'spm_mat_file'), + ('spmT_images', 'stat_image')]), + (estimate_model, data_sink, [ + ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]), + (estimate_contrast, data_sink, [ + ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'), + ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'), + ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')]), + (threshold, data_sink, [ + ('thresholded_map', f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')])]) + + if method in ('equalRange', 'equalIndifference'): + estimate_contrast.inputs.contrasts = [ + ('Group', 'T', ['mean'], [1]), + ('Group', 'T', ['mean'], [-1]) + ] + threshold.inputs.contrast_index = [1, 2] + + # Specify design matrix + one_sample_t_test_design = Node(OneSampleTTestDesign(), + name = 'one_sample_t_test_design') + group_level_analysis.connect([ + (get_contrasts, one_sample_t_test_design, [ + (('out_list', clean_list), 'in_files') + ]), + (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) + ]) + + if method == 'equalRange': + group_level_analysis.connect([ + (get_equal_range_subjects, get_contrasts, [ + (('out_list', complete_subject_ids), 'elements') + ]) + ]) + + elif method == 'equalIndifference': + group_level_analysis.connect([ + (get_equal_indifference_subjects, get_contrasts, [ + (('out_list', complete_subject_ids), 'elements') + ]) + ]) + + elif method == 'groupComp': + estimate_contrast.inputs.contrasts = [ + ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1]) + ] + threshold.inputs.contrast_index = [1] + + # Function Node elements_in_string + # Get contrast files for required subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_contrasts_2 = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_contrasts_2', iterfield = 'input_str' + ) + + # Specify design matrix + two_sample_t_test_design = Node(TwoSampleTTestDesign(), + name = 'two_sample_t_test_design') + + group_level_analysis.connect([ + (select_files, get_contrasts_2, [('contrasts', 'input_str')]), + (get_equal_range_subjects, get_contrasts, [ + (('out_list', complete_subject_ids), 'elements') + ]), + (get_equal_indifference_subjects, get_contrasts_2, [ + (('out_list', complete_subject_ids), 'elements') + ]), + (get_contrasts, two_sample_t_test_design, [ + (('out_list', clean_list), 'group1_files') + ]), + (get_contrasts_2, two_sample_t_test_design, [ + (('out_list', clean_list), 'group2_files') + ]), + (two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) + ]) + + return group_level_analysis + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + + ## Contrast id 0001 + parameters = { + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'con_0001.nii', 'con_0002.nii', 'mask.nii', 'SPM.mat', + 'spmT_0001.nii', 'spmT_0002.nii', + join('_threshold0', 'spmT_0001_thr.nii'), join('_threshold1', 'spmT_0002_thr.nii') + ], + 'model_type' : ['gain', 'loss'], + 'nb_subjects' : [str(len(self.subject_list))] + } + + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_0001_model_type_{model_type}', + '{file}' + ) + + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + ## Contrast id 0002 + parameters = { + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'con_0001.nii', 'con_0002.nii', 'mask.nii', 'SPM.mat', + 'spmT_0001.nii', 'spmT_0002.nii', + join('_threshold0', 'spmT_0001_thr.nii'), join('_threshold1', 'spmT_0002_thr.nii') + ], + 'nb_subjects' : [str(len(self.subject_list))] + } + + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_0002_model_type_loss', + '{file}' + ) + + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + parameters = { + 'method': ['groupComp'], + 'file': [ + 'con_0001.nii', 'mask.nii', 'SPM.mat', 'spmT_0001.nii', + join('_threshold0', 'spmT_0001_thr.nii') + ], + 'nb_subjects' : [str(len(self.subject_list))] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_0001_model_type_loss', + '{file}' + ) + + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + nb_sub = len(self.subject_list) + files = [ + # Hypothesis 1 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', 'spmT_0001.nii'), + # Hypothesis 2 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', 'spmT_0001.nii'), + # Hypothesis 3 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', 'spmT_0001.nii'), + # Hypothesis 4 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_gain', 'spmT_0001.nii'), + # Hypothesis 5 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', '_threshold1', 'spmT_0002_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', 'spmT_0002.nii'), + # Hypothesis 6 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', '_threshold1', 'spmT_0002_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', 'spmT_0002.nii'), + # Hypothesis 7 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', 'spmT_0001.nii'), + # Hypothesis 8 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', 'spmT_0001.nii'), + # Hypothesis 9 + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', '_threshold0', 'spmT_0001_thr.nii'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_0001_model_type_loss', 'spmT_0001.nii') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/narps_open/utils/configuration/testing_config.toml b/narps_open/utils/configuration/testing_config.toml index b1fb28ba..40733c5a 100644 --- a/narps_open/utils/configuration/testing_config.toml +++ b/narps_open/utils/configuration/testing_config.toml @@ -3,9 +3,9 @@ title = "Testing configuration for the NARPS open pipelines project" config_type = "testing" [directories] -dataset = "run/data/ds001734/" +dataset = "data/original/ds001734/" reproduced_results = "run/data/reproduced/" -narps_results = "run/data/results/" +narps_results = "data/results/" test_data = "tests/test_data/" test_runs = "run/" diff --git a/tests/conftest.py b/tests/conftest.py index 7c57c1f9..42badb65 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,7 +34,7 @@ def test_pipeline_execution( Returns: - list(float) the correlation coefficients between the following - (reference and computed) files: + (result and reproduced) files: This function can be used as follows: results = pytest.helpers.test_pipeline('2T6S', 4) diff --git a/tests/core/test_common.py b/tests/core/test_common.py index 3e00fd1b..64c385e9 100644 --- a/tests/core/test_common.py +++ b/tests/core/test_common.py @@ -317,3 +317,66 @@ def test_connect_list_intersection(remove_test_dir): test_file_2 = join(TEMPORARY_DIR, 'test_workflow', 'node_2', '_report', 'report.rst') with open(test_file_2, 'r', encoding = 'utf-8') as file: assert f'* out_value : {output_list_2}' in file.read() + + @staticmethod + @mark.unit_test + def test_node_list_to_file_1(): + """ Test the list_to_file function as a nipype.Node """ + + # Inputs + input_list = ['001', 23.560, 'azerty', False, None] + + # Create a Nipype Node using list_to_file + test_node = Node(Function( + function = co.list_to_file, + input_names = ['input_list'], + output_names = ['out_file'] + ), name = 'test_node') + test_node.inputs.input_list = input_list + test_node.run() + + # Expected output (in the Node's working directory) + out_file = join(test_node.output_dir(), 'elements.tsv') + out_list = [str(a) for a in input_list] + + # Check file was created + assert exists(out_file) + + # Check file was created + with open(out_file, 'r', encoding = 'utf-8') as file: + for list_element, file_element in zip(out_list, file.read().split('\n')): + assert list_element == file_element + + @staticmethod + @mark.unit_test + def test_node_list_to_file_2(): + """ Test the list_to_file function as a nipype.Node + Test changing name of output file + """ + + # Inputs + input_list = ['001', 23.560, [2.0, 1, 53, True], False, None] + file_name = 'custom_filename.txt' + + # Create a Nipype Node using list_to_file + test_node = Node(Function( + function = co.list_to_file, + input_names = ['input_list', 'file_name'], + output_names = ['out_file'] + ), name = 'test_node') + test_node.inputs.input_list = input_list + test_node.inputs.file_name = file_name + test_node.run() + + # Expected output + out_file = join(test_node.output_dir(), file_name) + out_list = [str(a) for a in input_list] + + # Check file was created + assert exists(out_file) + + # Check file was created + with open(out_file, 'r', encoding = 'utf-8') as file: + for list_element, file_element in zip(out_list, file.read().split('\n')): + assert list_element == file_element + diff --git a/tests/pipelines/__init__.py b/tests/pipelines/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/pipelines/test_team_08MQ.py b/tests/pipelines/test_team_08MQ.py new file mode 100644 index 00000000..b962557f --- /dev/null +++ b/tests/pipelines/test_team_08MQ.py @@ -0,0 +1,143 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_08MQ' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_08MQ.py + pytest -q test_team_08MQ.py -k +""" +from os.path import join + +from pytest import helpers, mark +from numpy import isclose +from nipype import Workflow +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_08MQ import PipelineTeam08MQ + +class TestPipelinesTeam08MQ: + """ A class that contains all the unit tests for the PipelineTeam08MQ class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeam08MQ object """ + + pipeline = PipelineTeam08MQ() + + # 1 - check the parameters + assert pipeline.fwhm == 6.0 + assert pipeline.team_id == '08MQ' + assert pipeline.contrast_list == ['1', '2', '3'] + assert pipeline.run_level_contasts == [ + ('positive_effect_gain', 'T', ['gain', 'loss'], [1, 0]), + ('positive_effect_loss', 'T', ['gain', 'loss'], [0, 1]), + ('negative_effect_loss', 'T', ['gain', 'loss'], [0, -1]) + ] + + # 2 - check workflows + assert isinstance(pipeline.get_preprocessing(), Workflow) + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + + group_level = pipeline.get_group_level_analysis() + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeam08MQ object """ + pipeline = PipelineTeam08MQ() + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + assert len(pipeline.get_preprocessing_outputs()) == 4*4 + assert len(pipeline.get_run_level_outputs()) == 8+4*3*4 + assert len(pipeline.get_subject_level_outputs()) == 4*3 + assert len(pipeline.get_group_level_outputs()) == 0 + assert len(pipeline.get_hypotheses_outputs()) == 18 + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + assert len(pipeline.get_preprocessing_outputs()) == 4*4*4 + assert len(pipeline.get_run_level_outputs()) == (8+4*3*4)*4 + assert len(pipeline.get_subject_level_outputs()) == 4*3*4 + assert len(pipeline.get_group_level_outputs()) == 0 + assert len(pipeline.get_hypotheses_outputs()) == 18 + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + information = PipelineTeam08MQ.get_subject_information(join( + Configuration()['directories']['test_data'], + 'pipelines', + 'events.tsv' + ))[0] + + assert isinstance(information, Bunch) + assert information.conditions == ['event', 'gain', 'loss', 'response'] + + reference_amplitudes = [ + [1.0, 1.0, 1.0, 1.0, 1.0], + [14.0, 34.0, 38.0, 10.0, 16.0], + [6.0, 14.0, 19.0, 15.0, 17.0], + [1.0, 1.0, 0.0, -1.0, -1.0] + ] + for reference_array, test_array in zip(reference_amplitudes, information.amplitudes): + assert isclose(reference_array, test_array).all() + + reference_durations = [ + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0] + ] + for reference_array, test_array in zip(reference_durations, information.durations): + assert isclose(reference_array, test_array).all() + + reference_onsets = [ + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435] + ] + for reference_array, test_array in zip(reference_onsets, information.onsets): + assert isclose(reference_array, test_array).all() + + @staticmethod + @mark.unit_test + def test_one_sample_t_test_regressors(): + """ Test the get_one_sample_t_test_regressors method """ + + regressors = PipelineTeam08MQ.get_one_sample_t_test_regressors(['001', '002']) + assert regressors == {'group_mean': [1, 1]} + + @staticmethod + @mark.unit_test + def test_two_sample_t_test_regressors(): + """ Test the get_two_sample_t_test_regressors method """ + + regressors, groups = PipelineTeam08MQ.get_two_sample_t_test_regressors( + ['001', '003'], # equalRange group + ['002', '004'], # equalIndifference group + ['001', '002', '003', '004'] # all subjects + ) + assert regressors == dict( + equalRange = [1, 0, 1, 0], + equalIndifference = [0, 1, 0, 1] + ) + assert groups == [1, 2, 1, 2] + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeam08MQ and compare results """ + helpers.test_pipeline_evaluation('08MQ') diff --git a/tests/pipelines/test_team_C88N.py b/tests/pipelines/test_team_C88N.py new file mode 100644 index 00000000..e6bb70fd --- /dev/null +++ b/tests/pipelines/test_team_C88N.py @@ -0,0 +1,140 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_C88N' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_C88N.py + pytest -q test_team_C88N.py -k +""" + +from os.path import join + +from pytest import helpers, mark +from numpy import isclose +from nipype import Workflow +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_C88N import PipelineTeamC88N + +class TestPipelinesTeamC88N: + """ A class that contains all the unit tests for the PipelineTeamC88N class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeamC88N object """ + + pipeline = PipelineTeamC88N() + + # 1 - check the parameters + assert pipeline.fwhm == 8.0 + assert pipeline.team_id == 'C88N' + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert pipeline.get_run_level_analysis() is None + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + + assert len(group_level) == 5 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeamC88N object """ + pipeline = PipelineTeamC88N() + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + assert len(pipeline.get_preprocessing_outputs()) == 0 + assert len(pipeline.get_run_level_outputs()) == 0 + assert len(pipeline.get_subject_level_outputs()) == 8 + assert len(pipeline.get_group_level_outputs()) == 53 + assert len(pipeline.get_hypotheses_outputs()) == 18 + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + assert len(pipeline.get_preprocessing_outputs()) == 0 + assert len(pipeline.get_run_level_outputs()) == 0 + assert len(pipeline.get_subject_level_outputs()) == 32 + assert len(pipeline.get_group_level_outputs()) == 53 + assert len(pipeline.get_hypotheses_outputs()) == 18 + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Test with 'gain' + test_event_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + information = PipelineTeamC88N.get_subject_information( + [test_event_file, test_event_file], + 'gain' + )[0] + + assert isinstance(information, Bunch) + assert information.conditions == ['trial'] + + reference_durations = [[0.0, 0.0, 0.0, 0.0]] + assert len(reference_durations) == len(information.durations) + for reference_array, test_array in zip(reference_durations, information.durations): + assert isclose(reference_array, test_array).all() + + reference_onsets = [[4.071, 11.834, 27.535, 36.435]] + assert len(reference_onsets) == len(information.onsets) + for reference_array, test_array in zip(reference_onsets, information.onsets): + assert isclose(reference_array, test_array).all() + + paramateric_modulation = information.pmod[0] + + assert isinstance(paramateric_modulation, Bunch) + assert paramateric_modulation.name == ['loss', 'gain'] + assert paramateric_modulation.poly == [1, 1] + + reference_param = [[6.0, 14.0, 15.0, 17.0], [14.0, 34.0, 10.0, 16.0]] + assert len(reference_param) == len(paramateric_modulation.param) + for reference_array, test_array in zip(reference_param, paramateric_modulation.param): + assert isclose(reference_array, test_array).all() + + # Test with 'loss' + test_event_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + information = PipelineTeamC88N.get_subject_information( + [test_event_file, test_event_file], + 'loss' + )[0] + + assert isinstance(information, Bunch) + assert information.conditions == ['trial'] + + reference_durations = [[0.0, 0.0, 0.0, 0.0]] + assert len(reference_durations) == len(information.durations) + for reference_array, test_array in zip(reference_durations, information.durations): + assert isclose(reference_array, test_array).all() + + reference_onsets = [[4.071, 11.834, 27.535, 36.435]] + assert len(reference_onsets) == len(information.onsets) + for reference_array, test_array in zip(reference_onsets, information.onsets): + assert isclose(reference_array, test_array).all() + + paramateric_modulation = information.pmod[0] + + assert isinstance(paramateric_modulation, Bunch) + assert paramateric_modulation.name == ['gain', 'loss'] + assert paramateric_modulation.poly == [1, 1] + + reference_param = [[14.0, 34.0, 10.0, 16.0], [6.0, 14.0, 15.0, 17.0]] + assert len(reference_param) == len(paramateric_modulation.param) + for reference_array, test_array in zip(reference_param, paramateric_modulation.param): + assert isclose(reference_array, test_array).all() + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeamC88N and compare results """ + helpers.test_pipeline_evaluation('C88N') diff --git a/tests/test_data/pipelines/events.tsv b/tests/test_data/pipelines/events.tsv new file mode 100644 index 00000000..4b8f04e6 --- /dev/null +++ b/tests/test_data/pipelines/events.tsv @@ -0,0 +1,6 @@ +onset duration gain loss RT participant_response +4.071 4 14 6 2.388 weakly_accept +11.834 4 34 14 2.289 strongly_accept +19.535 4 38 19 0 NoResp +27.535 4 10 15 2.08 strongly_reject +36.435 4 16 17 2.288 weakly_reject \ No newline at end of file diff --git a/tests/test_runner.py b/tests/test_runner.py index 12a2059c..bb2a62c3 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -195,7 +195,7 @@ def test_create(): # 3 - Instantiate a runner with a not implemented team id with raises(NotImplementedError): - PipelineRunner('08MQ') + PipelineRunner('1K0E') # 4 - Instantiate a runner with an implemented team id runner = PipelineRunner('2T6S') @@ -204,7 +204,7 @@ def test_create(): # 5 - Modify team id for an existing runner (with a not implemented team id) with raises(NotImplementedError): - runner.team_id = '08MQ' + runner.team_id = '1K0E' @staticmethod @mark.unit_test