diff --git a/.gitignore b/.gitignore index 71450ef6d..356a37f04 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,8 @@ ENV/ #kubernetes stuff kubernetes/jobs/ + +# Local tests +dmriprep/data/tests/local +*.ipynb +run_tests.py \ No newline at end of file diff --git a/dmriprep/cli/parser.py b/dmriprep/cli/parser.py index 7a1b1b45b..1222b30e9 100644 --- a/dmriprep/cli/parser.py +++ b/dmriprep/cli/parser.py @@ -23,20 +23,20 @@ """Parser.""" import os import sys + from .. import config def _build_parser(): """Build parser object.""" + from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from functools import partial from pathlib import Path - from argparse import ( - ArgumentParser, - ArgumentDefaultsHelpFormatter, - ) + + from niworkflows.utils.spaces import OutputReferencesAction, Reference from packaging.version import Version + from .version import check_latest, is_flagged - from niworkflows.utils.spaces import Reference, OutputReferencesAction def _path_exists(path, parser): """Ensure a given path exists.""" @@ -52,7 +52,7 @@ def _min_one(value, parser): return value def _to_gb(value): - scale = {"G": 1, "T": 10 ** 3, "M": 1e-3, "K": 1e-6, "B": 1e-9} + scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9} digits = "".join([c for c in value if c.isdigit()]) units = value[len(digits) :] or "M" return int(digits) * scale[units[0]] @@ -70,7 +70,11 @@ def _bids_filter(value): verstr = f"dMRIPrep v{config.environment.version}" currentv = Version(config.environment.version) is_release = not any( - (currentv.is_devrelease, currentv.is_prerelease, currentv.is_postrelease) + ( + currentv.is_devrelease, + currentv.is_prerelease, + currentv.is_postrelease, + ) ) parser = ArgumentParser( @@ -96,7 +100,8 @@ def _bids_filter(value): "output_dir", action="store", type=Path, - help="the output path for the outcomes of preprocessing and visual " "reports", + help="the output path for the outcomes of preprocessing and visual " + "reports", ) parser.add_argument( "analysis_level", @@ -181,7 +186,9 @@ def _bids_filter(value): help="nipype plugin configuration file", ) g_perfm.add_argument( - "--anat-only", action="store_true", help="run anatomical workflows only" + "--anat-only", + action="store_true", + help="run anatomical workflows only", ) g_perfm.add_argument( "--boilerplate_only", @@ -249,7 +256,9 @@ def _bids_filter(value): ) # ANTs options - g_ants = parser.add_argument_group("Specific options for ANTs registrations") + g_ants = parser.add_argument_group( + "Specific options for ANTs registrations" + ) g_ants.add_argument( "--skull-strip-template", default="OASIS30ANTs", @@ -264,7 +273,9 @@ def _bids_filter(value): ) # SyN-unwarp options - g_syn = parser.add_argument_group("Specific options for SyN distortion correction") + g_syn = parser.add_argument_group( + "Specific options for SyN distortion correction" + ) g_syn.add_argument( "--use-syn-sdc", action="store_true", @@ -287,7 +298,9 @@ def _bids_filter(value): ) # FreeSurfer options - g_fs = parser.add_argument_group("Specific options for FreeSurfer preprocessing") + g_fs = parser.add_argument_group( + "Specific options for FreeSurfer preprocessing" + ) g_fs.add_argument( "--fs-license-file", metavar="PATH", @@ -415,11 +428,14 @@ def _bids_filter(value): def parse_args(args=None, namespace=None): """Parse args and run further checks on the command line.""" import logging + from niworkflows.utils.spaces import Reference, SpatialReferences parser = _build_parser() opts = parser.parse_args(args, namespace) - 
config.execution.log_level = int(max(25 - 5 * opts.verbose_count, logging.DEBUG)) + config.execution.log_level = int( + max(25 - 5 * opts.verbose_count, logging.DEBUG) + ) config.from_dict(vars(opts)) config.loggers.init() @@ -469,18 +485,33 @@ def parse_args(args=None, namespace=None): output_dir = config.execution.output_dir work_dir = config.execution.work_dir version = config.environment.version + output_layout = config.execution.output_layout if config.execution.fs_subjects_dir is None: config.execution.fs_subjects_dir = output_dir / "freesurfer" - + if config.execution.fs_subjects_dir is None: + if output_layout == "bids": + config.execution.fs_subjects_dir = ( + output_dir / "sourcedata" / "freesurfer" + ) + elif output_layout == "legacy": + config.execution.fs_subjects_dir = output_dir / "freesurfer" + if config.execution.dmriprep_dir is None: + if output_layout == "bids": + config.execution.dmriprep_dir = output_dir + elif output_layout == "legacy": + config.execution.dmriprep_dir = output_dir / "dmriprep" # Wipe out existing work_dir if opts.clean_workdir and work_dir.exists(): from niworkflows.utils.misc import clean_directory - build_log.log("Clearing previous dMRIPrep working directory: %s", work_dir) + build_log.log( + "Clearing previous dMRIPrep working directory: %s", work_dir + ) if not clean_directory(work_dir): build_log.warning( - "Could not clear all contents of working directory: %s", work_dir + "Could not clear all contents of working directory: %s", + work_dir, ) # Ensure input and output folders are not the same @@ -533,4 +564,6 @@ def parse_args(args=None, namespace=None): ) config.execution.participant_label = sorted(participant_label) - config.workflow.skull_strip_template = config.workflow.skull_strip_template[0] + config.workflow.skull_strip_template = ( + config.workflow.skull_strip_template[0] + ) diff --git a/dmriprep/config/__init__.py b/dmriprep/config/__init__.py index 3c5a46822..fd81d8338 100644 --- a/dmriprep/config/__init__.py +++ b/dmriprep/config/__init__.py @@ -1,570 +1 @@ -# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- -# vi: set ft=python sts=4 ts=4 sw=4 et: -r""" -A Python module to maintain unique, run-wide *dMRIPrep* settings. - -This module implements the memory structures to keep a consistent, singleton config. -Settings are passed across processes via filesystem, and a copy of the settings for -each run and subject is left under -``/sub-/log//dmriprep.toml``. -Settings are stored using :abbr:`ToML (Tom's Markup Language)`. -The module has a :py:func:`~dmriprep.config.to_filename` function to allow writing out -the settings to hard disk in *ToML* format, which looks like: - -.. literalinclude:: ../../dmriprep/data/tests/config.toml - :language: toml - :name: dmriprep.toml - :caption: **Example file representation of dMRIPrep settings**. - -This config file is used to pass the settings across processes, -using the :py:func:`~dmriprep.config.load` function. - -Configuration sections ----------------------- -.. autoclass:: environment - :members: -.. autoclass:: execution - :members: -.. autoclass:: workflow - :members: -.. autoclass:: nipype - :members: - -Usage ------ -A config file is used to pass settings and collect information as the execution -graph is built across processes. - -.. 
code-block:: Python - - from dmriprep import config - config_file = config.execution.work_dir / '.dmriprep.toml' - config.to_filename(config_file) - # Call build_workflow(config_file, retval) in a subprocess - with Manager() as mgr: - from .workflow import build_workflow - retval = mgr.dict() - p = Process(target=build_workflow, args=(str(config_file), retval)) - p.start() - p.join() - config.load(config_file) - # Access configs from any code section as: - value = config.section.setting - -Logging -------- -.. autoclass:: loggers - :members: - -Other responsibilities ----------------------- -The :py:mod:`config` is responsible for other convenience actions. - - * Switching Python's :obj:`multiprocessing` to *forkserver* mode. - * Set up a filter for warnings as early as possible. - * Automated I/O magic operations. Some conversions need to happen in the - store/load processes (e.g., from/to :obj:`~pathlib.Path` \<-\> :obj:`str`, - :py:class:`~bids.layout.BIDSLayout`, etc.) - -""" -from multiprocessing import set_start_method -import warnings - -# cmp is not used by dmriprep, so ignore nipype-generated warnings -warnings.filterwarnings("ignore", "cmp not installed") -warnings.filterwarnings( - "ignore", "This has not been fully tested. Please report any failures." -) -warnings.filterwarnings("ignore", "sklearn.externals.joblib is deprecated in 0.21") -warnings.filterwarnings("ignore", "can't resolve package from __spec__ or __package__") -warnings.filterwarnings("ignore", category=DeprecationWarning) -warnings.filterwarnings("ignore", category=FutureWarning) -warnings.filterwarnings("ignore", category=ResourceWarning) - - -try: - set_start_method("forkserver") -except RuntimeError: - pass # context has been already set -finally: - # Defer all custom import for after initializing the forkserver and - # ignoring the most annoying warnings - import os - import sys - import logging - - from uuid import uuid4 - from pathlib import Path - from time import strftime - from niworkflows.utils.spaces import SpatialReferences as _SRs, Reference as _Ref - from nipype import logging as nlogging, __version__ as _nipype_ver - from templateflow import __version__ as _tf_ver - from .. 
import __version__ - - -def redirect_warnings(message, category, filename, lineno, file=None, line=None): - """Redirect other warnings.""" - logger = logging.getLogger() - logger.debug("Captured warning (%s): %s", category, message) - - -warnings.showwarning = redirect_warnings - -logging.addLevelName(25, "IMPORTANT") # Add a new level between INFO and WARNING -logging.addLevelName(15, "VERBOSE") # Add a new level between INFO and DEBUG - -DEFAULT_MEMORY_MIN_GB = 0.01 -NONSTANDARD_REFERENCES = ["anat", "T1w", "dwi", "fsnative"] - -_exec_env = os.name -_docker_ver = None -# special variable set in the container -if os.getenv("IS_DOCKER_8395080871"): - _exec_env = "singularity" - _cgroup = Path("/proc/1/cgroup") - if _cgroup.exists() and "docker" in _cgroup.read_text(): - _docker_ver = os.getenv("DOCKER_VERSION_8395080871") - _exec_env = "dmriprep-docker" if _docker_ver else "docker" - del _cgroup - -_fs_license = os.getenv("FS_LICENSE") -if _fs_license is None and os.getenv("FREESURFER_HOME"): - _fs_license = os.path.join(os.getenv("FREESURFER_HOME"), "license.txt") - -_templateflow_home = Path( - os.getenv( - "TEMPLATEFLOW_HOME", os.path.join(os.getenv("HOME"), ".cache", "templateflow") - ) -) - -try: - from psutil import virtual_memory - - _free_mem_at_start = round(virtual_memory().free / 1024 ** 3, 1) -except Exception: - _free_mem_at_start = None - -_oc_limit = "n/a" -_oc_policy = "n/a" -try: - # Memory policy may have a large effect on types of errors experienced - _proc_oc_path = Path("/proc/sys/vm/overcommit_memory") - if _proc_oc_path.exists(): - _oc_policy = {"0": "heuristic", "1": "always", "2": "never"}.get( - _proc_oc_path.read_text().strip(), "unknown" - ) - if _oc_policy != "never": - _proc_oc_kbytes = Path("/proc/sys/vm/overcommit_kbytes") - if _proc_oc_kbytes.exists(): - _oc_limit = _proc_oc_kbytes.read_text().strip() - if ( - _oc_limit in ("0", "n/a") - and Path("/proc/sys/vm/overcommit_ratio").exists() - ): - _oc_limit = "{}%".format( - Path("/proc/sys/vm/overcommit_ratio").read_text().strip() - ) -except Exception: - pass - - -class _Config: - """An abstract class forbidding instantiation.""" - - _paths = tuple() - - def __init__(self): - """Avert instantiation.""" - raise RuntimeError("Configuration type is not instantiable.") - - @classmethod - def load(cls, settings, init=True): - """Store settings from a dictionary.""" - for k, v in settings.items(): - if v is None: - continue - if k in cls._paths: - setattr(cls, k, Path(v).absolute()) - continue - if hasattr(cls, k): - setattr(cls, k, v) - - if init: - try: - cls.init() - except AttributeError: - pass - - @classmethod - def get(cls): - """Return defined settings.""" - out = {} - for k, v in cls.__dict__.items(): - if k.startswith("_") or v is None: - continue - if callable(getattr(cls, k)): - continue - if k in cls._paths: - v = str(v) - if isinstance(v, _SRs): - v = " ".join([str(s) for s in v.references]) or None - if isinstance(v, _Ref): - v = str(v) or None - out[k] = v - return out - - -class environment(_Config): - """ - Read-only options regarding the platform and environment. - - Crawls runtime descriptive settings (e.g., default FreeSurfer license, - execution environment, nipype and *dMRIPrep* versions, etc.). - The ``environment`` section is not loaded in from file, - only written out when settings are exported. - This config section is useful when reporting issues, - and these variables are tracked whenever the user does not - opt-out using the ``--notrack`` argument. 
- - """ - - cpu_count = os.cpu_count() - """Number of available CPUs.""" - exec_docker_version = _docker_ver - """Version of Docker Engine.""" - exec_env = _exec_env - """A string representing the execution platform.""" - free_mem = _free_mem_at_start - """Free memory at start.""" - overcommit_policy = _oc_policy - """Linux's kernel virtual memory overcommit policy.""" - overcommit_limit = _oc_limit - """Linux's kernel virtual memory overcommit limits.""" - nipype_version = _nipype_ver - """Nipype's current version.""" - templateflow_version = _tf_ver - """The TemplateFlow client version installed.""" - version = __version__ - """*dMRIPrep*'s version.""" - - -class nipype(_Config): - """Nipype settings.""" - - crashfile_format = "txt" - """The file format for crashfiles, either text or pickle.""" - get_linked_libs = False - """Run NiPype's tool to enlist linked libraries for every interface.""" - memory_gb = None - """Estimation in GB of the RAM this workflow can allocate at any given time.""" - nprocs = os.cpu_count() - """Number of processes (compute tasks) that can be run in parallel (multiprocessing only).""" - omp_nthreads = os.cpu_count() - """Number of CPUs a single process can access for multithreaded execution.""" - parameterize_dirs = False - """The node’s output directory will contain full parameterization of any iterable, otherwise - parameterizations over 32 characters will be replaced by their hash.""" - plugin = "MultiProc" - """NiPype's execution plugin.""" - plugin_args = { - "maxtasksperchild": 1, - "raise_insufficient": False, - } - """Settings for NiPype's execution plugin.""" - resource_monitor = False - """Enable resource monitor.""" - stop_on_first_crash = True - """Whether the workflow should stop or continue after the first error.""" - - @classmethod - def get_plugin(cls): - """Format a dictionary for Nipype consumption.""" - nprocs = int(cls.nprocs) - if nprocs == 1: - cls.plugin = "Linear" - return {"plugin": "Linear"} - - out = { - "plugin": cls.plugin, - "plugin_args": cls.plugin_args, - } - if cls.plugin in ("MultiProc", "LegacyMultiProc"): - out["plugin_args"]["n_procs"] = int(cls.nprocs) - if cls.memory_gb: - out["plugin_args"]["memory_gb"] = float(cls.memory_gb) - return out - - @classmethod - def init(cls): - """Set NiPype configurations.""" - from nipype import config as ncfg - - # Configure resource_monitor - if cls.resource_monitor: - ncfg.update_config( - { - "monitoring": { - "enabled": cls.resource_monitor, - "sample_frequency": "0.5", - "summary_append": True, - } - } - ) - ncfg.enable_resource_monitor() - - # Nipype config (logs and execution) - ncfg.update_config( - { - "execution": { - "crashdump_dir": str(execution.log_dir), - "crashfile_format": cls.crashfile_format, - "get_linked_libs": cls.get_linked_libs, - "stop_on_first_crash": cls.stop_on_first_crash, - "parameterize_dirs": cls.parameterize_dirs, - } - } - ) - - -class execution(_Config): - """Configure run-level settings.""" - - anat_derivatives = None - """A path where anatomical derivatives are found to fast-track *sMRIPrep*.""" - bids_dir = None - """An existing path to the dataset, which must be BIDS-compliant.""" - bids_description_hash = None - """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset.""" - bids_filters = None - """A dictionary of BIDS selection filters.""" - boilerplate_only = False - """Only generate a boilerplate.""" - debug = False - """Run in sloppy mode (meaning, suboptimal parameters that minimize run-time).""" - fs_license_file = 
_fs_license - """An existing file containing a FreeSurfer license.""" - fs_subjects_dir = None - """FreeSurfer's subjects directory.""" - layout = None - """A :py:class:`~bids.layout.BIDSLayout` object, see :py:func:`init`.""" - log_dir = None - """The path to a directory that contains execution logs.""" - log_level = 25 - """Output verbosity.""" - low_mem = None - """Utilize uncompressed NIfTIs and other tricks to minimize memory allocation.""" - md_only_boilerplate = False - """Do not convert boilerplate from MarkDown to LaTex and HTML.""" - notrack = False - """Do not monitor *dMRIPrep* using Google Analytics.""" - output_dir = None - """Folder where derivatives will be stored.""" - output_spaces = None - """List of (non)standard spaces designated (with the ``--output-spaces`` flag of - the command line) as spatial references for outputs.""" - reports_only = False - """Only build the reports, based on the reportlets found in a cached working directory.""" - run_uuid = "%s_%s" % (strftime("%Y%m%d-%H%M%S"), uuid4()) - """Unique identifier of this particular run.""" - participant_label = None - """List of participant identifiers that are to be preprocessed.""" - templateflow_home = _templateflow_home - """The root folder of the TemplateFlow client.""" - work_dir = Path("work").absolute() - """Path to a working directory where intermediate results will be available.""" - write_graph = False - """Write out the computational graph corresponding to the planned preprocessing.""" - - _layout = None - - _paths = ( - "anat_derivatives", - "bids_dir", - "fs_license_file", - "fs_subjects_dir", - "layout", - "log_dir", - "output_dir", - "templateflow_home", - "work_dir", - ) - - @classmethod - def init(cls): - """Create a new BIDS Layout accessible with :attr:`~execution.layout`.""" - if cls._layout is None: - import re - from bids.layout import BIDSLayout - - work_dir = cls.work_dir / "bids.db" - work_dir.mkdir(exist_ok=True, parents=True) - cls._layout = BIDSLayout( - str(cls.bids_dir), - validate=False, - # database_path=str(work_dir), - ignore=( - "code", - "stimuli", - "sourcedata", - "models", - "derivatives", - re.compile(r"^\."), - ), - ) - cls.layout = cls._layout - - -# These variables are not necessary anymore -del _fs_license -del _exec_env -del _nipype_ver -del _templateflow_home -del _tf_ver -del _free_mem_at_start -del _oc_limit -del _oc_policy - - -class workflow(_Config): - """Configure the particular execution graph of this workflow.""" - - anat_only = False - """Execute the anatomical preprocessing only.""" - dwi2t1w_init = "register" - """Whether to use standard coregistration ('register') or to initialize coregistration from the - DWI header ('header').""" - fmap_bspline = None - """Regularize fieldmaps with a field of B-Spline basis.""" - fmap_demean = None - """Remove the mean from fieldmaps.""" - force_syn = None - """Run *fieldmap-less* susceptibility-derived distortions estimation.""" - hires = None - """Run FreeSurfer ``recon-all`` with the ``-hires`` flag.""" - ignore = None - """Ignore particular steps for *dMRIPrep*.""" - longitudinal = False - """Run FreeSurfer ``recon-all`` with the ``-logitudinal`` flag.""" - run_reconall = True - """Run FreeSurfer's surface reconstruction.""" - skull_strip_fixed_seed = False - """Fix a seed for skull-stripping.""" - skull_strip_template = "OASIS30ANTs" - """Change default brain extraction template.""" - spaces = None - """Keeps the :py:class:`~niworkflows.utils.spaces.SpatialReferences` - instance keeping standard and nonstandard 
spaces.""" - use_syn = None - """Run *fieldmap-less* susceptibility-derived distortions estimation - in the absence of any alternatives.""" - - -class loggers: - """Keep loggers easily accessible (see :py:func:`init`).""" - - _fmt = "%(asctime)s,%(msecs)d %(name)-2s " "%(levelname)-2s:\n\t %(message)s" - _datefmt = "%y%m%d-%H:%M:%S" - - default = logging.getLogger() - """The root logger.""" - cli = logging.getLogger("cli") - """Command-line interface logging.""" - workflow = nlogging.getLogger("nipype.workflow") - """NiPype's workflow logger.""" - interface = nlogging.getLogger("nipype.interface") - """NiPype's interface logger.""" - utils = nlogging.getLogger("nipype.utils") - """NiPype's utils logger.""" - - @classmethod - def init(cls): - """ - Set the log level, initialize all loggers into :py:class:`loggers`. - - * Add new logger levels (25: IMPORTANT, and 15: VERBOSE). - * Add a new sub-logger (``cli``). - * Logger configuration. - - """ - from nipype import config as ncfg - - _handler = logging.StreamHandler(stream=sys.stdout) - _handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt)) - cls.cli.addHandler(_handler) - cls.default.setLevel(execution.log_level) - cls.cli.setLevel(execution.log_level) - cls.interface.setLevel(execution.log_level) - cls.workflow.setLevel(execution.log_level) - cls.utils.setLevel(execution.log_level) - ncfg.update_config( - {"logging": {"log_directory": str(execution.log_dir), "log_to_file": True}} - ) - - -def from_dict(settings): - """Read settings from a flat dictionary.""" - nipype.load(settings) - execution.load(settings) - workflow.load(settings) - loggers.init() - - -def load(filename): - """Load settings from file.""" - from toml import loads - - filename = Path(filename) - settings = loads(filename.read_text()) - for sectionname, configs in settings.items(): - if sectionname != "environment": - section = getattr(sys.modules[__name__], sectionname) - section.load(configs) - init_spaces() - - -def get(flat=False): - """Get config as a dict.""" - settings = { - "environment": environment.get(), - "execution": execution.get(), - "workflow": workflow.get(), - "nipype": nipype.get(), - } - if not flat: - return settings - - return { - ".".join((section, k)): v - for section, configs in settings.items() - for k, v in configs.items() - } - - -def dumps(flat=False): - """Format config into toml.""" - from toml import dumps - - return dumps(get(flat=flat)) - - -def to_filename(filename): - """Write settings to file.""" - filename = Path(filename) - filename.write_text(dumps()) - - -def init_spaces(checkpoint=True): - """Initialize the :attr:`~workflow.spaces` setting.""" - from niworkflows.utils.spaces import Reference, SpatialReferences - - spaces = execution.output_spaces or SpatialReferences() - if not isinstance(spaces, SpatialReferences): - spaces = SpatialReferences( - [ref for s in spaces.split(" ") for ref in Reference.from_string(s)] - ) - - if checkpoint and not spaces.is_cached(): - spaces.checkpoint() - - # Make the SpatialReferences object available - workflow.spaces = spaces +from .config import * # noqa: F401, F403 diff --git a/dmriprep/config/config.py b/dmriprep/config/config.py new file mode 100644 index 000000000..3e2fdce17 --- /dev/null +++ b/dmriprep/config/config.py @@ -0,0 +1,621 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +r""" +A Python module to maintain unique, run-wide *dMRIPrep* settings. 
+ +This module implements the memory structures to keep a consistent, singleton config. +Settings are passed across processes via filesystem, and a copy of the settings for +each run and subject is left under +``/sub-/log//dmriprep.toml``. +Settings are stored using :abbr:`ToML (Tom's Markup Language)`. +The module has a :py:func:`~dmriprep.config.to_filename` function to allow writing out +the settings to hard disk in *ToML* format, which looks like: + +.. literalinclude:: ../../dmriprep/data/tests/config.toml + :language: toml + :name: dmriprep.toml + :caption: **Example file representation of dMRIPrep settings**. + +This config file is used to pass the settings across processes, +using the :py:func:`~dmriprep.config.load` function. + +Configuration sections +---------------------- +.. autoclass:: environment + :members: +.. autoclass:: execution + :members: +.. autoclass:: workflow + :members: +.. autoclass:: nipype + :members: + +Usage +----- +A config file is used to pass settings and collect information as the execution +graph is built across processes. + +.. code-block:: Python + + from dmriprep import config + config_file = config.execution.work_dir / '.dmriprep.toml' + config.to_filename(config_file) + # Call build_workflow(config_file, retval) in a subprocess + with Manager() as mgr: + from .workflow import build_workflow + retval = mgr.dict() + p = Process(target=build_workflow, args=(str(config_file), retval)) + p.start() + p.join() + config.load(config_file) + # Access configs from any code section as: + value = config.section.setting + +Logging +------- +.. autoclass:: loggers + :members: + +Other responsibilities +---------------------- +The :py:mod:`config` is responsible for other convenience actions. + + * Switching Python's :obj:`multiprocessing` to *forkserver* mode. + * Set up a filter for warnings as early as possible. + * Automated I/O magic operations. Some conversions need to happen in the + store/load processes (e.g., from/to :obj:`~pathlib.Path` \<-\> :obj:`str`, + :py:class:`~bids.layout.BIDSLayout`, etc.) + +""" +import warnings +from multiprocessing import set_start_method + +# cmp is not used by dmriprep, so ignore nipype-generated warnings +warnings.filterwarnings("ignore", "cmp not installed") +warnings.filterwarnings( + "ignore", "This has not been fully tested. Please report any failures." +) +warnings.filterwarnings( + "ignore", "sklearn.externals.joblib is deprecated in 0.21" +) +warnings.filterwarnings( + "ignore", "can't resolve package from __spec__ or __package__" +) +warnings.filterwarnings("ignore", category=DeprecationWarning) +warnings.filterwarnings("ignore", category=FutureWarning) +warnings.filterwarnings("ignore", category=ResourceWarning) + + +try: + set_start_method("forkserver") +except RuntimeError: + pass # context has been already set +finally: + # Defer all custom import for after initializing the forkserver and + # ignoring the most annoying warnings + import logging + import os + import sys + from pathlib import Path + from time import strftime + from uuid import uuid4 + + from nipype import __version__ as _nipype_ver + from nipype import logging as nlogging + from niworkflows.utils.spaces import Reference as _Ref + from niworkflows.utils.spaces import SpatialReferences as _SRs + from templateflow import __version__ as _tf_ver + + from .. 
import __version__ + + +def redirect_warnings( + message, category, filename, lineno, file=None, line=None +): + """Redirect other warnings.""" + logger = logging.getLogger() + logger.debug("Captured warning (%s): %s", category, message) + + +warnings.showwarning = redirect_warnings + +logging.addLevelName( + 25, "IMPORTANT" +) # Add a new level between INFO and WARNING +logging.addLevelName(15, "VERBOSE") # Add a new level between INFO and DEBUG + +DEFAULT_MEMORY_MIN_GB = 0.01 +NONSTANDARD_REFERENCES = ["anat", "T1w", "dwi", "fsnative"] + +_exec_env = os.name +_docker_ver = None +# special variable set in the container +if os.getenv("IS_DOCKER_8395080871"): + _exec_env = "singularity" + _cgroup = Path("/proc/1/cgroup") + if _cgroup.exists() and "docker" in _cgroup.read_text(): + _docker_ver = os.getenv("DOCKER_VERSION_8395080871") + _exec_env = "dmriprep-docker" if _docker_ver else "docker" + del _cgroup + +_fs_license = os.getenv("FS_LICENSE") +if _fs_license is None and os.getenv("FREESURFER_HOME"): + _fs_license = os.path.join(os.getenv("FREESURFER_HOME"), "license.txt") + +_templateflow_home = Path( + os.getenv( + "TEMPLATEFLOW_HOME", + os.path.join(os.getenv("HOME"), ".cache", "templateflow"), + ) +) + +try: + from psutil import virtual_memory + + _free_mem_at_start = round(virtual_memory().free / 1024**3, 1) +except Exception: + _free_mem_at_start = None + +_oc_limit = "n/a" +_oc_policy = "n/a" +try: + # Memory policy may have a large effect on types of errors experienced + _proc_oc_path = Path("/proc/sys/vm/overcommit_memory") + if _proc_oc_path.exists(): + _oc_policy = {"0": "heuristic", "1": "always", "2": "never"}.get( + _proc_oc_path.read_text().strip(), "unknown" + ) + if _oc_policy != "never": + _proc_oc_kbytes = Path("/proc/sys/vm/overcommit_kbytes") + if _proc_oc_kbytes.exists(): + _oc_limit = _proc_oc_kbytes.read_text().strip() + if ( + _oc_limit in ("0", "n/a") + and Path("/proc/sys/vm/overcommit_ratio").exists() + ): + _oc_limit = "{}%".format( + Path("/proc/sys/vm/overcommit_ratio").read_text().strip() + ) +except Exception: + pass + +DEBUG_MODES = ["fieldmaps"] + + +class _Config: + """An abstract class forbidding instantiation.""" + + _paths = tuple() + + def __init__(self): + """Avert instantiation.""" + raise RuntimeError("Configuration type is not instantiable.") + + @classmethod + def load(cls, settings, init=True): + """Store settings from a dictionary.""" + for k, v in settings.items(): + if v is None: + continue + if k in cls._paths: + setattr(cls, k, Path(v).absolute()) + continue + if hasattr(cls, k): + setattr(cls, k, v) + + if init: + try: + cls.init() + except AttributeError: + pass + + @classmethod + def get(cls): + """Return defined settings.""" + out = {} + for k, v in cls.__dict__.items(): + if k.startswith("_") or v is None: + continue + if callable(getattr(cls, k)): + continue + if k in cls._paths: + v = str(v) + if isinstance(v, _SRs): + v = " ".join([str(s) for s in v.references]) or None + if isinstance(v, _Ref): + v = str(v) or None + out[k] = v + return out + + +class environment(_Config): + """ + Read-only options regarding the platform and environment. + + Crawls runtime descriptive settings (e.g., default FreeSurfer license, + execution environment, nipype and *dMRIPrep* versions, etc.). + The ``environment`` section is not loaded in from file, + only written out when settings are exported. 
+ This config section is useful when reporting issues, + and these variables are tracked whenever the user does not + opt-out using the ``--notrack`` argument. + + """ + + cpu_count = os.cpu_count() + """Number of available CPUs.""" + exec_docker_version = _docker_ver + """Version of Docker Engine.""" + exec_env = _exec_env + """A string representing the execution platform.""" + free_mem = _free_mem_at_start + """Free memory at start.""" + overcommit_policy = _oc_policy + """Linux's kernel virtual memory overcommit policy.""" + overcommit_limit = _oc_limit + """Linux's kernel virtual memory overcommit limits.""" + nipype_version = _nipype_ver + """Nipype's current version.""" + templateflow_version = _tf_ver + """The TemplateFlow client version installed.""" + version = __version__ + """*dMRIPrep*'s version.""" + + +class nipype(_Config): + """Nipype settings.""" + + crashfile_format = "txt" + """The file format for crashfiles, either text or pickle.""" + get_linked_libs = False + """Run NiPype's tool to enlist linked libraries for every interface.""" + memory_gb = None + """Estimation in GB of the RAM this workflow can allocate at any given time.""" + nprocs = os.cpu_count() + """Number of processes (compute tasks) that can be run in parallel (multiprocessing only).""" + omp_nthreads = os.cpu_count() + """Number of CPUs a single process can access for multithreaded execution.""" + parameterize_dirs = False + """The node’s output directory will contain full parameterization of any iterable, otherwise + parameterizations over 32 characters will be replaced by their hash.""" + plugin = "MultiProc" + """NiPype's execution plugin.""" + plugin_args = { + "maxtasksperchild": 1, + "raise_insufficient": False, + } + """Settings for NiPype's execution plugin.""" + resource_monitor = False + """Enable resource monitor.""" + stop_on_first_crash = True + """Whether the workflow should stop or continue after the first error.""" + + @classmethod + def get_plugin(cls): + """Format a dictionary for Nipype consumption.""" + nprocs = int(cls.nprocs) + if nprocs == 1: + cls.plugin = "Linear" + return {"plugin": "Linear"} + + out = { + "plugin": cls.plugin, + "plugin_args": cls.plugin_args, + } + if cls.plugin in ("MultiProc", "LegacyMultiProc"): + out["plugin_args"]["n_procs"] = int(cls.nprocs) + if cls.memory_gb: + out["plugin_args"]["memory_gb"] = float(cls.memory_gb) + return out + + @classmethod + def init(cls): + """Set NiPype configurations.""" + from nipype import config as ncfg + + # Configure resource_monitor + if cls.resource_monitor: + ncfg.update_config( + { + "monitoring": { + "enabled": cls.resource_monitor, + "sample_frequency": "0.5", + "summary_append": True, + } + } + ) + ncfg.enable_resource_monitor() + + # Nipype config (logs and execution) + ncfg.update_config( + { + "execution": { + "crashdump_dir": str(execution.log_dir), + "crashfile_format": cls.crashfile_format, + "get_linked_libs": cls.get_linked_libs, + "stop_on_first_crash": cls.stop_on_first_crash, + "parameterize_dirs": cls.parameterize_dirs, + } + } + ) + + +class execution(_Config): + """Configure run-level settings.""" + + anat_derivatives = None + """A path where anatomical derivatives are found to fast-track *sMRIPrep*.""" + bids_dir = None + """An existing path to the dataset, which must be BIDS-compliant.""" + bids_description_hash = None + """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset.""" + bids_filters = None + """A dictionary of BIDS selection filters.""" + boilerplate_only = False + 
"""Only generate a boilerplate.""" + debug = False + """Run in sloppy mode (meaning, suboptimal parameters that minimize run-time).""" + dmriprep_dir = None + """Root of dMRIPrep BIDS Derivatives dataset. Depends on output_layout.""" + fs_license_file = _fs_license + """An existing file containing a FreeSurfer license.""" + fs_subjects_dir = None + """FreeSurfer's subjects directory.""" + layout = None + """A :py:class:`~bids.layout.BIDSLayout` object, see :py:func:`init`.""" + log_dir = None + """The path to a directory that contains execution logs.""" + log_level = 25 + """Output verbosity.""" + low_mem = None + """Utilize uncompressed NIfTIs and other tricks to minimize memory allocation.""" + md_only_boilerplate = False + """Do not convert boilerplate from MarkDown to LaTex and HTML.""" + notrack = False + """Do not monitor *dMRIPrep* using Google Analytics.""" + output_dir = None + """Folder where derivatives will be stored.""" + output_layout = "bids" + """Layout of derivatives within output_dir.""" + output_spaces = None + """List of (non)standard spaces designated (with the ``--output-spaces`` flag of + the command line) as spatial references for outputs.""" + reports_only = False + """Only build the reports, based on the reportlets found in a cached working directory.""" + run_uuid = "%s_%s" % (strftime("%Y%m%d-%H%M%S"), uuid4()) + """Unique identifier of this particular run.""" + participant_label = None + """List of participant identifiers that are to be preprocessed.""" + templateflow_home = _templateflow_home + """The root folder of the TemplateFlow client.""" + work_dir = Path("work").absolute() + """Path to a working directory where intermediate results will be available.""" + write_graph = False + """Write out the computational graph corresponding to the planned preprocessing.""" + + _layout = None + + _paths = ( + "anat_derivatives", + "bids_dir", + "fs_license_file", + "fs_subjects_dir", + "layout", + "log_dir", + "output_dir", + "templateflow_home", + "work_dir", + ) + + @classmethod + def init(cls): + """Create a new BIDS Layout accessible with :attr:`~execution.layout`.""" + if cls.fs_license_file and Path(cls.fs_license_file).is_file(): + os.environ["FS_LICENSE"] = str(cls.fs_license_file) + if cls._layout is None: + import re + + from bids.layout import BIDSLayout + + work_dir = cls.work_dir / "bids.db" + work_dir.mkdir(exist_ok=True, parents=True) + cls._layout = BIDSLayout( + str(cls.bids_dir), + validate=False, + # database_path=str(work_dir), + ignore=( + "code", + "stimuli", + "sourcedata", + "models", + "derivatives", + re.compile(r"^\."), + ), + ) + cls.layout = cls._layout + if cls.bids_filters: + from bids.layout import Query + + # unserialize pybids Query enum values + for acq, filters in cls.bids_filters.items(): + cls.bids_filters[acq] = { + k: getattr(Query, v[7:-4]) + if not isinstance(v, Query) and "Query" in v + else v + for k, v in filters.items() + } + + if "all" in cls.debug: + cls.debug = list(DEBUG_MODES) + + +# These variables are not necessary anymore +del _fs_license +del _exec_env +del _nipype_ver +del _templateflow_home +del _tf_ver +del _free_mem_at_start +del _oc_limit +del _oc_policy + + +class workflow(_Config): + """Configure the particular execution graph of this workflow.""" + + anat_only = False + """Execute the anatomical preprocessing only.""" + dwi2t1w_init = "register" + """Whether to use standard coregistration ('register') or to initialize coregistration from the + DWI header ('header').""" + fmap_bspline = None + """Regularize 
fieldmaps with a field of B-Spline basis.""" + fmap_demean = None + """Remove the mean from fieldmaps.""" + force_syn = None + """Run *fieldmap-less* susceptibility-derived distortions estimation.""" + hires = None + """Run FreeSurfer ``recon-all`` with the ``-hires`` flag.""" + ignore = None + """Ignore particular steps for *dMRIPrep*.""" + longitudinal = False + """Run FreeSurfer ``recon-all`` with the ``-longitudinal`` flag.""" + run_reconall = True + """Run FreeSurfer's surface reconstruction.""" + skull_strip_fixed_seed = False + """Fix a seed for skull-stripping.""" + skull_strip_template = "OASIS30ANTs" + """Change default brain extraction template.""" + spaces = None + """Keeps the :py:class:`~niworkflows.utils.spaces.SpatialReferences` + instance keeping standard and nonstandard spaces.""" + use_syn = None + """Run *fieldmap-less* susceptibility-derived distortions estimation + in the absence of any alternatives.""" + use_syn_sdc = None + """Run *fieldmap-less* susceptibility-derived distortions estimation + in the absence of any alternatives.""" + + +class loggers: + """Keep loggers easily accessible (see :py:func:`init`).""" + + _fmt = ( + "%(asctime)s,%(msecs)d %(name)-2s " "%(levelname)-2s:\n\t %(message)s" + ) + _datefmt = "%y%m%d-%H:%M:%S" + + default = logging.getLogger() + """The root logger.""" + cli = logging.getLogger("cli") + """Command-line interface logging.""" + workflow = nlogging.getLogger("nipype.workflow") + """NiPype's workflow logger.""" + interface = nlogging.getLogger("nipype.interface") + """NiPype's interface logger.""" + utils = nlogging.getLogger("nipype.utils") + """NiPype's utils logger.""" + + @classmethod + def init(cls): + """ + Set the log level, initialize all loggers into :py:class:`loggers`. + + * Add new logger levels (25: IMPORTANT, and 15: VERBOSE). + * Add a new sub-logger (``cli``). + * Logger configuration. 
+ + """ + from nipype import config as ncfg + + _handler = logging.StreamHandler(stream=sys.stdout) + _handler.setFormatter( + logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt) + ) + cls.cli.addHandler(_handler) + cls.default.setLevel(execution.log_level) + cls.cli.setLevel(execution.log_level) + cls.interface.setLevel(execution.log_level) + cls.workflow.setLevel(execution.log_level) + cls.utils.setLevel(execution.log_level) + ncfg.update_config( + { + "logging": { + "log_directory": str(execution.log_dir), + "log_to_file": True, + } + } + ) + + +def from_dict(settings): + """Read settings from a flat dictionary.""" + nipype.load(settings) + execution.load(settings) + workflow.load(settings) + loggers.init() + + +def load(filename): + """Load settings from file.""" + from toml import loads + + filename = Path(filename) + settings = loads(filename.read_text()) + for sectionname, configs in settings.items(): + if sectionname != "environment": + section = getattr(sys.modules[__name__], sectionname) + section.load(configs) + init_spaces() + + +def get(flat=False): + """Get config as a dict.""" + settings = { + "environment": environment.get(), + "execution": execution.get(), + "workflow": workflow.get(), + "nipype": nipype.get(), + } + if not flat: + return settings + + return { + ".".join((section, k)): v + for section, configs in settings.items() + for k, v in configs.items() + } + + +def dumps(flat=False): + """Format config into toml.""" + from toml import dumps + + return dumps(get(flat=flat)) + + +def to_filename(filename): + """Write settings to file.""" + filename = Path(filename) + filename.write_text(dumps()) + + +def init_spaces(checkpoint=True): + """Initialize the :attr:`~workflow.spaces` setting.""" + from niworkflows.utils.spaces import Reference, SpatialReferences + + spaces = execution.output_spaces or SpatialReferences() + if not isinstance(spaces, SpatialReferences): + spaces = SpatialReferences( + [ + ref + for s in spaces.split(" ") + for ref in Reference.from_string(s) + ] + ) + + if checkpoint and not spaces.is_cached(): + spaces.checkpoint() + + # Make the SpatialReferences object available + workflow.spaces = spaces diff --git a/dmriprep/data/tests/config.toml b/dmriprep/data/tests/config.toml index 6baaf7340..4ecc58d15 100644 --- a/dmriprep/data/tests/config.toml +++ b/dmriprep/data/tests/config.toml @@ -6,13 +6,13 @@ overcommit_policy = "heuristic" overcommit_limit = "50%" nipype_version = "1.4.2" templateflow_version = "0.4.2" -version = "0.2.2" +version = "0.4.0" [execution] -bids_dir = "THP002/" +bids_dir = "THP" bids_description_hash = "b07ee615a588acf67967d70f5b2402ffbe477b99a316433fa7fdaff4f9dad5c1" boilerplate_only = false -debug = false +debug = "false" fs_license_file = "/opt/freesurfer/license.txt" fs_subjects_dir = "opt/freesurfer/subjects" layout = "BIDS Layout: .../home/oesteban/Data/THP002 | Subjects: 1 | Sessions: 2 | Runs: 2" @@ -22,6 +22,7 @@ low_mem = false md_only_boilerplate = false notrack = true output_dir = "/tmp" +dmriprep_dir = "/tmp/dmriprep" output_spaces = "run" reports_only = false run_uuid = "20200311-121754_aa0b4fa9-6b60-4a11-af7d-02deb54a823f" @@ -32,6 +33,7 @@ write_graph = false [workflow] anat_only = false +bold2t1w_dof = 6 fmap_bspline = false force_syn = false hires = true @@ -40,11 +42,13 @@ longitudinal = false run_reconall = true skull_strip_fixed_seed = false skull_strip_template = "OASIS30ANTs" -spaces = "run" +t2s_coreg = false +use_syn_sdc = false [nipype] crashfile_format = "txt" get_linked_libs = false +memory_gb = 32 
nprocs = 8 omp_nthreads = 8 plugin = "MultiProc" diff --git a/dmriprep/interfaces/__init__.py b/dmriprep/interfaces/__init__.py index 7a47cb868..b2de99be3 100644 --- a/dmriprep/interfaces/__init__.py +++ b/dmriprep/interfaces/__init__.py @@ -2,11 +2,11 @@ # vi: set ft=python sts=4 ts=4 sw=4 et: """Custom Nipype interfaces for dMRIPrep.""" from nipype.interfaces.base import OutputMultiObject, SimpleInterface +from niworkflows.interfaces.bids import LOGGER as _LOGGER +from niworkflows.interfaces.bids import DerivativesDataSink as _DDS from niworkflows.interfaces.bids import ( - DerivativesDataSink as _DDS, - _BIDSDataGrabberOutputSpec, _BIDSDataGrabberInputSpec, - LOGGER as _LOGGER, + _BIDSDataGrabberOutputSpec, ) @@ -39,7 +39,9 @@ def _run_interface(self, runtime): if not bids_dict["t1w"]: raise FileNotFoundError( - "No T1w images found for subject sub-{}".format(self.inputs.subject_id) + "No T1w images found for subject sub-{}".format( + self.inputs.subject_id + ) ) if self._require_dwis and not bids_dict["dwi"]: @@ -52,7 +54,9 @@ def _run_interface(self, runtime): for imtype in ["dwi", "t2w", "flair", "fmap", "roi"]: if not bids_dict[imtype]: _LOGGER.warning( - 'No "%s" images found for sub-%s', imtype, self.inputs.subject_id + 'No "%s" images found for sub-%s', + imtype, + self.inputs.subject_id, ) return runtime diff --git a/dmriprep/interfaces/bids.py b/dmriprep/interfaces/bids.py new file mode 100644 index 000000000..30715d61f --- /dev/null +++ b/dmriprep/interfaces/bids.py @@ -0,0 +1,88 @@ +from nipype import logging +from nipype.interfaces.base import ( + BaseInterfaceInputSpec, + OutputMultiObject, + SimpleInterface, + Str, + TraitedSpec, + traits, +) + +LOGGER = logging.getLogger("nipype.interface") + + +class _BIDSDataGrabberInputSpec(BaseInterfaceInputSpec): + subject_data = traits.Dict(Str, traits.Any) + subject_id = Str() + + +class _BIDSDataGrabberOutputSpec(TraitedSpec): + out_dict = traits.Dict(desc="output data structure") + fmap = OutputMultiObject(desc="output fieldmaps") + dwi = OutputMultiObject(desc="output DWI images") + t1w = OutputMultiObject(desc="output T1w images") + roi = OutputMultiObject(desc="output ROI images") + t2w = OutputMultiObject(desc="output T2w images") + flair = OutputMultiObject(desc="output FLAIR images") + + +class BIDSDataGrabber(SimpleInterface): + """ + Collect files from a BIDS directory structure. + + .. testsetup:: + + >>> data_dir_canary() + + >>> bids_src = BIDSDataGrabber(anat_only=False) + >>> bids_src.inputs.subject_data = bids_collect_data( + ... 
str(datadir / 'ds114'), '01', bids_validate=False)[0] + >>> bids_src.inputs.subject_id = '01' + >>> res = bids_src.run() + >>> res.outputs.t1w # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + ['.../ds114/sub-01/ses-retest/anat/sub-01_ses-retest_T1w.nii.gz', + '.../ds114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz'] + + """ + + input_spec = _BIDSDataGrabberInputSpec + output_spec = _BIDSDataGrabberOutputSpec + _require_dwis = True + + def __init__(self, *args, **kwargs): + anat_only = kwargs.pop("anat_only") + anat_derivatives = kwargs.pop("anat_derivatives", None) + super(BIDSDataGrabber, self).__init__(*args, **kwargs) + if anat_only is not None: + self._require_dwis = not anat_only + self._require_t1w = anat_derivatives is None + + def _run_interface(self, runtime): + bids_dict = self.inputs.subject_data + + self._results["out_dict"] = bids_dict + self._results.update(bids_dict) + + if self._require_t1w and not bids_dict["t1w"]: + raise FileNotFoundError( + "No T1w images found for subject sub-{}".format( + self.inputs.subject_id + ) + ) + + if self._require_dwis and not bids_dict["dwi"]: + raise FileNotFoundError( + "No functional images found for subject sub-{}".format( + self.inputs.subject_id + ) + ) + + for imtype in ["dwi", "t2w", "flair", "fmap", "roi"]: + if not bids_dict[imtype]: + LOGGER.info( + 'No "%s" images found for sub-%s', + imtype, + self.inputs.subject_id, + ) + + return runtime diff --git a/dmriprep/interfaces/reports.py b/dmriprep/interfaces/reports.py index a6616526c..31becb82d 100644 --- a/dmriprep/interfaces/reports.py +++ b/dmriprep/interfaces/reports.py @@ -22,22 +22,25 @@ # """Interfaces to generate reportlets.""" +import logging import os +import re import time +from nipype.interfaces import freesurfer as fs from nipype.interfaces.base import ( - traits, - TraitedSpec, BaseInterfaceInputSpec, - File, Directory, + File, InputMultiObject, + SimpleInterface, Str, + TraitedSpec, isdefined, - SimpleInterface, + traits, ) -from nipype.interfaces import freesurfer as fs +LOGGER = logging.getLogger("nipype.interface") SUBJECT_TEMPLATE = """\ \t
@@ -50,6 +53,22 @@
 \t</ul>
 """
 
+DWI_TEMPLATE = """\
+\t\t<details open>
+\t\t<summary>Summary</summary>
+\t\t<ul class="elem-desc">
+\t\t\t<li>Original orientation: {ornt}</li>
+\t\t\t<li>Phase-encoding (PE) direction: {pedir}</li>
+\t\t\t<li>Susceptibility distortion correction: {sdc}</li>
+\t\t\t<li>Registration: {registration}</li>
+\t\t</ul>
+\t\t</details>
+\t\t<details>
+\t\t\t<summary>Confounds collected</summary><br />
+\t\t\t<p>{confounds}.</p>
+\t\t</details>
+"""
+
 ABOUT_TEMPLATE = """\t<ul>
 \t\t<li>dMRIPrep version: {version}</li>
 \t\t<li>dMRIPrep command: <code>{command}</code></li>
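The ``{pedir}`` entry rendered by ``DWI_TEMPLATE`` is filled in by the ``get_world_pedir`` helper added at the end of this file's changes below: it maps a BIDS voxel-axis phase-encoding code (``i``/``j``/``k``, with an optional trailing ``-``) onto a world-axis label, given the image orientation string. A few illustrative calls, not part of the patch (outputs follow from the helper's axis tables):

    # Illustrative only; not part of the patch.
    assert get_world_pedir("RAS", "j") == "Posterior-Anterior"
    assert get_world_pedir("RAS", "j-") == "Anterior-Posterior"
    assert get_world_pedir("LAS", "i") == "Right-Left"
    # An unknown PE direction falls back with a warning:
    assert get_world_pedir("RAS", None).startswith("Could not be determined")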
  • @@ -139,6 +158,79 @@ def _generate_segment(self): ) +class DwiSummaryInputSpec(BaseInterfaceInputSpec): + distortion_correction = traits.Str( + desc="Susceptibility distortion correction method", mandatory=True + ) + pe_direction = traits.Enum( + None, + "i", + "i-", + "j", + "j-", + "k", + "k-", + mandatory=True, + desc="Phase-encoding direction detected", + ) + registration = traits.Enum( + "FSL", + "FreeSurfer", + mandatory=True, + desc="Diffusion/anatomical registration method", + ) + fallback = traits.Bool(desc="Boundary-based registration rejected") + registration_dof = traits.Enum( + 6, 9, 12, desc="Registration degrees of freedom", mandatory=True + ) + registration_init = traits.Enum( + "register", + "header", + mandatory=True, + desc='Whether to initialize registration with the "header"' + ' or by centering the volumes ("register")', + ) + confounds_file = File(exists=True, desc="Confounds file") + orientation = traits.Str( + mandatory=True, desc="Orientation of the voxel axes" + ) + + +class FunctionalSummary(SummaryInterface): + input_spec = DwiSummaryInputSpec + + def _generate_segment(self): + dof = self.inputs.registration_dof + reg = { + "FSL": [ + "FSL flirt with boundary-based registration" + " (BBR) metric - %d dof" % dof, + "FSL flirt rigid registration - 6 dof", + ], + "FreeSurfer": [ + "FreeSurfer bbregister " + "(boundary-based registration, BBR) - %d dof" % dof, + "FreeSurfer mri_coreg - %d dof" % dof, + ], + }[self.inputs.registration][self.inputs.fallback] + + pedir = get_world_pedir( + self.inputs.orientation, self.inputs.pe_direction + ) + + if isdefined(self.inputs.confounds_file): + with open(self.inputs.confounds_file) as cfh: + conflist = cfh.readline().strip("\n").strip() + + return DWI_TEMPLATE.format( + pedir=pedir, + sdc=self.inputs.distortion_correction, + registration=reg, + confounds=re.sub(r"[\t ]+", ", ", conflist), + ornt=self.inputs.orientation, + ) + + class AboutSummaryInputSpec(BaseInterfaceInputSpec): version = Str(desc="dMRIPrep version") command = Str(desc="dMRIPrep command") @@ -154,3 +246,27 @@ def _generate_segment(self): command=self.inputs.command, date=time.strftime("%Y-%m-%d %H:%M:%S %z"), ) + + +def get_world_pedir(ornt, pe_direction): + """Return world direction of phase encoding""" + axes = ( + ("Right", "Left"), + ("Anterior", "Posterior"), + ("Superior", "Inferior"), + ) + ax_idcs = {"i": 0, "j": 1, "k": 2} + + if pe_direction is not None: + axcode = ornt[ax_idcs[pe_direction[0]]] + inv = pe_direction[1:] == "-" + + for ax in axes: + for flip in (ax, ax[::-1]): + if flip[not inv].startswith(axcode): + return "-".join(flip) + LOGGER.warning( + "Cannot determine world direction of phase encoding. " + f"Orientation: {ornt}; PE dir: {pe_direction}" + ) + return "Could not be determined - assuming Anterior-Posterior" diff --git a/dmriprep/reports/__init__.py b/dmriprep/reports/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dmriprep/reports/conftest.py b/dmriprep/reports/conftest.py new file mode 100644 index 000000000..366f68bac --- /dev/null +++ b/dmriprep/reports/conftest.py @@ -0,0 +1,43 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2021 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""py.test configuration""" +import os +import tempfile +from pathlib import Path + +import pytest + + +@pytest.fixture(autouse=True) +def populate_doctest_namespace(doctest_namespace): + doctest_namespace["os"] = os + doctest_namespace["Path"] = Path + tmpdir = tempfile.TemporaryDirectory() + + doctest_namespace["tmpdir"] = tmpdir.name + + cwd = os.getcwd() + os.chdir(tmpdir.name) + yield + os.chdir(cwd) + tmpdir.cleanup() diff --git a/dmriprep/reports/core.py b/dmriprep/reports/core.py new file mode 100644 index 000000000..f5f75770c --- /dev/null +++ b/dmriprep/reports/core.py @@ -0,0 +1,139 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2021 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +from pathlib import Path + +from niworkflows.reports.core import Report as _Report + +# This patch is intended to permit fMRIPrep 20.2.0 LTS to use the YODA-style +# derivatives directory. Ideally, we will remove this in 20.3.x and use an +# updated niworkflows. + + +class Report(_Report): + def _load_config(self, config): + from yaml import safe_load as load + + settings = load(config.read_text()) + self.packagename = self.packagename or settings.get("package", None) + + # Removed from here: Appending self.packagename to self.root and self.out_dir + # In this version, pass reportlets_dir and out_dir with fmriprep in the path. + + if self.subject_id is not None: + self.root = self.root / "sub-{}".format(self.subject_id) + + if "template_path" in settings: + self.template_path = config.parent / settings["template_path"] + + self.index(settings["sections"]) + + +# +# The following are the interface used directly by fMRIPrep +# + + +def run_reports( + out_dir, + subject_label, + run_uuid, + config=None, + reportlets_dir=None, + packagename=None, +): + """ + Run the reports. + + .. testsetup:: + + >>> cwd = os.getcwd() + >>> os.chdir(tmpdir) + + >>> from pkg_resources import resource_filename + >>> from shutil import copytree + >>> test_data_path = resource_filename('fmriprep', 'data/tests/work') + >>> testdir = Path(tmpdir) + >>> data_dir = copytree(test_data_path, str(testdir / 'work')) + >>> (testdir / 'fmriprep').mkdir(parents=True, exist_ok=True) + + .. 
doctest:: + + >>> run_reports(testdir / 'out', '01', 'madeoutuuid', packagename='fmriprep', + ... reportlets_dir=testdir / 'work' / 'reportlets' / 'fmriprep') + 0 + + .. testcleanup:: + + >>> os.chdir(cwd) + + """ + return Report( + out_dir, + run_uuid, + config=config, + subject_id=subject_label, + packagename=packagename, + reportlets_dir=reportlets_dir, + ).generate_report() + + +def generate_reports( + subject_list, + output_dir, + run_uuid, + config=None, + work_dir=None, + packagename=None, +): + """Execute run_reports on a list of subjects.""" + reportlets_dir = None + if work_dir is not None: + reportlets_dir = Path(work_dir) / "reportlets" + report_errors = [ + run_reports( + output_dir, + subject_label, + run_uuid, + config=config, + packagename=packagename, + reportlets_dir=reportlets_dir, + ) + for subject_label in subject_list + ] + + errno = sum(report_errors) + if errno: + import logging + + logger = logging.getLogger("cli") + error_list = ", ".join( + "%s (%d)" % (subid, err) + for subid, err in zip(subject_list, report_errors) + if err + ) + logger.error( + "Preprocessing did not finish successfully. Errors occurred while processing " + "data from participants: %s. Check the HTML reports for details.", + error_list, + ) + return errno diff --git a/dmriprep/tests/__init__.py b/dmriprep/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dmriprep/tests/test_config.py b/dmriprep/tests/test_config.py new file mode 100644 index 000000000..5d54e554e --- /dev/null +++ b/dmriprep/tests/test_config.py @@ -0,0 +1,121 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2021 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Check the configuration module and file.""" +import os +from pathlib import Path +from pkg_resources import resource_filename as pkgrf +from unittest.mock import patch + +import pytest +from toml import loads +from niworkflows.utils.spaces import format_reference + +from .. import config + + +def _reset_config(): + """ + Forcibly reload the configuration module to restore defaults. + + .. caution:: + `importlib.reload` creates new sets of objects, but will not remove + previous references to those objects.""" + import importlib + importlib.reload(config) + + +def test_reset_config(): + execution = config.execution + setattr(execution, 'bids_dir', 'TESTING') + assert config.execution.bids_dir == 'TESTING' + _reset_config() + assert config.execution.bids_dir is None + # Even though the config module was reset, + # previous references to config classes + # have not been touched. 
+
+
+def test_reset_config():
+    execution = config.execution
+    setattr(execution, 'bids_dir', 'TESTING')
+    assert config.execution.bids_dir == 'TESTING'
+    _reset_config()
+    assert config.execution.bids_dir is None
+    # Even though the config module was reset,
+    # previous references to config classes
+    # have not been touched.
+    assert execution.bids_dir == 'TESTING'
+
+
+def test_config_spaces():
+    """Check that all necessary spaces are recorded in the config."""
+    filename = Path(pkgrf('dmriprep', 'data/tests/config.toml'))
+    settings = loads(filename.read_text())
+    for sectionname, configs in settings.items():
+        if sectionname != 'environment':
+            section = getattr(config, sectionname)
+            section.load(configs, init=False)
+    config.nipype.init()
+    config.loggers.init()
+    config.init_spaces()
+
+    spaces = config.workflow.spaces
+    assert "MNI152NLin6Asym:res-2" not in [
+        str(s) for s in spaces.get_standard(full_spec=True)]
+
+    assert "MNI152NLin6Asym_res-2" not in [
+        format_reference((s.fullname, s.spec))
+        for s in spaces.references if s.standard and s.dim == 3
+    ]
+
+    config.workflow.use_aroma = True
+    config.init_spaces()
+    spaces = config.workflow.spaces
+
+    assert "MNI152NLin6Asym:res-2" in [
+        str(s) for s in spaces.get_standard(full_spec=True)]
+
+    assert "MNI152NLin6Asym_res-2" in [
+        format_reference((s.fullname, s.spec))
+        for s in spaces.references if s.standard and s.dim == 3
+    ]
+
+    config.execution.output_spaces = None
+    config.workflow.use_aroma = False
+    config.init_spaces()
+    spaces = config.workflow.spaces
+
+    assert [str(s) for s in spaces.get_standard(full_spec=True)] == []
+
+    assert [
+        format_reference((s.fullname, s.spec))
+        for s in spaces.references if s.standard and s.dim == 3
+    ] == ['MNI152NLin2009cAsym']
+    _reset_config()
+
+
+@pytest.mark.parametrize("master_seed,ants_seed,numpy_seed", [
+    (1, 17612, 8272), (100, 19094, 60232)
+])
+def test_prng_seed(master_seed, ants_seed, numpy_seed):
+    """Ensure seeds are properly tracked."""
+    seeds = config.seeds
+    with patch.dict(os.environ, {}):
+        seeds.load({'_random_seed': master_seed}, init=True)
+        assert seeds.master == master_seed
+        assert seeds.ants == ants_seed
+        assert seeds.numpy == numpy_seed
+        assert os.getenv("ANTS_RANDOM_SEED") == str(ants_seed)
+
+    _reset_config()
+    for seed in ('_random_seed', 'master', 'ants', 'numpy'):
+        assert getattr(config.seeds, seed) is None
diff --git a/dmriprep/tests/test_fsl6.py b/dmriprep/tests/test_fsl6.py
new file mode 100644
index 000000000..987452fd1
--- /dev/null
+++ b/dmriprep/tests/test_fsl6.py
@@ -0,0 +1,36 @@
+import shutil
+from pathlib import Path
+
+import pytest
+import templateflow.api as tf
+from nipype.interfaces import fsl
+from packaging.version import LegacyVersion
+
+
+fslversion = fsl.Info.version()
+TEMPLATE = tf.get("MNI152NLin2009cAsym", resolution=2, desc=None, suffix="T1w")
+
+
+@pytest.mark.skipif(fslversion is None, reason="fsl required")
+@pytest.mark.skipif(LegacyVersion(fslversion) < LegacyVersion("6.0.0"), reason="FSL6 test")
+@pytest.mark.parametrize("path_parent,filename", [
+    (".", "brain.nii.gz"),
+    (
+        "pneumonoultramicroscopicsilicovolcanoconiosis/floccinaucinihilipilification",
+        "supercalifragilisticexpialidocious.nii.gz",
+    ),
+    (
+        "pneumonoultramicroscopicsilicovolcanoconiosis/floccinaucinihilipilification/"
+        "antidisestablishmentarianism/pseudopseudohypoparathyroidism/sesquipedalian",
+        "brain.nii.gz"
+    )
+])
+def test_fsl6_long_filenames(tmp_path, path_parent, filename):
+    test_dir = tmp_path / path_parent
+    test_dir.mkdir(parents=True, exist_ok=True)
+    in_file = test_dir / filename
+    out_file = test_dir / "output.nii.gz"
+    shutil.copy(TEMPLATE, in_file)
+
+    bet = fsl.BET(in_file=in_file, out_file=out_file).run()
+    assert Path(bet.outputs.out_file).exists()
diff --git a/dmriprep/utils/bids.py b/dmriprep/utils/bids.py
index 36057d1bc..5bbdf14a4 100644
--- a/dmriprep/utils/bids.py
+++ b/dmriprep/utils/bids.py
@@ -21,15 +21,24 @@
 # https://www.nipreps.org/community/licensing/
 #
 """Utilities to handle BIDS inputs."""
+import json
 import os
 import sys
-import json
 from pathlib import Path
+
 from bids import BIDSLayout
 
 
-def collect_data(bids_dir, participant_label, bids_validate=True):
-    """Replacement for niworkflows' version."""
+def collect_data(
+    bids_dir,
+    participant_label,
+    bids_validate=True,
+    bids_filters=None,
+):
+    """
+    Replacement for niworkflows' version.
+
+    Uses pybids to retrieve the input data for a given participant.
+    """
     if isinstance(bids_dir, BIDSLayout):
         layout = bids_dir
     else:
@@ -43,6 +52,9 @@ def collect_data(bids_dir, participant_label, bids_validate=True):
         "t1w": {"datatype": "anat", "suffix": "T1w"},
         "roi": {"datatype": "anat", "suffix": "roi"},
     }
+    bids_filters = bids_filters or {}
+    for acq, entities in bids_filters.items():
+        queries[acq].update(entities)
 
     subj_data = {
         dtype: sorted(
@@ -50,7 +62,7 @@ def collect_data(bids_dir, participant_label, bids_validate=True):
             return_type="file",
             subject=participant_label,
             extension=["nii", "nii.gz"],
-            **query
+            **query,
         )
     )
     for dtype, query in queries.items()
    }
@@ -60,7 +72,7 @@ def collect_data(bids_dir, participant_label, bids_validate=True):
 
 def write_derivative_description(bids_dir, deriv_dir):
-    from ..__about__ import __version__, __url__, DOWNLOAD_URL
+    from ..__about__ import DOWNLOAD_URL, __url__, __version__
 
     bids_dir = Path(bids_dir)
     deriv_dir = Path(deriv_dir)
@@ -85,7 +97,9 @@ def write_derivative_description(bids_dir, deriv_dir):
     singularity_md5 = _get_shub_version(singularity_url)
 
     if singularity_md5 and singularity_md5 is not NotImplemented:
-        desc["SingularityContainerMD5"] = _get_shub_version(singularity_url)
+        desc["SingularityContainerMD5"] = singularity_md5
 
     # Keys deriving from source dataset
     orig_desc = {}
@@ -107,8 +121,8 @@ def write_derivative_description(bids_dir, deriv_dir):
 
 def validate_input_dir(exec_env, bids_dir, participant_label):
     # Ignore issues and warnings that should not influence dMRIPrep
-    import tempfile
     import subprocess
+    import tempfile
 
     validator_config_dict = {
         "ignore": [
@@ -183,14 +197,21 @@ def validate_input_dir(exec_env, bids_dir, participant_label):
     ignored_subs = all_subs.difference(selected_subs)
     if ignored_subs:
         for sub in ignored_subs:
-            validator_config_dict["ignoredFiles"].append("/sub-%s/**" % sub)
+            validator_config_dict["ignoredFiles"].append(
+                "/sub-%s/**" % sub
+            )
 
     with tempfile.NamedTemporaryFile("w+") as temp:
         temp.write(json.dumps(validator_config_dict))
         temp.flush()
         try:
-            subprocess.check_call(["bids-validator", bids_dir, "-c", temp.name])
+            subprocess.check_call(
+                ["bids-validator", bids_dir, "-c", temp.name]
+            )
         except FileNotFoundError:
-            print("bids-validator does not appear to be installed", file=sys.stderr)
+            print(
+                "bids-validator does not appear to be installed",
+                file=sys.stderr,
+            )
 
 
 def _get_shub_version(singularity_url):
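The `bids_filters` hook added to `collect_data()` above merges caller-provided entity filters into the default queries before pybids runs, so each top-level key must be one of `dwi`, `flair`, `t2w`, `t1w`, or `roi`. A minimal sketch of the intended usage (the dataset root and entity value are hypothetical):

    from dmriprep.utils.bids import collect_data

    subject_data = collect_data(
        '/data/bids',                                     # hypothetical BIDS root
        '01',
        bids_validate=False,
        bids_filters={'t1w': {'acquisition': 'mprage'}},  # narrows the T1w query
    )[0]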
diff --git a/dmriprep/utils/sentry.py b/dmriprep/utils/sentry.py
new file mode 100644
index 000000000..d0e253593
--- /dev/null
+++ b/dmriprep/utils/sentry.py
@@ -0,0 +1,194 @@
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+#
+# Copyright 2021 The NiPreps Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+#     https://www.nipreps.org/community/licensing/
+#
+"""Stripped out routines for Sentry."""
+import os
+import re
+
+from nibabel.optpkg import optional_package
+from niworkflows.utils.misc import read_crashfile
+
+from .. import config
+
+sentry_sdk = optional_package("sentry_sdk")[0]
+
+CHUNK_SIZE = 16384
+# Group common events with pre-specified fingerprints
+KNOWN_ERRORS = {
+    "permission-denied": ["PermissionError: [Errno 13] Permission denied"],
+    "memory-error": [
+        "MemoryError",
+        "Cannot allocate memory",
+        "Return code: 134",
+    ],
+    "reconall-already-running": [
+        "ERROR: it appears that recon-all is already running"
+    ],
+    "no-disk-space": [
+        "[Errno 28] No space left on device",
+        "[Errno 122] Disk quota exceeded",
+    ],
+    "segfault": [
+        "Segmentation Fault",
+        "Segfault",
+        "Return code: 139",
+    ],
+    "potential-race-condition": [
+        "[Errno 39] Directory not empty",
+        "_unfinished.json",
+    ],
+    "keyboard-interrupt": [
+        "KeyboardInterrupt",
+    ],
+}
+
+
+def sentry_setup():
+    """Set-up sentry."""
+    release = config.environment.version or "dev"
+    environment = (
+        "dev"
+        if (
+            os.getenv("DMRIPREP_DEV", "").lower()
+            in ("1", "on", "yes", "y", "true")
+            or ("+" in release)
+        )
+        else "prod"
+    )
+
+    sentry_sdk.init(
+        "https://d5a16b0c38d84d1584dfc93b9fb1ade6@sentry.io/1137693",
+        release=release,
+        environment=environment,
+        before_send=before_send,
+    )
+    with sentry_sdk.configure_scope() as scope:
+        for k, v in config.get(flat=True).items():
+            scope.set_tag(k, v)
+
+
+def process_crashfile(crashfile):
+    """Parse the contents of a crashfile and submit sentry messages."""
+    crash_info = read_crashfile(str(crashfile))
+    with sentry_sdk.push_scope() as scope:
+        scope.level = "fatal"
+
+        # Extract node name
+        node_name = crash_info.pop("node").split(".")[-1]
+        scope.set_tag("node_name", node_name)
+
+        # Massage the traceback, extract the gist
+        traceback = crash_info.pop("traceback")
+        # last line is probably most informative summary
+        gist = traceback.splitlines()[-1]
+        exception_text_start = 1
+        for line in traceback.splitlines()[1:]:
+            if not line[0].isspace():
+                break
+            exception_text_start += 1
+
+        exception_text = "\n".join(
+            traceback.splitlines()[exception_text_start:]
+        )
+
+        # Extract inputs, if present
+        inputs = crash_info.pop("inputs", None)
+        if inputs:
+            scope.set_extra("inputs", dict(inputs))
+
+        # Extract any other possible metadata in the crash file
+        for k, v in crash_info.items():
+            strv = list(_chunks(str(v)))
+            if len(strv) == 1:
+                scope.set_extra(k, strv[0])
+            else:
+                for i, chunk in enumerate(strv):
+                    scope.set_extra("%s_%02d" % (k, i), chunk)
+
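+        # Match the traceback against KNOWN_ERRORS so that recurring failure
+        # modes (e.g., out-of-memory, full disk) are fingerprinted into a
+        # single Sentry issue rather than one issue per crash.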
+        fingerprint = ""
+        issue_title = "{}: {}".format(node_name, gist)
+        for new_fingerprint, error_snippets in KNOWN_ERRORS.items():
+            for error_snippet in error_snippets:
+                if error_snippet in traceback:
+                    fingerprint = new_fingerprint
+                    issue_title = new_fingerprint
+                    break
+            if fingerprint:
+                break
+
+        message = issue_title + "\n\n"
+        message += exception_text[-(8192 - len(message)) :]
+        if fingerprint:
+            sentry_sdk.add_breadcrumb(message=fingerprint, level="fatal")
+        else:
+            # remove file paths
+            fingerprint = re.sub(r"(/[^/ ]*)+/?", "", message)
+            # remove words containing numbers
+            fingerprint = re.sub(
+                r"([a-zA-Z]*[0-9]+[a-zA-Z]*)+", "", fingerprint
+            )
+            # adding the return code if it exists
+            for line in message.splitlines():
+                if line.startswith("Return code"):
+                    fingerprint += line
+                    break
+
+        scope.fingerprint = [fingerprint]
+        sentry_sdk.capture_message(message, "fatal")
+
+
+def before_send(event, hints):
+    """Filter log messages about crashed nodes."""
+    if "logentry" in event and "message" in event["logentry"]:
+        msg = event["logentry"]["message"]
+        if msg.startswith("could not run node:"):
+            return None
+        if msg.startswith("Saving crash info to "):
+            return None
+        if re.match("Node .+ failed to run on host .+", msg):
+            return None
+
+    if "breadcrumbs" in event and isinstance(event["breadcrumbs"], list):
+        fingerprints_to_propagate = [
+            "no-disk-space",
+            "memory-error",
+            "permission-denied",
+            "keyboard-interrupt",
+        ]
+        for bc in event["breadcrumbs"]:
+            msg = bc.get("message", "empty-msg")
+            if msg in fingerprints_to_propagate:
+                event["fingerprint"] = [msg]
+                break
+
+    return event
+
+
+def _chunks(string, length=CHUNK_SIZE):
+    """
+    Split a string into smaller chunks.
+
+    >>> list(_chunks('some longer string.', length=3))
+    ['som', 'e l', 'ong', 'er ', 'str', 'ing', '.']
+
+    """
+    return (string[i : i + length] for i in range(0, len(string), length))
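Because `before_send` is registered with `sentry_sdk.init()` above, every outgoing event passes through it on the client side, and returning `None` drops the event entirely. A quick sketch of the intended behavior (the payloads are hypothetical):

    from dmriprep.utils.sentry import before_send

    event = {'logentry': {'message': 'could not run node: mynode'}}
    assert before_send(event, None) is None      # crash-node chatter is dropped

    event = {'logentry': {'message': 'Started workflow'}}
    assert before_send(event, None) is event     # anything else passes through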
diff --git a/dmriprep/utils/testing.py b/dmriprep/utils/testing.py
new file mode 100644
index 000000000..609cabe67
--- /dev/null
+++ b/dmriprep/utils/testing.py
@@ -0,0 +1,129 @@
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+#
+# Copyright 2021 The NiPreps Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+#     https://www.nipreps.org/community/licensing/
+#
+"""
+Class and utilities for testing the workflows module
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+"""
+
+import logging
+import unittest
+
+from networkx.exception import NetworkXUnfeasible
+from nipype.interfaces import utility as niu
+from nipype.interfaces.base import isdefined
+from nipype.pipeline import engine as pe
+
+logging.disable(logging.INFO)  # <- do we really want to do this?
+
+
+class TestWorkflow(unittest.TestCase):
+    '''Subclass for tests within the workflows module;
+    invoke tests with ``python -m unittest discover test``.'''
+
+    def assertIsAlmostExpectedWorkflow(self, expected_name, expected_interfaces,
+                                       expected_inputs, expected_outputs,
+                                       actual):
+        '''Confirm that a workflow is as expected, in a somewhat hacky way
+        and with low confidence.'''
+        self.assertIsInstance(actual, pe.Workflow)
+        self.assertEqual(expected_name, actual.name)
+
+        # assert it has the same nodes
+        actual_nodes = [actual.get_node(name)
+                        for name in actual.list_node_names()]
+        actual_interfaces = [node.interface.__class__.__name__
+                             for node in actual_nodes]
+
+        # assert lists equal
+        self.assertIsSubsetOfList(expected_interfaces, actual_interfaces)
+        self.assertIsSubsetOfList(actual_interfaces, expected_interfaces)
+
+        # assert expected inputs, outputs exist
+        actual_inputs, actual_outputs = self.get_inputs_outputs(actual_nodes)
+
+        self.assertIsSubsetOfList(expected_outputs, actual_outputs)
+        self.assertIsSubsetOfList(expected_inputs, actual_inputs)
+
+    def assertIsSubsetOfList(self, expecteds, actuals):
+        for expected in expecteds:
+            self.assertIn(expected, actuals)
+
+    def get_inputs_outputs(self, nodes):
+        def get_io_names(pre, ios):
+            return [pre + str(io[0]) for io in ios]
+
+        actual_inputs = []
+        actual_outputs = []
+        node_tuples = [(node.name, node.inputs.items(), node.outputs.items())
+                       for node in nodes]
+        for name, inputs, outputs in node_tuples:
+            pre = str(name) + "."
+            actual_inputs += get_io_names(pre, inputs)
+
+            pre = pre if pre[0:-1] != 'inputnode' else ""
+            actual_outputs += get_io_names(pre, outputs)
+
+        return actual_inputs, actual_outputs
+
+    def assert_circular(self, workflow, circular_connections):
+        '''Check key paths in the workflow by specifying connections that
+        should induce circular paths, which trips a NetworkX error.
+        ``circular_connections`` is a list of tuples:
+        [('from_node_name', 'to_node_name',
+          ('from_node.output_field', 'to_node.input_field'))]
+        '''
+        for from_node, to_node, fields in circular_connections:
+            from_node = workflow.get_node(from_node)
+            to_node = workflow.get_node(to_node)
+            workflow.connect([(from_node, to_node, fields)])
+
+            self.assertRaises(NetworkXUnfeasible, workflow.write_graph)
+
+            workflow.disconnect([(from_node, to_node, fields)])
+
+    def assert_inputs_set(self, workflow, additional_inputs=None):
+        """Check that all mandatory inputs of nodes in the workflow (at the
+        first level) are already set. Additionally, check that inputs in
+        ``additional_inputs`` are set. An input is "set" if it is
+          a) defined explicitly (e.g., in the Interface declaration)
+        OR
+          b) connected to another node's output (e.g., using the
+             ``workflow.connect`` method).
+        ``additional_inputs`` is a dict:
+        {'node_name': ['mandatory', 'input', 'fields']}
+        """
+        additional_inputs = additional_inputs or {}
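+        # A throwaway node, used below only to probe whether an input is
+        # already connected: Nipype refuses to connect into an occupied input.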
+        dummy_node = pe.Node(niu.IdentityInterface(fields=['dummy']), name='DummyNode')
+        node_names = [name for name in workflow.list_node_names() if name.count('.') == 0]
+        for node_name in set(node_names + list(additional_inputs.keys())):
+            node = workflow.get_node(node_name)
+            mandatory_inputs = list(node.inputs.traits(mandatory=True).keys())
+            other_inputs = additional_inputs[node_name] if node_name in additional_inputs else []
+            for field in set(mandatory_inputs + other_inputs):
+                if not isdefined(getattr(node.inputs, field)):
+                    # not explicitly defined; maybe it is connected to an output
+                    with self.assertRaises(Exception):
+                        # throws an error if the input is already connected
+                        workflow.connect([(dummy_node, node, [('dummy', field)])])
diff --git a/dmriprep/workflows/base.py b/dmriprep/workflows/base.py
index 01413b12f..7b6835eff 100755
--- a/dmriprep/workflows/base.py
+++ b/dmriprep/workflows/base.py
@@ -20,29 +20,33 @@
 #
 # https://www.nipreps.org/community/licensing/
 #
-"""dMRIPrep base processing workflows."""
-from .. import config
-import sys
+"""
+dMRIPrep base processing workflows
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autofunction:: init_dmriprep_wf
+.. autofunction:: init_single_subject_wf
+
+"""
 import os
+import sys
 from copy import deepcopy
+from pathlib import Path
 
-from nipype.pipeline import engine as pe
 from nipype.interfaces import utility as niu
-
-from niworkflows.engine.workflows import LiterateWorkflow as Workflow
-from niworkflows.interfaces.bids import BIDSInfo, BIDSFreeSurferDir
+from nipype.pipeline import engine as pe
 from niworkflows.utils.misc import fix_multi_T1w_source_name
 from niworkflows.utils.spaces import Reference
-from smriprep.workflows.anatomical import init_anat_preproc_wf
+from packaging.version import Version
 
-from ..interfaces import DerivativesDataSink, BIDSDataGrabber
-from ..interfaces.reports import SubjectSummary, AboutSummary
+from .. import config
+from ..interfaces.reports import AboutSummary, SubjectSummary
 from ..utils.bids import collect_data
 
 
 def init_dmriprep_wf():
     """
-    Create the base workflow.
+    Build *dMRIPrep*'s pipeline.
 
     This workflow organizes the execution of *dMRIPrep*, with a sub-workflow for
     each subject. 
If FreeSurfer's recon-all is to be run, a FreeSurfer derivatives folder is @@ -59,7 +63,12 @@ def init_dmriprep_wf(): wf = init_dmriprep_wf() """ - dmriprep_wf = Workflow(name="dmriprep_wf") + from niworkflows.engine.workflows import LiterateWorkflow as Workflow + from niworkflows.interfaces.bids import BIDSFreeSurferDir + + ver = Version(config.environment.version) + + dmriprep_wf = Workflow(name=f"dmriprep_{ver.major}_{ver.minor}_wf") dmriprep_wf.base_dir = config.execution.work_dir freesurfer = config.workflow.run_reconall @@ -74,14 +83,15 @@ def init_dmriprep_wf(): run_without_submitting=True, ) if config.execution.fs_subjects_dir is not None: - fsdir.inputs.subjects_dir = str(config.execution.fs_subjects_dir.absolute()) + fsdir.inputs.subjects_dir = str( + config.execution.fs_subjects_dir.absolute() + ) for subject_id in config.execution.participant_label: single_subject_wf = init_single_subject_wf(subject_id) single_subject_wf.config["execution"]["crashdump_dir"] = str( - config.execution.output_dir - / "dmriprep" + Path(config.execution.dmriprep_dir) / f"sub-{subject_id}" / "log" / config.execution.run_uuid @@ -91,15 +101,17 @@ def init_dmriprep_wf(): node.config = deepcopy(single_subject_wf.config) if freesurfer: dmriprep_wf.connect( - fsdir, "subjects_dir", single_subject_wf, "fsinputnode.subjects_dir" + fsdir, + "subjects_dir", + single_subject_wf, + "fsinputnode.subjects_dir", ) else: dmriprep_wf.add_nodes([single_subject_wf]) # Dump a copy of the config file into the log directory log_dir = ( - config.execution.output_dir - / "dmriprep" + Path(config.execution.dmriprep_dir) / f"sub-{subject_id}" / "log" / config.execution.run_uuid @@ -145,10 +157,20 @@ def init_single_subject_wf(subject_id): FreeSurfer's ``$SUBJECTS_DIR`` """ + from dmriprep.interfaces.bids import BIDSDataGrabber + from niworkflows.engine.workflows import LiterateWorkflow as Workflow + from niworkflows.interfaces.bids import BIDSInfo + from smriprep.workflows.anatomical import init_anat_preproc_wf + + from ..interfaces import DerivativesDataSink from ..utils.misc import sub_prefix as _prefix name = f"single_subject_{subject_id}_wf" - subject_data = collect_data(config.execution.layout, subject_id)[0] + subject_data = collect_data( + config.execution.layout, + subject_id, + bids_filters=config.execution.bids_filters, + )[0] if "flair" in config.workflow.ignore: subject_data["flair"] = [] @@ -156,18 +178,36 @@ def init_single_subject_wf(subject_id): subject_data["t2w"] = [] anat_only = config.workflow.anat_only - + anat_derivatives = config.execution.anat_derivatives + spaces = config.workflow.spaces # Make sure we always go through these two checks if not anat_only and not subject_data["dwi"]: raise Exception( f"No DWI data found for participant {subject_id}. " "All workflows require DWI images." 
        )
+    if anat_derivatives:
+        from smriprep.utils.bids import collect_derivatives
 
-    if not subject_data["t1w"]:
+        std_spaces = spaces.get_spaces(nonstandard=False, dim=(3,))
+        anat_derivatives = collect_derivatives(
+            anat_derivatives.absolute(),
+            subject_id,
+            std_spaces,
+            config.workflow.run_reconall,
+        )
+        if anat_derivatives is None:
+            config.loggers.workflow.warning(
+                f"""\
+Attempted to access pre-existing anatomical derivatives at \
+<{config.execution.anat_derivatives}>, however not all expectations of dMRIPrep \
+were met (for participant <{subject_id}>, spaces <{', '.join(std_spaces)}>, \
+reconall <{config.workflow.run_reconall}>)."""
+            )
+
+    if not anat_derivatives and not subject_data["t1w"]:
         raise Exception(
-            f"No T1w images found for participant {subject_id}. "
-            "All workflows require T1w images."
+            "No T1w images found for participant {}. "
+            "All workflows require T1w images.".format(subject_id)
         )
 
     workflow = Workflow(name=name)
@@ -198,15 +238,20 @@ def init_single_subject_wf(subject_id):
     ### References
 
     """
-    spaces = config.workflow.spaces
-    output_dir = config.execution.output_dir
+    dmriprep_dir = str(config.execution.dmriprep_dir)
 
     fsinputnode = pe.Node(
         niu.IdentityInterface(fields=["subjects_dir"]), name="fsinputnode"
    )
 
     bidssrc = pe.Node(
-        BIDSDataGrabber(subject_data=subject_data, anat_only=anat_only), name="bidssrc"
+        BIDSDataGrabber(
+            subject_data=subject_data,
+            anat_only=anat_only,
+            anat_derivatives=anat_derivatives,
+            subject_id=subject_id,
+        ),
+        name="bidssrc",
     )
 
     bids_info = pe.Node(
@@ -224,14 +269,18 @@ def init_single_subject_wf(subject_id):
     )
 
     about = pe.Node(
-        AboutSummary(version=config.environment.version, command=" ".join(sys.argv)),
+        AboutSummary(
+            version=config.environment.version, command=" ".join(sys.argv)
+        ),
         name="about",
         run_without_submitting=True,
     )
 
     ds_report_summary = pe.Node(
         DerivativesDataSink(
-            base_directory=str(output_dir), desc="summary", datatype="figures"
+            base_directory=dmriprep_dir,
+            desc="summary",
+            datatype="figures",
         ),
         name="ds_report_summary",
         run_without_submitting=True,
@@ -239,24 +288,12 @@ def init_single_subject_wf(subject_id):
 
     ds_report_about = pe.Node(
         DerivativesDataSink(
-            base_directory=str(output_dir), desc="about", datatype="figures"
+            base_directory=dmriprep_dir, desc="about", datatype="figures"
         ),
         name="ds_report_about",
         run_without_submitting=True,
     )
 
-    anat_derivatives = config.execution.anat_derivatives
-    if anat_derivatives:
-        from smriprep.utils.bids import collect_derivatives
-
-        std_spaces = spaces.get_spaces(nonstandard=False, dim=(3,))
-        anat_derivatives = collect_derivatives(
-            anat_derivatives.absolute(),
-            subject_id,
-            std_spaces,
-            config.workflow.run_reconall,
-        )
-
     # Preprocessing of T1w (includes registration to MNI)
     anat_preproc_wf = init_anat_preproc_wf(
         bids_root=str(config.execution.bids_dir),
@@ -266,7 +303,7 @@ def init_single_subject_wf(subject_id):
         hires=config.workflow.hires,
         longitudinal=config.workflow.longitudinal,
         omp_nthreads=config.nipype.omp_nthreads,
-        output_dir=str(output_dir),
+        output_dir=dmriprep_dir,
         skull_strip_fixed_seed=config.workflow.skull_strip_fixed_seed,
         skull_strip_mode="force",
         skull_strip_template=Reference.from_string(
@@ -275,56 +312,98 @@ def init_single_subject_wf(subject_id):
         spaces=spaces,
         t1w=subject_data["t1w"],
     )
-    anat_preproc_wf.__desc__ = f"\n\n{anat_preproc_wf.__desc__}"
-
-    # fmt:off
-    workflow.connect([
-        (fsinputnode, anat_preproc_wf, [("subjects_dir", "inputnode.subjects_dir")]),
-        (bidssrc, bids_info, [(("t1w", 
fix_multi_T1w_source_name), "in_file")]), - (fsinputnode, summary, [("subjects_dir", "subjects_dir")]), - (bidssrc, summary, [("t1w", "t1w"), ("t2w", "t2w"), ("dwi", "dwi")]), - (bids_info, summary, [("subject", "subject_id")]), - (bids_info, anat_preproc_wf, [(("subject", _prefix), "inputnode.subject_id")]), - (bidssrc, anat_preproc_wf, [ - ("t1w", "inputnode.t1w"), - ("t2w", "inputnode.t2w"), - ("roi", "inputnode.roi"), - ("flair", "inputnode.flair"), - ]), - (bidssrc, ds_report_summary, [ - (("t1w", fix_multi_T1w_source_name), "source_file"), - ]), - (summary, ds_report_summary, [("out_report", "in_file")]), - (bidssrc, ds_report_about, [ - (("t1w", fix_multi_T1w_source_name), "source_file") - ]), - (about, ds_report_about, [("out_report", "in_file")]), - ]) # fmt:off + workflow.connect( + [ + ( + fsinputnode, + anat_preproc_wf, + [("subjects_dir", "inputnode.subjects_dir")], + ), + (fsinputnode, summary, [("subjects_dir", "subjects_dir")]), + (bidssrc, summary, [("dwi", "dwi")]), + (bids_info, summary, [("subject", "subject_id")]), + ( + bids_info, + anat_preproc_wf, + [(("subject", _prefix), "inputnode.subject_id")], + ), + ( + bidssrc, + anat_preproc_wf, + [ + ("t1w", "inputnode.t1w"), + ("t2w", "inputnode.t2w"), + ("roi", "inputnode.roi"), + ("flair", "inputnode.flair"), + ], + ), + (summary, ds_report_summary, [("out_report", "in_file")]), + (about, ds_report_about, [("out_report", "in_file")]), + ] + ) + + if not anat_derivatives: + workflow.connect( + [ + ( + bidssrc, + bids_info, + [(("t1w", fix_multi_T1w_source_name), "in_file")], + ), + (bidssrc, summary, [("t1w", "t1w"), ("t2w", "t2w")]), + ( + bidssrc, + ds_report_summary, + [(("t1w", fix_multi_T1w_source_name), "source_file")], + ), + ( + bidssrc, + ds_report_about, + [(("t1w", fix_multi_T1w_source_name), "source_file")], + ), + ] + ) + else: + workflow.connect( + [ + ( + anat_preproc_wf, + summary, + [("outputnode.t1w_preproc", "t1w")], + ), + ( + anat_preproc_wf, + ds_report_summary, + [("outputnode.t1w_preproc", "source_file")], + ), + ( + anat_preproc_wf, + ds_report_about, + [("outputnode.t1w_preproc", "source_file")], + ), + ] + ) + # Overwrite ``out_path_base`` of smriprep's DataSinks for node in workflow.list_node_names(): if node.split(".")[-1].startswith("ds_"): - workflow.get_node(node).interface.out_path_base = "dmriprep" + workflow.get_node(node).interface.out_path_base = "" if anat_only: return workflow - + # fmt:on from .dwi.base import init_dwi_preproc_wf # Append the dMRI section to the existing anatomical excerpt # That way we do not need to stream down the number of DWI datasets - anat_preproc_wf.__postdesc__ = f"""\ -{anat_preproc_wf.__postdesc__ or ''} - + dwi_pre_desc = f""" Diffusion data preprocessing - : For each of the {len(subject_data["dwi"])} DWI scans found per subject - (across all sessions), the gradient table was vetted and converted into the *RASb* +(across all sessions), the gradient table was vetted and converted into the *RASb* format (i.e., given in RAS+ scanner coordinates, normalized b-vectors and scaled b-values), and a *b=0* average for reference to the subsequent steps of preprocessing was calculated. """ - - # SDC Step 0: Determine whether fieldmaps can/should be estimated fmap_estimators = None if "fieldmap" not in config.workflow.ignore: from sdcflows import fieldmaps as fm @@ -348,16 +427,18 @@ def init_single_subject_wf(subject_id): raise RuntimeError("""\ Argument '--use-sdc-syn' requires having 'MNI152NLin2009cAsym' as one output standard space. 
\
Please add the 'MNI152NLin2009cAsym' keyword to the '--output-spaces' argument""")
-
     # Nuts and bolts: initialize individual run's pipeline
     dwi_preproc_list = []
     for dwi_file in subject_data["dwi"]:
         dwi_preproc_wf = init_dwi_preproc_wf(
             dwi_file,
-            has_fieldmap=bool(fmap_estimators),
+            has_fieldmap=True,
         )
+        if dwi_preproc_wf is None:
+            continue
+        dwi_preproc_wf.__desc__ = dwi_pre_desc + (dwi_preproc_wf.__desc__ or "")
 
         # fmt: off
         workflow.connect([
             (anat_preproc_wf, dwi_preproc_wf, [
                 ("outputnode.t1w_preproc", "inputnode.t1w_preproc"),
@@ -376,9 +457,9 @@ def init_single_subject_wf(subject_id):
             (bids_info, dwi_preproc_wf, [("subject", "inputnode.subject_id")]),
         ])
         # fmt: on
 
         # Keep a handle to each workflow
         dwi_preproc_list.append(dwi_preproc_wf)
 
     if not fmap_estimators:
@@ -393,7 +474,7 @@ def init_single_subject_wf(subject_id):
         debug=config.execution.debug is True,
         estimators=fmap_estimators,
         omp_nthreads=config.nipype.omp_nthreads,
-        output_dir=str(output_dir),
+        output_dir=dmriprep_dir,
         subject=subject_id,
     )
     fmap_wf.__desc__ = f"""
@@ -423,15 +504,22 @@ def init_single_subject_wf(subject_id):
 
     # Step 3: Manually connect PEPOLAR
     for estimator in fmap_estimators:
-        config.loggers.workflow.info(f"""\
+        config.loggers.workflow.info(
+            f"""\
 Setting-up fieldmap "{estimator.bids_id}" ({estimator.method}) with \
-<{', '.join(s.path.name for s in estimator.sources)}>""")
-        if estimator.method in (fm.EstimatorType.MAPPED, fm.EstimatorType.PHASEDIFF):
+<{', '.join(s.path.name for s in estimator.sources)}>"""
+        )
+        if estimator.method in (
+            fm.EstimatorType.MAPPED,
+            fm.EstimatorType.PHASEDIFF,
+        ):
             continue
 
         suffices = set(s.suffix for s in estimator.sources)
 
-        if estimator.method == fm.EstimatorType.PEPOLAR and sorted(suffices) == ["epi"]:
+        if estimator.method == fm.EstimatorType.PEPOLAR and sorted(
+            suffices
+        ) == ["epi"]:
             getattr(fmap_wf.inputs, f"in_{estimator.bids_id}").in_data = [
                 str(s.path) for s in estimator.sources
             ]
@@ -447,11 +535,11 @@ def init_single_subject_wf(subject_id):
 
         if estimator.method == fm.EstimatorType.ANAT:
             from sdcflows.workflows.fit.syn import init_syn_preprocessing_wf
+
             from ..interfaces.vectors import CheckGradientTable
 
             sources = [
-                str(s.path) for s in estimator.sources
-                if s.suffix in ("dwi",)
+                str(s.path) for s in estimator.sources if s.suffix in ("dwi",)
             ]
             layout = config.execution.layout
             syn_preprocessing_wf = init_syn_preprocessing_wf(
@@ -465,11 +553,18 @@ def init_single_subject_wf(subject_id):
             syn_preprocessing_wf.inputs.inputnode.in_meta = [
                 layout.get_metadata(s) for s in sources
             ]
-            b0_masks = pe.MapNode(CheckGradientTable(), name=f"b0_masks_{estimator.bids_id}",
-                                  iterfield=("dwi_file", "in_bvec", "in_bval"))
+            b0_masks = pe.MapNode(
+                CheckGradientTable(),
+                name=f"b0_masks_{estimator.bids_id}",
+                iterfield=("dwi_file", "in_bvec", "in_bval"),
+            )
             b0_masks.inputs.dwi_file = sources
-            b0_masks.inputs.in_bvec = [str(layout.get_bvec(s)) for s in sources]
-            b0_masks.inputs.in_bval = [str(layout.get_bval(s)) for s in sources]
+            b0_masks.inputs.in_bvec = [
+                str(layout.get_bvec(s)) for s in sources
+            ]
+            b0_masks.inputs.in_bval = [
+                str(layout.get_bval(s)) for s in sources
+            ]
 
             # fmt:off
             workflow.connect([
diff --git a/dmriprep/workflows/dwi/__init__.py b/dmriprep/workflows/dwi/__init__.py
index e69de29bb..dfabed50a 100644
--- a/dmriprep/workflows/dwi/__init__.py
+++ b/dmriprep/workflows/dwi/__init__.py
@@ -0,0 +1 @@
+from .base import init_dwi_preproc_wf  # noqa: F401
diff --git a/dmriprep/workflows/dwi/base.py b/dmriprep/workflows/dwi/base.py
index 2fff1bdda..6be0abbe1 100755
--- a/dmriprep/workflows/dwi/base.py
+++ b/dmriprep/workflows/dwi/base.py
@@ -21,12 +21,13 @@
 # https://www.nipreps.org/community/licensing/
 #
 """Orchestrating the dMRI-preprocessing workflow."""
-from ... import config
 from pathlib import Path
-from nipype.pipeline import engine as pe
+
 from nipype.interfaces import utility as niu
+from nipype.pipeline import engine as pe
+from niworkflows.utils.connections import listify
 
-from niworkflows.engine.workflows import LiterateWorkflow as Workflow
+from ... import config
 from ...interfaces import DerivativesDataSink
 
 
@@ -90,6 +91,7 @@ def init_dwi_preproc_wf(dwi_file, has_fieldmap=False):
     * :py:func:`~dmriprep.workflows.dwi.outputs.init_reportlets_wf`
 
     """
+    from niworkflows.engine.workflows import LiterateWorkflow as Workflow
     from niworkflows.interfaces.reportlets.registration import (
         SimpleBeforeAfterRPT as SimpleBeforeAfter,
     )
@@ -97,18 +99,24 @@ def init_dwi_preproc_wf(dwi_file, has_fieldmap=False):
     from sdcflows.workflows.ancillary import init_brainextraction_wf
 
     from ...interfaces.vectors import CheckGradientTable
-    from .outputs import init_dwi_derivatives_wf, init_reportlets_wf
     from .eddy import init_eddy_wf
+    from .outputs import init_dwi_derivatives_wf, init_reportlets_wf
 
+    # Have some options handy
+    # omp_nthreads = config.nipype.omp_nthreads
+    # freesurfer = config.workflow.run_reconall
+    # spaces = config.workflow.spaces
+    # dmriprep_dir = str(config.execution.dmriprep_dir)
+    # # Extract BIDS entities and metadata from DWI file(s)
+    # entities = extract_entities(dwi_file)
     layout = config.execution.layout
-
     dwi_file = Path(dwi_file)
     config.loggers.workflow.debug(
         f"Creating DWI preprocessing workflow for <{dwi_file.name}>"
     )
-
     if has_fieldmap:
         import re
+
         from sdcflows.fieldmaps import get_identifier
 
         dwi_rel = re.sub(
@@ -160,7 +168,9 @@ def init_dwi_preproc_wf(dwi_file, has_fieldmap=False):
     inputnode.inputs.in_bval = str(layout.get_bval(dwi_file))
 
     outputnode = pe.Node(
-        niu.IdentityInterface(fields=["dwi_reference", "dwi_mask", "gradients_rasb"]),
+        niu.IdentityInterface(
+            fields=["dwi_reference", "dwi_mask", "gradients_rasb"]
+        ),
         name="outputnode",
     )
 
@@ -204,8 +214,9 @@ def init_dwi_preproc_wf(dwi_file, has_fieldmap=False):
     # fmt: on
 
     if config.workflow.run_reconall:
-        from niworkflows.interfaces.nibabel import ApplyMask
         from niworkflows.anat.coregistration import init_bbreg_wf
+        from niworkflows.interfaces.nibabel import ApplyMask
+
         from ...utils.misc import sub_prefix as _prefix
 
         # Mask the T1w
@@ -315,11 +326,11 @@ def _bold_reg_suffix(fallback):
         return workflow
 
     from niworkflows.interfaces.utility import KeySelect
-    from sdcflows.workflows.apply.registration import init_coeff2epi_wf
     from sdcflows.workflows.apply.correction import init_unwarp_wf
+    from sdcflows.workflows.apply.registration import init_coeff2epi_wf
 
     coeff2epi_wf = init_coeff2epi_wf(
-        debug=config.execution.debug,
+        debug="fieldmaps" in config.execution.debug,
         omp_nthreads=config.nipype.omp_nthreads,
         write_coeff=True,
    )
@@ -399,3 +410,40 @@ def _get_wf_name(filename):
 
 def _aslist(value):
     return [value]
+
+
+def extract_entities(file_list):
+    """
+    Return a dictionary of common entities given a list of files.
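+
+    Entities shared by every file are returned as scalars, while entities
+    that differ across files are aggregated into sorted lists.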
+ + Examples + -------- + >>> extract_entities("sub-01/anat/sub-01_T1w.nii.gz") + {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'} + >>> extract_entities(["sub-01/anat/sub-01_T1w.nii.gz"] * 2) + {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'} + >>> extract_entities(["sub-01/anat/sub-01_run-1_T1w.nii.gz", + ... "sub-01/anat/sub-01_run-2_T1w.nii.gz"]) + {'subject': '01', 'run': [1, 2], 'suffix': 'T1w', 'datatype': 'anat', + 'extension': '.nii.gz'} + + """ + from collections import defaultdict + + from bids.layout import parse_file_entities + + entities = defaultdict(list) + for e, v in [ + ev_pair + for f in listify(file_list) + for ev_pair in parse_file_entities(f).items() + ]: + entities[e].append(v) + + def _unique(inlist): + inlist = sorted(set(inlist)) + if len(inlist) == 1: + return inlist[0] + return inlist + + return {k: _unique(v) for k, v in entities.items()} diff --git a/docs/conf.py b/docs/conf.py index 8f1bcc992..7bcebe693 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,13 +11,15 @@ # documentation root, use os.path.abspath to make it absolute, like shown here. # import os -import sys import re -from packaging.version import Version +import sys -from dmriprep import __version__, __copyright__, __packagename__ +from dmriprep import __copyright__, __packagename__, __version__ +from packaging.version import Version -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "sphinxext"))) +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), "sphinxext")) +) from github_link import make_linkcode_resolve # -- Project information -----------------------------------------------------