From 9ce943acd395731a862a57222efee8dd7eeb3801 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Boris=20Cl=C3=A9net?=
Date: Tue, 17 Dec 2024 09:55:39 +0100
Subject: [PATCH] Refactoring pipeline

---
 narps_open/pipelines/team_98BT.py | 421 ++++++++++++++++--------------
 1 file changed, 220 insertions(+), 201 deletions(-)

diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py
index bb14a708..e495be82 100755
--- a/narps_open/pipelines/team_98BT.py
+++ b/narps_open/pipelines/team_98BT.py
@@ -58,60 +58,57 @@ def get_dartel_template_sub_workflow(self):
         Returns:
             - dartel : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
+
+        # Init workflow
+        dartel_workflow = Workflow(
+            base_dir = self.directories.working_dir, name = 'dartel_workflow')
+
+        # IDENTITY INTERFACE - To iterate on subjects
         information_source = Node(IdentityInterface(
             fields = ['subject_id']),
             name = 'information_source')
         information_source.iterables = ('subject_id', self.subject_list)

-        # SelectFiles Node - to select necessary files
+        # SELECT FILES - Select anat file
         template = {
             'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz')
         }
         select_files = Node(SelectFiles(template), name = 'select_files')
         select_files.inputs.base_directory = self.directories.dataset_dir
+        dartel_workflow.connect(information_source, 'subject_id', select_files, 'subject_id')

-        # Gunzip node - SPM do not use .nii.gz files
+        # GUNZIP - SPM does not use .nii.gz files
         gunzip_anat = Node(Gunzip(), name = 'gunzip_anat')
+        dartel_workflow.connect(select_files, 'anat', gunzip_anat, 'in_file')

-        def get_dartel_input(structural_files):
-            print(structural_files)
-            return structural_files
-
-        dartel_input = JoinNode(Function(
-            function = get_dartel_input,
-            input_names = ['structural_files'],
-            output_names = ['structural_files']),
-            name = 'dartel_input',
+        # IDENTITY INTERFACE - Join all gunzipped files
+        dartel_inputs = JoinNode(IdentityInterface(fields = ['structural_files']),
+            name = 'dartel_inputs',
             joinsource = 'information_source',
             joinfield = 'structural_files')
+        dartel_workflow.connect(gunzip_anat, 'out_file', dartel_inputs, 'structural_files')

+        # RENAME - Rename files before dartel workflow
         rename_dartel = MapNode(Rename(format_string = 'subject_id_%(subject_id)s_struct'),
             iterfield = ['in_file', 'subject_id'],
             name = 'rename_dartel')
         rename_dartel.inputs.subject_id = self.subject_list
         rename_dartel.inputs.keep_ext = True
+        dartel_workflow.connect(dartel_inputs, 'structural_files', rename_dartel, 'in_file')

+        # DARTEL - Using an already existing workflow
         dartel_sub_workflow = create_DARTEL_template(name = 'dartel_sub_workflow')
         dartel_sub_workflow.inputs.inputspec.template_prefix = 'template'
+        dartel_workflow.connect(
+            rename_dartel, 'out_file', dartel_sub_workflow, 'inputspec.structural_files')

-        # DataSink Node - store the wanted results in the wanted repository
+        # DATASINK - store the wanted results in the output directory
         data_sink = Node(DataSink(), name = 'data_sink')
         data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Create dartel workflow and connect its nodes
-        dartel_workflow = Workflow(
-            base_dir = self.directories.working_dir, name = 'dartel_workflow')
-        dartel_workflow.connect([
-            (information_source, select_files, [('subject_id', 'subject_id')]),
-            (select_files, gunzip_anat, [('anat', 'in_file')]),
-            (gunzip_anat, dartel_input, [('out_file', 'structural_files')]),
-            (dartel_input, rename_dartel, [('structural_files', 'in_file')]),
-            (rename_dartel, dartel_sub_workflow,
-                [('out_file', 'inputspec.structural_files')]),
-            (dartel_sub_workflow, data_sink, [
-                ('outputspec.template_file', 'dartel_template.@template_file'),
-                ('outputspec.flow_fields', 'dartel_template.@flow_fields')])
-            ])
+        dartel_workflow.connect(
+            dartel_sub_workflow, 'outputspec.template_file', data_sink, 'dartel_template.@template_file')
+        dartel_workflow.connect(
+            dartel_sub_workflow, 'outputspec.flow_fields', data_sink, 'dartel_template.@flow_fields')

         # Remove large files, if requested
         if Configuration()['pipelines']['remove_unused_data']:
@@ -122,12 +119,8 @@ def get_dartel_input(structural_files):
                 input_names = ['_', 'file_name'],
                 output_names = []
                 ), name = 'remove_gunzip')
-
-            # Add connections
-            dartel_workflow.connect([
-                (gunzip_anat, remove_gunzip, [('out_file', 'file_name')]),
-                (data_sink, remove_gunzip, [('out_file', '_')])
-            ])
+            dartel_workflow.connect(gunzip_anat, 'out_file', remove_gunzip, 'file_name')
+            dartel_workflow.connect(data_sink, 'out_file', remove_gunzip, '_')

         return dartel_workflow

@@ -165,19 +158,26 @@ def get_preprocessing_sub_workflow(self):
         Returns:
             - preprocessing : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
-        information_source = Node(IdentityInterface(
+        # Create preprocessing workflow
+        preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing')
+
+        # IDENTITY INTERFACE - To iterate on subjects
+        information_source_subjects = Node(IdentityInterface(
+            fields = ['subject_id']),
+            name = 'information_source_subjects')
+        information_source_subjects.iterables = ('subject_id', self.subject_list)
+
+        # IDENTITY INTERFACE - To iterate on runs
+        information_source_runs = Node(IdentityInterface(
             fields = ['subject_id', 'run_id']),
-            name = 'information_source')
-        information_source.iterables = [
-            ('subject_id', self.subject_list), ('run_id', self.run_list)
-        ]
+            name = 'information_source_runs')
+        information_source_runs.iterables = ('run_id', self.run_list)
+        preprocessing.connect(
+            information_source_subjects, 'subject_id', information_source_runs, 'subject_id')

-        # SelectFiles Node - Select necessary files
+        # SELECT FILES - Select necessary subject files
         templates = {
             'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz'),
-            'func' : join('sub-{subject_id}', 'func',
-                'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz'),
             'magnitude' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude*.nii.gz'),
             'phasediff' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz'),
             'info_fmap' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.json'),
@@ -186,27 +186,51 @@ def get_preprocessing_sub_workflow(self):
             'dartel_template' :join(
                 self.directories.output_dir, 'dartel_template', 'template_6.nii')
         }
-        select_files = Node(SelectFiles(templates), name = 'select_files')
-        select_files.inputs.base_directory = self.directories.dataset_dir
+        select_subject_files = Node(SelectFiles(templates), name = 'select_subject_files')
+        select_subject_files.inputs.base_directory = self.directories.dataset_dir
+        preprocessing.connect(
+            information_source_subjects, 'subject_id', select_subject_files, 'subject_id')

-        # Gunzip nodes - gunzip files because SPM do not use .nii.gz files
+        # SELECT FILES - Select necessary run files
+        templates = {
+            'func' : join('sub-{subject_id}', 'func',
+                'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz')
+        }
+        select_run_files = Node(SelectFiles(templates), name = 'select_run_files')
+        select_run_files.inputs.base_directory = self.directories.dataset_dir
+        preprocessing.connect(information_source_runs, 'subject_id', select_run_files, 'subject_id')
+        preprocessing.connect(information_source_runs, 'run_id', select_run_files, 'run_id')
+
+        # GUNZIP - gunzip files because SPM does not use .nii.gz files
         gunzip_anat = Node(Gunzip(), name = 'gunzip_anat')
         gunzip_func = Node(Gunzip(), name = 'gunzip_func')
         gunzip_magnitude = Node(Gunzip(), name = 'gunzip_magnitude')
         gunzip_phasediff = Node(Gunzip(), name = 'gunzip_phasediff')
-
-        # Function Node get_fieldmap_info -
+        preprocessing.connect(select_subject_files, 'anat', gunzip_anat, 'in_file')
+        preprocessing.connect(select_run_files, 'func', gunzip_func, 'in_file')
+        preprocessing.connect(select_subject_files, 'phasediff', gunzip_phasediff, 'in_file')
+
+        # FUNCTION Node get_fieldmap_info - Retrieve magnitude and phasediff metadata to decide
+        # which files to use for the fieldmap node, and which echo times to use
         fieldmap_info = Node(Function(
             function = self.get_fieldmap_info,
             input_names = ['fieldmap_info_file', 'magnitude_files'],
             output_names = ['echo_times', 'magnitude_file']),
             name = 'fieldmap_info')
+        preprocessing.connect(
+            select_subject_files, 'info_fmap', fieldmap_info, 'fieldmap_info_file')
+        preprocessing.connect(select_subject_files, 'magnitude', fieldmap_info, 'magnitude_files')
+        preprocessing.connect(fieldmap_info, 'magnitude_file', gunzip_magnitude, 'in_file')

-        # FieldMap Node -
+        # FIELDMAP Node - create fieldmap from phasediff and magnitude files
         fieldmap = Node(FieldMap(), name = 'fieldmap')
         fieldmap.inputs.blip_direction = -1
         fieldmap.inputs.total_readout_time = TaskInformation()['TotalReadoutTime']
-
+        preprocessing.connect(fieldmap_info, 'echo_times', fieldmap, 'echo_times')
+        preprocessing.connect(gunzip_magnitude, 'out_file', fieldmap, 'magnitude_file')
+        preprocessing.connect(gunzip_phasediff, 'out_file', fieldmap, 'phase_file')
+        preprocessing.connect(gunzip_func, 'out_file', fieldmap, 'epi_file')
+
         # Get SPM Tissue Probability Maps file
         spm_tissues_file = join(SPMInfo.getinfo()['path'], 'tpm', 'TPM.nii')

@@ -222,6 +246,7 @@ def get_preprocessing_sub_workflow(self):
             [(spm_tissues_file, 5), 4, (True,False), (False, False)],
             [(spm_tissues_file, 6), 2, (False,False), (False, False)]
         ]
+        preprocessing.connect(gunzip_anat, 'out_file', segmentation, 'channel_files')

         # Slice timing - SPM slice time correction with default parameters
         slice_timing = Node(SliceTiming(), name = 'slice_timing')
@@ -230,74 +255,62 @@ def get_preprocessing_sub_workflow(self):
         slice_timing.inputs.slice_order = TaskInformation()['SliceTiming']
         slice_timing.inputs.time_acquisition = TaskInformation()['AcquisitionTime']
         slice_timing.inputs.time_repetition = TaskInformation()['RepetitionTime']
+        preprocessing.connect(gunzip_func, 'out_file', slice_timing, 'in_files')

         # Motion correction - SPM realign and unwarp
         motion_correction = Node(RealignUnwarp(), name = 'motion_correction')
         motion_correction.inputs.interp = 4
-
+        preprocessing.connect(fieldmap, 'vdm', motion_correction, 'phase_map')
+        preprocessing.connect(slice_timing, 'timecorrected_files', motion_correction, 'in_files')
+
         # Intrasubject coregistration
         extract_first = Node(ExtractROI(), name = 'extract_first')
         extract_first.inputs.t_min = 1
         extract_first.inputs.t_size = 1
         extract_first.inputs.output_type = 'NIFTI'
-
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files', extract_first, 'in_file')
+
         coregistration = Node(Coregister(), name = 'coregistration')
         coregistration.inputs.cost_function = 'nmi'
         coregistration.inputs.jobtype = 'estimate'
-
+        preprocessing.connect(extract_first, 'roi_file', coregistration, 'source')
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files', coregistration, 'apply_to_files')
+        preprocessing.connect(gunzip_anat, 'out_file', coregistration, 'target')
+
         dartel_norm_func = Node(DARTELNorm2MNI(), name = 'dartel_norm_func')
         dartel_norm_func.inputs.fwhm = self.fwhm
         dartel_norm_func.inputs.modulate = False
         dartel_norm_func.inputs.voxel_size = (2.3, 2.3, 2.15)
-
+        preprocessing.connect(
+            select_subject_files, 'dartel_flow_field', dartel_norm_func, 'flowfield_files')
+        preprocessing.connect(
+            select_subject_files, 'dartel_template', dartel_norm_func, 'template_file')
+        preprocessing.connect(
+            coregistration, 'coregistered_files', dartel_norm_func, 'apply_to_files')
+
         dartel_norm_anat = Node(DARTELNorm2MNI(), name = 'dartel_norm_anat')
         dartel_norm_anat.inputs.fwhm = self.fwhm
         dartel_norm_anat.inputs.voxel_size = (1, 1, 1)
-
+        preprocessing.connect(
+            select_subject_files, 'dartel_flow_field', dartel_norm_anat, 'flowfield_files')
+        preprocessing.connect(
+            select_subject_files, 'dartel_template', dartel_norm_anat, 'template_file')
+        preprocessing.connect(gunzip_anat, 'out_file', dartel_norm_anat, 'apply_to_files')
+
         # DataSink Node - store the wanted results in the wanted repository
         data_sink = Node(DataSink(), name = 'data_sink')
         data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Create preprocessing workflow and connect its nodes
-        preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing')
-        preprocessing.connect([
-            (information_source, select_files, [
-                ('subject_id', 'subject_id'),
-                ('run_id', 'run_id')]),
-            (select_files, gunzip_anat, [('anat', 'in_file')]),
-            (select_files, gunzip_func, [('func', 'in_file')]),
-            (select_files, gunzip_phasediff, [('phasediff', 'in_file')]),
-            (select_files, fieldmap_info, [
-                ('info_fmap', 'fieldmap_info_file'),
-                ('magnitude', 'magnitude_files')]),
-            (fieldmap_info, gunzip_magnitude, [('magnitude_file', 'in_file')]),
-            (fieldmap_info, fieldmap, [('echo_times', 'echo_times')]),
-            (gunzip_magnitude, fieldmap, [('out_file', 'magnitude_file')]),
-            (gunzip_phasediff, fieldmap, [('out_file', 'phase_file')]),
-            (gunzip_func, fieldmap, [('out_file', 'epi_file')]),
-            (fieldmap, motion_correction, [('vdm', 'phase_map')]),
-            (gunzip_anat, segmentation, [('out_file', 'channel_files')]),
-            (gunzip_func, slice_timing, [('out_file', 'in_files')]),
-            (slice_timing, motion_correction, [('timecorrected_files', 'in_files')]),
-            (motion_correction, coregistration, [('realigned_unwarped_files', 'apply_to_files')]),
-            (gunzip_anat, coregistration, [('out_file', 'target')]),
-            (motion_correction, extract_first, [('realigned_unwarped_files', 'in_file')]),
-            (extract_first, coregistration, [('roi_file', 'source')]),
-            (select_files, dartel_norm_func, [
-                ('dartel_flow_field', 'flowfield_files'),
-                ('dartel_template', 'template_file')]),
-            (select_files, dartel_norm_anat, [
-                ('dartel_flow_field', 'flowfield_files'),
-                ('dartel_template', 'template_file')]),
-            (gunzip_anat, dartel_norm_anat, [('out_file', 'apply_to_files')]),
-            (coregistration, dartel_norm_func, [('coregistered_files', 'apply_to_files')]),
-            (dartel_norm_func, data_sink, [
-                ('normalized_files', 'preprocessing.@normalized_files')]),
-            (motion_correction, data_sink, [
-                ('realigned_unwarped_files', 'preprocessing.@motion_corrected'),
-                ('realignment_parameters', 'preprocessing.@param')]),
-            (segmentation, data_sink,
-                [('normalized_class_images', 'preprocessing.@seg')]),
-            ])
+        preprocessing.connect(
+            dartel_norm_func, 'normalized_files', data_sink, 'preprocessing.@normalized_files')
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files',
+            data_sink, 'preprocessing.@motion_corrected')
+        preprocessing.connect(
+            motion_correction, 'realignment_parameters', data_sink, 'preprocessing.@param')
+        preprocessing.connect(
+            segmentation, 'normalized_class_images', data_sink, 'preprocessing.@seg')

         # Remove large files, if requested
         if Configuration()['pipelines']['remove_unused_data']:
@@ -312,8 +325,6 @@ def get_preprocessing_sub_workflow(self):
                 input_names = ['_', 'file_name'],
                 output_names = []
                 ), name = 'remove_after_datasink', iterfield = 'file_name')
-
-            # Add connections
             preprocessing.connect([
                 (gunzip_func, merge_removable_files, [('out_file', 'in1')]),
                 (gunzip_phasediff, merge_removable_files, [('out_file', 'in2')]),
@@ -506,13 +517,19 @@ def get_subject_level_analysis(self):
         Returns:
             - subject_level_analysis : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
+        # Init workflow
+        subject_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = 'subject_level_analysis'
+        )
+
+        # IDENTITY INTERFACE - To iterate on subjects
         information_source = Node(IdentityInterface(
             fields = ['subject_id']),
             name = 'information_source')
         information_source.iterables = [('subject_id', self.subject_list)]

-        # SelectFiles Node - to select necessary files
+        # SELECT FILES - to select necessary files
         templates = {
             'func' : join(self.directories.output_dir, 'preprocessing',
                 '_run_id_*_subject_id_{subject_id}',
@@ -531,20 +548,19 @@ def get_subject_level_analysis(self):
         }
         select_files = Node(SelectFiles(templates),name = 'select_files')
         select_files.inputs.base_directory = self.directories.results_dir
+        subject_level_analysis.connect(
+            information_source, 'subject_id', select_files, 'subject_id')

-        # DataSink Node - store the wanted results in the wanted repository
-        data_sink = Node(DataSink(), name = 'data_sink')
-        data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Get Subject Info - get subject specific condition information
+        # FUNCTION node get_subject_information - get subject-specific condition information
         subject_information = MapNode(Function(
            function = self.get_subject_information,
            input_names = ['event_file', 'short_run_id'],
            output_names = ['subject_info']),
            name = 'subject_information', iterfield = ['event_file', 'short_run_id'])
         subject_information.inputs.short_run_id = list(range(1, len(self.run_list) + 1))
-
-        # Get parameters
+        subject_level_analysis.connect(select_files, 'events', subject_information, 'event_file')
+
+        # FUNCTION node get_parameters_file - Get subject parameters
         parameters = MapNode(Function(
             function = self.get_parameters_file,
             input_names = [
@@ -560,57 +576,56 @@ def get_subject_level_analysis(self):
             name = 'parameters')
         parameters.inputs.run_id = self.run_list
         parameters.inputs.working_dir = self.directories.working_dir
+        subject_level_analysis.connect(information_source, 'subject_id', parameters, 'subject_id')
+        subject_level_analysis.connect(select_files, 'motion_correction', parameters, 'func_file')
+        subject_level_analysis.connect(select_files, 'param', parameters, 'parameters_file')
+        subject_level_analysis.connect(select_files, 'wc2', parameters, 'wc2_file')

-        # SpecifyModel - Generates SPM-specific Model
+        # SPECIFY MODEL - Generates SPM-specific Model
         specify_model = Node(SpecifySPMModel(), name = 'specify_model')
         specify_model.inputs.concatenate_runs = False
         specify_model.inputs.input_units = 'secs'
         specify_model.inputs.output_units = 'secs'
         specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime']
         specify_model.inputs.high_pass_filter_cutoff = 128
-
-        # Level1Design - Generates an SPM design matrix
+        subject_level_analysis.connect(select_files, 'func', specify_model, 'functional_runs')
+        subject_level_analysis.connect(
+            subject_information, 'subject_info', specify_model, 'subject_info')
+        subject_level_analysis.connect(
+            parameters, 'new_parameters_file', specify_model, 'realignment_parameters')
+
+        # LEVEL1DESIGN - Generates an SPM design matrix
         model_design = Node(Level1Design(), name = 'model_design')
         model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}}
         model_design.inputs.timing_units = 'secs'
         model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime']
+        subject_level_analysis.connect(specify_model, 'session_info', model_design, 'session_info')

-        # EstimateModel - estimate the parameters of the model
+        # ESTIMATE MODEL - estimate the parameters of the model
         model_estimate = Node(EstimateModel(), name = 'model_estimate')
         model_estimate.inputs.estimation_method = {'Classical': 1}
+        subject_level_analysis.connect(
+            model_design, 'spm_mat_file', model_estimate, 'spm_mat_file')

-        # EstimateContrast - estimates contrasts
+        # ESTIMATE CONTRAST - estimates contrasts
         contrast_estimate = Node(EstimateContrast(), name = 'contrast_estimate')
         contrast_estimate.inputs.contrasts = self.subject_level_contrasts
-
-        # Create l1 analysis workflow and connect its nodes
-        subject_level_analysis = Workflow(
-            base_dir = self.directories.working_dir,
-            name = 'subject_level_analysis'
-        )
-        subject_level_analysis.connect([
-            (information_source, select_files, [('subject_id', 'subject_id')]),
-            (information_source, parameters, [('subject_id', 'subject_id')]),
-            (select_files, subject_information, [('events', 'event_file')]),
-            (select_files, parameters, [
-                ('motion_correction', 'func_file'),
-                ('param', 'parameters_file'),
-                ('wc2', 'wc2_file')]),
-            (select_files, specify_model, [('func', 'functional_runs')]),
-            (subject_information, specify_model, [('subject_info', 'subject_info')]),
-            (parameters, specify_model, [
-                ('new_parameters_file', 'realignment_parameters')]),
-            (specify_model, model_design, [('session_info', 'session_info')]),
-            (model_design, model_estimate, [('spm_mat_file', 'spm_mat_file')]),
-            (model_estimate, contrast_estimate, [
-                ('spm_mat_file', 'spm_mat_file'),
-                ('beta_images', 'beta_images'),
-                ('residual_image', 'residual_image')]),
-            (contrast_estimate, data_sink, [
-                ('con_images', 'subject_level_analysis.@con_images'),
-                ('spmT_images', 'subject_level_analysis.@spmT_images'),
-                ('spm_mat_file', 'subject_level_analysis.@spm_mat_file')])
-        ])
+        subject_level_analysis.connect(
+            model_estimate, 'spm_mat_file', contrast_estimate, 'spm_mat_file')
+        subject_level_analysis.connect(
+            model_estimate, 'beta_images', contrast_estimate, 'beta_images')
+        subject_level_analysis.connect(
+            model_estimate, 'residual_image', contrast_estimate, 'residual_image')
+
+        # DATASINK - store the wanted results in the output directory
+        data_sink = Node(DataSink(), name = 'data_sink')
+        data_sink.inputs.base_directory = self.directories.output_dir
+        subject_level_analysis.connect(
+            contrast_estimate, 'con_images', data_sink, 'subject_level_analysis.@con_images')
+        subject_level_analysis.connect(
+            contrast_estimate, 'spmT_images', data_sink,
'subject_level_analysis.@spmT_images') + subject_level_analysis.connect( + contrast_estimate, 'spm_mat_file', data_sink, 'subject_level_analysis.@spm_mat_file') return subject_level_analysis @@ -665,26 +680,28 @@ def get_group_level_analysis_sub_workflow(self, method): # Compute the number of participants used to do the analysis nb_subjects = len(self.subject_list) - # Infosource - a function free node to iterate over the list of subject names + # Init workflow + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + + # IDENTITY INTERFACE - iterate over the list of contrasts information_source = Node( IdentityInterface( fields = ['contrast_id', 'subjects']), name = 'information_source') information_source.iterables = [('contrast_id', self.contrast_list)] - # SelectFiles + # SELECT FILES - get contrast files from subject-level output templates = { 'contrasts' : join(self.directories.output_dir, 'subject_level_analysis', '_subject_id_*', 'con_{contrast_id}.nii') } - select_files = Node(SelectFiles(templates), name = 'select_files') select_files.inputs.base_directory = self.directories.results_dir select_files.inputs.force_list = True - - # Datasink - save important files - data_sink = Node(DataSink(), name = 'data_sink') - data_sink.inputs.base_directory = self.directories.output_dir + group_level_analysis.connect( + information_source, 'contrast_id', select_files, 'contrast_id') # Function Node get_equal_range_subjects # Get subjects in the equalRange group and in the subject_list @@ -726,16 +743,23 @@ def get_group_level_analysis_sub_workflow(self, method): ), name = 'get_contrasts', iterfield = 'input_str' ) + group_level_analysis.connect(select_files, 'contrasts', get_contrasts, 'input_str') - # Estimate model + # ESTIMATE MODEL (inputs will be set later, depending on the method) estimate_model = Node(EstimateModel(), name = 'estimate_model') estimate_model.inputs.estimation_method = {'Classical':1} - # Estimate contrasts + # ESTIMATE CONTRAST estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast') estimate_contrast.inputs.group_contrast = True - - # Create thresholded maps + group_level_analysis.connect( + estimate_model, 'spm_mat_file', estimate_contrast, 'spm_mat_file') + group_level_analysis.connect( + estimate_model, 'residual_image', estimate_contrast, 'residual_image') + group_level_analysis.connect( + estimate_model, 'beta_images', estimate_contrast, 'beta_images') + + # THRESHOLD - Create thresholded maps threshold = MapNode(Threshold(), name = 'threshold', iterfield = ['stat_image', 'contrast_index']) threshold.inputs.use_fwe_correction = False @@ -744,28 +768,8 @@ def get_group_level_analysis_sub_workflow(self, method): threshold.inputs.use_topo_fdr = False threshold.inputs.force_activation = True threshold.synchronize = True - - group_level_analysis = Workflow( - base_dir = self.directories.working_dir, - name = f'group_level_analysis_{method}_nsub_{nb_subjects}') - group_level_analysis.connect([ - (information_source, select_files, [('contrast_id', 'contrast_id')]), - (select_files, get_contrasts, [('contrasts', 'input_str')]), - (estimate_model, estimate_contrast, [ - ('spm_mat_file', 'spm_mat_file'), - ('residual_image', 'residual_image'), - ('beta_images', 'beta_images')]), - (estimate_contrast, threshold, [ - ('spm_mat_file', 'spm_mat_file'), - ('spmT_images', 'stat_image')]), - (threshold, data_sink, [ - ('thresholded_map', 
f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')]), - (estimate_model, data_sink, [ - ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]), - (estimate_contrast, data_sink, [ - ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'), - ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'), - ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')])]) + group_level_analysis.connect(estimate_contrast, 'spm_mat_file', threshold, 'spm_mat_file') + group_level_analysis.connect(estimate_contrast, 'spmT_images', threshold, 'stat_image') if method in ('equalRange', 'equalIndifference'): estimate_contrast.inputs.contrasts = [ @@ -773,18 +777,15 @@ def get_group_level_analysis_sub_workflow(self, method): ('Group', 'T', ['mean'], [-1]) ] - threshold.inputs.contrast_index = [1, 2] - - # Specify design matrix + # ONE SAMPLE T TEST DESIGN - Create the design matrix one_sample_t_test_design = Node(OneSampleTTestDesign(), name = 'one_sample_t_test_design') + group_level_analysis.connect( + get_contrasts, ('out_list', clean_list), one_sample_t_test_design, 'in_files') + group_level_analysis.connect( + one_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file') - group_level_analysis.connect([ - (get_contrasts, one_sample_t_test_design, [ - (('out_list', clean_list), 'in_files') - ]), - (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) - ]) + threshold.inputs.contrast_index = [1, 2] if method == 'equalRange': group_level_analysis.connect( @@ -799,12 +800,14 @@ def get_group_level_analysis_sub_workflow(self, method): ) elif method == 'groupComp': + group_level_analysis.connect( + get_equal_range_subjects, ('out_list', complete_subject_ids), + get_contrasts, 'elements') + estimate_contrast.inputs.contrasts = [ ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1]) ] - threshold.inputs.contrast_index = [1] - # Function Node elements_in_string # Get contrast files for required subjects # Note : using a MapNode with elements_in_string requires using clean_list to remove @@ -816,27 +819,43 @@ def get_group_level_analysis_sub_workflow(self, method): ), name = 'get_contrasts_2', iterfield = 'input_str' ) + group_level_analysis.connect(select_files, 'contrasts', get_contrasts_2, 'input_str') + group_level_analysis.connect( + get_equal_indifference_subjects, ('out_list', complete_subject_ids), + get_contrasts_2, 'elements') - # Node for the design matrix + # TWO SAMPLE T TEST DESIGN - Create the design matrix two_sample_t_test_design = Node(TwoSampleTTestDesign(), name = 'two_sample_t_test_design') + group_level_analysis.connect( + get_contrasts, ('out_list', clean_list), + two_sample_t_test_design, 'group1_files') + group_level_analysis.connect( + get_contrasts_2, ('out_list', clean_list), + two_sample_t_test_design, 'group2_files') + group_level_analysis.connect( + two_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file') - group_level_analysis.connect([ - (select_files, get_contrasts_2, [('contrasts', 'input_str')]), - (get_equal_range_subjects, get_contrasts, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_equal_indifference_subjects, get_contrasts_2, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_contrasts, two_sample_t_test_design, [ - (('out_list', clean_list), 'group1_files') - ]), - (get_contrasts_2, two_sample_t_test_design, [ - (('out_list', clean_list), 'group2_files') - ]), - 
(two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')])
-            ])
+            threshold.inputs.contrast_index = [1]
+
+        # DATASINK - save important files
+        data_sink = Node(DataSink(), name = 'data_sink')
+        data_sink.inputs.base_directory = self.directories.output_dir
+        group_level_analysis.connect(
+            threshold, 'thresholded_map',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')
+        group_level_analysis.connect(
+            estimate_model, 'mask_image',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')
+        group_level_analysis.connect(
+            estimate_contrast, 'spm_mat_file',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat')
+        group_level_analysis.connect(
+            estimate_contrast, 'spmT_images',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@T')
+        group_level_analysis.connect(
+            estimate_contrast, 'con_images',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')

         return group_level_analysis
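
Reviewer note, not part of the patch: the refactoring above replaces nipype's list-based
connect syntax with one explicit connect() call per edge, declared next to the node it
feeds. A minimal sketch of the two equivalent forms, using hypothetical node names rather
than anything from team_98BT.py:

    # Minimal illustration of the two equivalent nipype connect styles.
    # 'source', 'target' and 'connect_styles' are hypothetical names.
    from nipype import Node, Workflow
    from nipype.interfaces.utility import IdentityInterface

    source = Node(IdentityInterface(fields = ['value']), name = 'source')
    target = Node(IdentityInterface(fields = ['value']), name = 'target')
    workflow = Workflow(base_dir = '/tmp', name = 'connect_styles')

    # Old style: a single call taking a list of
    # (source_node, target_node, [(source_output, target_input), ...]) tuples
    workflow.connect([(source, target, [('value', 'value')])])

    # New style, as used throughout this patch: one call per edge.
    # Shown commented out here because a given edge may only be declared once:
    # workflow.connect(source, 'value', target, 'value')

Both forms build the same graph. The per-edge form keeps each connection beside the node
definitions it involves, which is what made problems like the misplaced extract_first to
coregistration connection (fixed above) easy to spot.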