From 69fd60b0b5fbe611e48643d8acfa9f9aaf064b90 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Boris=20Cl=C3=A9net?=
Date: Tue, 29 Aug 2023 14:42:21 +0200
Subject: [PATCH] [TEST] change pipeline test_utils to avoid having to rewrite
 it at every repro

---
 tests/pipelines/test_pipelines.py |   8 +-
 tests/test_conftest.py            | 139 ++++++++++++++++++++++++++++++
 2 files changed, 143 insertions(+), 4 deletions(-)
 create mode 100644 tests/test_conftest.py

diff --git a/tests/pipelines/test_pipelines.py b/tests/pipelines/test_pipelines.py
index 9016aeb7..c38cf36a 100644
--- a/tests/pipelines/test_pipelines.py
+++ b/tests/pipelines/test_pipelines.py
@@ -135,8 +135,8 @@ class TestUtils:
     @mark.unit_test
     def test_utils():
         """ Test the utils methods of PipelineRunner """
-        # 1 - Get number of not implemented pipelines
-        assert len(get_not_implemented_pipelines()) == 69
+        # 1 - Get not implemented pipelines
+        assert '1K0E' in get_not_implemented_pipelines()
 
-        # 2 - Get number of implemented pipelines
-        assert len(get_implemented_pipelines()) == 1
+        # 2 - Get implemented pipelines
+        assert '2T6S' in get_implemented_pipelines()
diff --git a/tests/test_conftest.py b/tests/test_conftest.py
new file mode 100644
index 00000000..5b1a1f4f
--- /dev/null
+++ b/tests/test_conftest.py
@@ -0,0 +1,139 @@
+#!/usr/bin/python
+# coding: utf-8
+
+""" Tests of the 'conftest.py' module.
+
+Launch this test with PyTest
+
+Usage:
+======
+    pytest -q test_conftest.py
+    pytest -q test_conftest.py -k
+"""
+
+from os import remove
+from os.path import join, isfile, abspath
+from pathlib import Path
+
+from datetime import datetime
+
+from pytest import raises, mark
+
+from nipype import Node, Workflow
+from nipype.interfaces.utility import Function
+
+from narps_open.utils.configuration import Configuration
+from narps_open.runner import PipelineRunner
+from narps_open.pipelines import Pipeline
+from narps_open.pipelines.team_2T6S import PipelineTeam2T6S
+
+class MockupPipeline(Pipeline):
+    """ A simple Pipeline class for test purposes """
+
+    def __init__(self):
+        super().__init__()
+        self.test_file = abspath(
+            join(Configuration()['directories']['test_runs'], 'test_conftest.txt'))
+        if isfile(self.test_file):
+            remove(self.test_file)
+
+    def __del__(self):
+        if isfile(self.test_file):
+            remove(self.test_file)
+
+    # @staticmethod
+    def write_to_file(_, text_to_write: str, file_path: str):
+        """ Method used inside a nipype Node, to write a line in a test file """
+        with open(file_path, 'a', encoding = 'utf-8') as file:
+            file.write(text_to_write)
+
+    def create_workflow(self, workflow_name: str):
+        """ Return a nipype workflow with two nodes writing in a file """
+        node_1 = Node(Function(
+            input_names = ['_', 'text_to_write', 'file_path'],
+            output_names = ['_'],
+            function = self.write_to_file),
+            name = 'node_1'
+        )
+        # this input is set to now(), so that it changes at every run, thus preventing
+        # nipype's cache from being used
+        node_1.inputs._ = datetime.now()
+        node_1.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_1\n'
+        node_1.inputs.file_path = self.test_file
+
+        node_2 = Node(Function(
+            input_names = ['_', 'text_to_write', 'file_path'],
+            output_names = [],
+            function = self.write_to_file),
+            name = 'node_2'
+        )
+        node_2.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_2\n'
+        node_2.inputs.file_path = self.test_file
+
+        workflow = Workflow(
+            base_dir = Configuration()['directories']['test_runs'],
+            name = workflow_name
+        )
+        workflow.add_nodes([node_1, node_2])
+        workflow.connect(node_1, '_', node_2, '_')
+
+        return workflow
+
+    def get_preprocessing(self):
+        """ Return a fake preprocessing workflow """
+        return self.create_workflow('TestPipelineRunner_preprocessing_workflow')
+
+    def get_run_level_analysis(self):
+        """ Return a fake run level workflow """
+        return self.create_workflow('TestPipelineRunner_run_level_workflow')
+
+    def get_subject_level_analysis(self):
+        """ Return a fake subject level workflow """
+        return self.create_workflow('TestPipelineRunner_subject_level_workflow')
+
+    def get_group_level_analysis(self):
+        """ Return a fake group level workflow """
+        return self.create_workflow('TestPipelineRunner_group_level_workflow')
+
+    def get_preprocessing_outputs(self):
+        """ Return a list of templates of the output files generated by the preprocessing """
+        return [join(Configuration()['directories']['test_runs'], 'preprocessing_output.md')]
+
+    def get_run_level_outputs(self):
+        """ Return a list of templates of the output files generated by the run level analysis.
+            Templates are expressed relative to the self.directories.output_dir.
+        """
+        return [join(Configuration()['directories']['test_runs'], 'run_output.md')]
+
+    def get_subject_level_outputs(self):
+        """ Return a list of templates of the output files generated by the subject level analysis.
+            Templates are expressed relative to the self.directories.output_dir.
+        """
+        templates = [
+            join(Configuration()['directories']['test_runs'], 'subject_{subject_id}_output_1.md'),
+            join(Configuration()['directories']['test_runs'], 'subject_{subject_id}_output_2.md')
+        ]
+        return_list = []
+        for subject_id in self.subject_list:
+            return_list += [t.format(subject_id = subject_id) for t in templates]
+
+        return return_list
+
+    def get_group_level_outputs(self):
+        """ Return a list of templates of the output files generated by the group level analysis.
+            Templates are expressed relative to the self.directories.output_dir.
+        """
+        templates = [
+            join(Configuration()['directories']['test_runs'], 'group_{nb_subjects}_output_a.md'),
+            join(Configuration()['directories']['test_runs'], 'group_{nb_subjects}_output_b.md')
+        ]
+        return [t.format(nb_subjects = len(self.subject_list)) for t in templates]
+
+    def get_hypotheses_outputs(self):
+        """ Return the names of the files used by the team to answer the hypotheses of NARPS.
+        """
+        template = join(Configuration()['directories']['test_runs'], 'hypothesis_{id}.md')
+        return [template.format(id = i) for i in range(1,18)]
+
+class TestPipelineRunner:
+    """ A class that contains all the unit tests for the PipelineRunner class."""