Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keeping track of fMRIPrep. #192

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
started updating the main part 'base'
GalKepler committed May 1, 2022
commit bc3c58698108ca17653fbff647babd42e9acbc42
67 changes: 50 additions & 17 deletions dmriprep/cli/parser.py
Original file line number Diff line number Diff line change
@@ -23,20 +23,20 @@
"""Parser."""
import os
import sys

from .. import config


def _build_parser():
"""Build parser object."""
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from functools import partial
from pathlib import Path
from argparse import (
ArgumentParser,
ArgumentDefaultsHelpFormatter,
)

from niworkflows.utils.spaces import OutputReferencesAction, Reference
from packaging.version import Version

from .version import check_latest, is_flagged
from niworkflows.utils.spaces import Reference, OutputReferencesAction

def _path_exists(path, parser):
"""Ensure a given path exists."""
@@ -52,7 +52,7 @@ def _min_one(value, parser):
return value

def _to_gb(value):
scale = {"G": 1, "T": 10 ** 3, "M": 1e-3, "K": 1e-6, "B": 1e-9}
scale = {"G": 1, "T": 10**3, "M": 1e-3, "K": 1e-6, "B": 1e-9}
digits = "".join([c for c in value if c.isdigit()])
units = value[len(digits) :] or "M"
return int(digits) * scale[units[0]]
@@ -70,7 +70,11 @@ def _bids_filter(value):
verstr = f"dMRIPrep v{config.environment.version}"
currentv = Version(config.environment.version)
is_release = not any(
(currentv.is_devrelease, currentv.is_prerelease, currentv.is_postrelease)
(
currentv.is_devrelease,
currentv.is_prerelease,
currentv.is_postrelease,
)
)

parser = ArgumentParser(
@@ -96,7 +100,8 @@ def _bids_filter(value):
"output_dir",
action="store",
type=Path,
help="the output path for the outcomes of preprocessing and visual " "reports",
help="the output path for the outcomes of preprocessing and visual "
"reports",
)
parser.add_argument(
"analysis_level",
@@ -181,7 +186,9 @@ def _bids_filter(value):
help="nipype plugin configuration file",
)
g_perfm.add_argument(
"--anat-only", action="store_true", help="run anatomical workflows only"
"--anat-only",
action="store_true",
help="run anatomical workflows only",
)
g_perfm.add_argument(
"--boilerplate_only",
@@ -249,7 +256,9 @@ def _bids_filter(value):
)

# ANTs options
g_ants = parser.add_argument_group("Specific options for ANTs registrations")
g_ants = parser.add_argument_group(
"Specific options for ANTs registrations"
)
g_ants.add_argument(
"--skull-strip-template",
default="OASIS30ANTs",
@@ -264,7 +273,9 @@ def _bids_filter(value):
)

# SyN-unwarp options
g_syn = parser.add_argument_group("Specific options for SyN distortion correction")
g_syn = parser.add_argument_group(
"Specific options for SyN distortion correction"
)
g_syn.add_argument(
"--use-syn-sdc",
action="store_true",
@@ -287,7 +298,9 @@ def _bids_filter(value):
)

# FreeSurfer options
g_fs = parser.add_argument_group("Specific options for FreeSurfer preprocessing")
g_fs = parser.add_argument_group(
"Specific options for FreeSurfer preprocessing"
)
g_fs.add_argument(
"--fs-license-file",
metavar="PATH",
@@ -415,11 +428,14 @@ def _bids_filter(value):
def parse_args(args=None, namespace=None):
"""Parse args and run further checks on the command line."""
import logging

from niworkflows.utils.spaces import Reference, SpatialReferences

parser = _build_parser()
opts = parser.parse_args(args, namespace)
config.execution.log_level = int(max(25 - 5 * opts.verbose_count, logging.DEBUG))
config.execution.log_level = int(
max(25 - 5 * opts.verbose_count, logging.DEBUG)
)
config.from_dict(vars(opts))
config.loggers.init()

@@ -469,18 +485,33 @@ def parse_args(args=None, namespace=None):
output_dir = config.execution.output_dir
work_dir = config.execution.work_dir
version = config.environment.version
output_layout = config.execution.output_layout

if config.execution.fs_subjects_dir is None:
config.execution.fs_subjects_dir = output_dir / "freesurfer"

if config.execution.fs_subjects_dir is None:
if output_layout == "bids":
config.execution.fs_subjects_dir = (
output_dir / "sourcedata" / "freesurfer"
)
elif output_layout == "legacy":
config.execution.fs_subjects_dir = output_dir / "freesurfer"
if config.execution.fmriprep_dir is None:
if output_layout == "bids":
config.execution.fmriprep_dir = output_dir
elif output_layout == "legacy":
config.execution.fmriprep_dir = output_dir / "dmriprep"
# Wipe out existing work_dir
if opts.clean_workdir and work_dir.exists():
from niworkflows.utils.misc import clean_directory

build_log.log("Clearing previous dMRIPrep working directory: %s", work_dir)
build_log.log(
"Clearing previous dMRIPrep working directory: %s", work_dir
)
if not clean_directory(work_dir):
build_log.warning(
"Could not clear all contents of working directory: %s", work_dir
"Could not clear all contents of working directory: %s",
work_dir,
)

# Ensure input and output folders are not the same
@@ -533,4 +564,6 @@ def parse_args(args=None, namespace=None):
)

config.execution.participant_label = sorted(participant_label)
config.workflow.skull_strip_template = config.workflow.skull_strip_template[0]
config.workflow.skull_strip_template = (
config.workflow.skull_strip_template[0]
)
78 changes: 62 additions & 16 deletions dmriprep/config/__init__.py
Original file line number Diff line number Diff line change
@@ -67,16 +67,20 @@
:py:class:`~bids.layout.BIDSLayout`, etc.)

"""
from multiprocessing import set_start_method
import warnings
from multiprocessing import set_start_method

# cmp is not used by dmriprep, so ignore nipype-generated warnings
warnings.filterwarnings("ignore", "cmp not installed")
warnings.filterwarnings(
"ignore", "This has not been fully tested. Please report any failures."
)
warnings.filterwarnings("ignore", "sklearn.externals.joblib is deprecated in 0.21")
warnings.filterwarnings("ignore", "can't resolve package from __spec__ or __package__")
warnings.filterwarnings(
"ignore", "sklearn.externals.joblib is deprecated in 0.21"
)
warnings.filterwarnings(
"ignore", "can't resolve package from __spec__ or __package__"
)
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=ResourceWarning)
@@ -89,28 +93,35 @@
finally:
# Defer all custom import for after initializing the forkserver and
# ignoring the most annoying warnings
import logging
import os
import sys
import logging

from uuid import uuid4
from pathlib import Path
from time import strftime
from niworkflows.utils.spaces import SpatialReferences as _SRs, Reference as _Ref
from nipype import logging as nlogging, __version__ as _nipype_ver
from uuid import uuid4

from nipype import __version__ as _nipype_ver
from nipype import logging as nlogging
from niworkflows.utils.spaces import Reference as _Ref
from niworkflows.utils.spaces import SpatialReferences as _SRs
from templateflow import __version__ as _tf_ver

from .. import __version__


def redirect_warnings(message, category, filename, lineno, file=None, line=None):
def redirect_warnings(
message, category, filename, lineno, file=None, line=None
):
"""Redirect other warnings."""
logger = logging.getLogger()
logger.debug("Captured warning (%s): %s", category, message)


warnings.showwarning = redirect_warnings

logging.addLevelName(25, "IMPORTANT") # Add a new level between INFO and WARNING
logging.addLevelName(
25, "IMPORTANT"
) # Add a new level between INFO and WARNING
logging.addLevelName(15, "VERBOSE") # Add a new level between INFO and DEBUG

DEFAULT_MEMORY_MIN_GB = 0.01
@@ -133,14 +144,15 @@ def redirect_warnings(message, category, filename, lineno, file=None, line=None)

_templateflow_home = Path(
os.getenv(
"TEMPLATEFLOW_HOME", os.path.join(os.getenv("HOME"), ".cache", "templateflow")
"TEMPLATEFLOW_HOME",
os.path.join(os.getenv("HOME"), ".cache", "templateflow"),
)
)

try:
from psutil import virtual_memory

_free_mem_at_start = round(virtual_memory().free / 1024 ** 3, 1)
_free_mem_at_start = round(virtual_memory().free / 1024**3, 1)
except Exception:
_free_mem_at_start = None

@@ -341,6 +353,8 @@ class execution(_Config):
"""Only generate a boilerplate."""
debug = False
"""Run in sloppy mode (meaning, suboptimal parameters that minimize run-time)."""
dmriprep_dir = None
"""Root of dMRIPrep BIDS Derivatives dataset. Depends on output_layout."""
fs_license_file = _fs_license
"""An existing file containing a FreeSurfer license."""
fs_subjects_dir = None
@@ -359,6 +373,8 @@ class execution(_Config):
"""Do not monitor *dMRIPrep* using Google Analytics."""
output_dir = None
"""Folder where derivatives will be stored."""
output_layout = "bids"
"""Layout of derivatives within output_dir."""
output_spaces = None
"""List of (non)standard spaces designated (with the ``--output-spaces`` flag of
the command line) as spatial references for outputs."""
@@ -392,8 +408,11 @@ class execution(_Config):
@classmethod
def init(cls):
"""Create a new BIDS Layout accessible with :attr:`~execution.layout`."""
if cls.fs_license_file and Path(cls.fs_license_file).is_file():
os.environ["FS_LICENSE"] = str(cls.fs_license_file)
if cls._layout is None:
import re

from bids.layout import BIDSLayout

work_dir = cls.work_dir / "bids.db"
@@ -412,6 +431,20 @@ def init(cls):
),
)
cls.layout = cls._layout
if cls.bids_filters:
from bids.layout import Query

# unserialize pybids Query enum values
for acq, filters in cls.bids_filters.items():
cls.bids_filters[acq] = {
k: getattr(Query, v[7:-4])
if not isinstance(v, Query) and "Query" in v
else v
for k, v in filters.items()
}

if "all" in cls.debug:
cls.debug = list(DEBUG_MODES)


# These variables are not necessary anymore
@@ -462,7 +495,9 @@ class workflow(_Config):
class loggers:
"""Keep loggers easily accessible (see :py:func:`init`)."""

_fmt = "%(asctime)s,%(msecs)d %(name)-2s " "%(levelname)-2s:\n\t %(message)s"
_fmt = (
"%(asctime)s,%(msecs)d %(name)-2s " "%(levelname)-2s:\n\t %(message)s"
)
_datefmt = "%y%m%d-%H:%M:%S"

default = logging.getLogger()
@@ -489,15 +524,22 @@ def init(cls):
from nipype import config as ncfg

_handler = logging.StreamHandler(stream=sys.stdout)
_handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt))
_handler.setFormatter(
logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt)
)
cls.cli.addHandler(_handler)
cls.default.setLevel(execution.log_level)
cls.cli.setLevel(execution.log_level)
cls.interface.setLevel(execution.log_level)
cls.workflow.setLevel(execution.log_level)
cls.utils.setLevel(execution.log_level)
ncfg.update_config(
{"logging": {"log_directory": str(execution.log_dir), "log_to_file": True}}
{
"logging": {
"log_directory": str(execution.log_dir),
"log_to_file": True,
}
}
)


@@ -560,7 +602,11 @@ def init_spaces(checkpoint=True):
spaces = execution.output_spaces or SpatialReferences()
if not isinstance(spaces, SpatialReferences):
spaces = SpatialReferences(
[ref for s in spaces.split(" ") for ref in Reference.from_string(s)]
[
ref
for s in spaces.split(" ")
for ref in Reference.from_string(s)
]
)

if checkpoint and not spaces.is_cached():
2 changes: 1 addition & 1 deletion dmriprep/data/tests/config.toml
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ overcommit_policy = "heuristic"
overcommit_limit = "50%"
nipype_version = "1.4.2"
templateflow_version = "0.4.2"
version = "0.2.2"
version = "0.4.0"

[execution]
bids_dir = "THP002/"
213 changes: 155 additions & 58 deletions dmriprep/workflows/base.py
Original file line number Diff line number Diff line change
@@ -31,24 +31,24 @@
import os
import sys
from copy import deepcopy
from pathlib import Path

from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe
from niworkflows.utils.misc import fix_multi_T1w_source_name
from niworkflows.utils.spaces import Reference
from packaging.version import Version
from smriprep.workflows.anatomical import init_anat_preproc_wf

from .. import config
from ..interfaces import BIDSDataGrabber, DerivativesDataSink
from ..interfaces import DerivativesDataSink
from ..interfaces.reports import AboutSummary, SubjectSummary
from ..utils.bids import collect_data
from .dwi import init_dwi_preproc_wf


def init_dmriprep_wf():
"""
Create the base workflow.
Build *dMRIPrep*'s pipeline.
This workflow organizes the execution of *dMRIPrep*, with a sub-workflow for
each subject. If FreeSurfer's recon-all is to be run, a FreeSurfer derivatives folder is
@@ -66,9 +66,11 @@ def init_dmriprep_wf():
"""
from niworkflows.engine.workflows import LiterateWorkflow as Workflow
from niworkflows.interfaces.bids import BIDSFreeSurferDir, BIDSInfo
from niworkflows.interfaces.bids import BIDSFreeSurferDir

dmriprep_wf = Workflow(name="dmriprep_wf")
ver = Version(config.environment.version)

dmriprep_wf = Workflow(name=f"dmriprep_{ver.major}_{ver.minor}_wf")
dmriprep_wf.base_dir = config.execution.work_dir

freesurfer = config.workflow.run_reconall
@@ -91,8 +93,7 @@ def init_dmriprep_wf():
single_subject_wf = init_single_subject_wf(subject_id)

single_subject_wf.config["execution"]["crashdump_dir"] = str(
config.execution.output_dir
/ "dmriprep"
Path(config.execution.dmriprep_dir)
/ f"sub-{subject_id}"
/ "log"
/ config.execution.run_uuid
@@ -112,8 +113,7 @@ def init_dmriprep_wf():

# Dump a copy of the config file into the log directory
log_dir = (
config.execution.output_dir
/ "dmriprep"
Path(config.execution.dmriprep_dir)
/ f"sub-{subject_id}"
/ "log"
/ config.execution.run_uuid
@@ -159,29 +159,55 @@ def init_single_subject_wf(subject_id):
FreeSurfer's ``$SUBJECTS_DIR``
"""
from niworkflows.engine.workflows import LiterateWorkflow as Workflow
from niworkflows.interfaces.bids import BIDSDataGrabber, BIDSInfo
from smriprep.workflows.anatomical import init_anat_preproc_wf

from ..utils.misc import sub_prefix as _prefix

name = f"single_subject_{subject_id}_wf"
subject_data = collect_data(config.execution.layout, subject_id)[0]
subject_data = collect_data(
config.execution.layout,
subject_id,
bids_filters=config.execution.bids_filters,
)[0]

if "flair" in config.workflow.ignore:
subject_data["flair"] = []
if "t2w" in config.workflow.ignore:
subject_data["t2w"] = []

anat_only = config.workflow.anat_only

anat_derivatives = config.execution.anat_derivatives
spaces = config.workflow.spaces
# Make sure we always go through these two checks
if not anat_only and not subject_data["dwi"]:
raise Exception(
f"No DWI data found for participant {subject_id}. "
"All workflows require DWI images."
)
if anat_derivatives:
from smriprep.utils.bids import collect_derivatives

if not subject_data["t1w"]:
std_spaces = spaces.get_spaces(nonstandard=False, dim=(3,))
anat_derivatives = collect_derivatives(
anat_derivatives.absolute(),
subject_id,
std_spaces,
config.workflow.run_reconall,
)
if anat_derivatives is None:
config.loggers.workflow.warning(
f"""\
Attempted to access pre-existing anatomical derivatives at \
<{config.execution.anat_derivatives}>, however not all expectations of fMRIPrep \
were met (for participant <{subject_id}>, spaces <{', '.join(std_spaces)}>, \
reconall <{config.workflow.run_reconall}>)."""
)
if not anat_derivatives and not subject_data["t1w"]:
raise Exception(
f"No T1w images found for participant {subject_id}. "
"All workflows require T1w images."
"No T1w images found for participant {}. "
"All workflows require T1w images.".format(subject_id)
)

workflow = Workflow(name=name)
@@ -212,15 +238,19 @@ def init_single_subject_wf(subject_id):
### References
"""
spaces = config.workflow.spaces
output_dir = config.execution.output_dir
dmriprep_dir = str(config.execution.dmriprep_dir)

fsinputnode = pe.Node(
niu.IdentityInterface(fields=["subjects_dir"]), name="fsinputnode"
)

bidssrc = pe.Node(
BIDSDataGrabber(subject_data=subject_data, anat_only=anat_only),
BIDSDataGrabber(
subject_data=subject_data,
anat_only=anat_only,
anat_derivatives=anat_derivatives,
subject_id=subject_id,
),
name="bidssrc",
)

@@ -248,32 +278,22 @@ def init_single_subject_wf(subject_id):

ds_report_summary = pe.Node(
DerivativesDataSink(
base_directory=str(output_dir), desc="summary", datatype="figures"
base_directory=dmriprep_dir,
desc="summary",
datatype="figures",
),
name="ds_report_summary",
run_without_submitting=True,
)

ds_report_about = pe.Node(
DerivativesDataSink(
base_directory=str(output_dir), desc="about", datatype="figures"
base_directory=dmriprep_dir, desc="about", datatype="figures"
),
name="ds_report_about",
run_without_submitting=True,
)

anat_derivatives = config.execution.anat_derivatives
if anat_derivatives:
from smriprep.utils.bids import collect_derivatives

std_spaces = spaces.get_spaces(nonstandard=False, dim=(3,))
anat_derivatives = collect_derivatives(
anat_derivatives.absolute(),
subject_id,
std_spaces,
config.workflow.run_reconall,
)

# Preprocessing of T1w (includes registration to MNI)
anat_preproc_wf = init_anat_preproc_wf(
bids_root=str(config.execution.bids_dir),
@@ -283,7 +303,7 @@ def init_single_subject_wf(subject_id):
hires=config.workflow.hires,
longitudinal=config.workflow.longitudinal,
omp_nthreads=config.nipype.omp_nthreads,
output_dir=str(output_dir),
output_dir=dmriprep_dir,
skull_strip_fixed_seed=config.workflow.skull_strip_fixed_seed,
skull_strip_mode="force",
skull_strip_template=Reference.from_string(
@@ -292,39 +312,116 @@ def init_single_subject_wf(subject_id):
spaces=spaces,
t1w=subject_data["t1w"],
)
anat_preproc_wf.__desc__ = f"\n\n{anat_preproc_wf.__desc__}"
workflow.connect(
[
(
fsinputnode,
anat_preproc_wf,
[("subjects_dir", "inputnode.subjects_dir")],
),
(fsinputnode, summary, [("subjects_dir", "subjects_dir")]),
(bids_info, summary, [("subject", "subject_id")]),
(
bids_info,
anat_preproc_wf,
[(("subject", _prefix), "inputnode.subject_id")],
),
(
bidssrc,
anat_preproc_wf,
[
("t1w", "inputnode.t1w"),
("t2w", "inputnode.t2w"),
("roi", "inputnode.roi"),
("flair", "inputnode.flair"),
],
),
(summary, ds_report_summary, [("out_report", "in_file")]),
(about, ds_report_about, [("out_report", "in_file")]),
]
)

if not anat_derivatives:
workflow.connect(
[
(
bidssrc,
bids_info,
[(("t1w", fix_multi_T1w_source_name), "in_file")],
),
(bidssrc, summary, [("t1w", "t1w"), ("t2w", "t2w")]),
(
bidssrc,
ds_report_summary,
[(("t1w", fix_multi_T1w_source_name), "source_file")],
),
(
bidssrc,
ds_report_about,
[(("t1w", fix_multi_T1w_source_name), "source_file")],
),
]
)
else:
workflow.connect(
[
(
anat_preproc_wf,
summary,
[("outputnode.t1w_preproc", "t1w")],
),
(
anat_preproc_wf,
ds_report_summary,
[("outputnode.t1w_preproc", "source_file")],
),
(
anat_preproc_wf,
ds_report_about,
[("outputnode.t1w_preproc", "source_file")],
),
]
)

# fmt:off
workflow.connect([
(fsinputnode, anat_preproc_wf, [("subjects_dir", "inputnode.subjects_dir")]),
(bidssrc, bids_info, [(("t1w", fix_multi_T1w_source_name), "in_file")]),
(fsinputnode, summary, [("subjects_dir", "subjects_dir")]),
(bidssrc, summary, [("t1w", "t1w"), ("t2w", "t2w"), ("dwi", "dwi")]),
(bids_info, summary, [("subject", "subject_id")]),
(bids_info, anat_preproc_wf, [(("subject", _prefix), "inputnode.subject_id")]),
(bidssrc, anat_preproc_wf, [
("t1w", "inputnode.t1w"),
("t2w", "inputnode.t2w"),
("roi", "inputnode.roi"),
("flair", "inputnode.flair"),
]),
(bidssrc, ds_report_summary, [
(("t1w", fix_multi_T1w_source_name), "source_file"),
]),
(summary, ds_report_summary, [("out_report", "in_file")]),
(bidssrc, ds_report_about, [
(("t1w", fix_multi_T1w_source_name), "source_file")
]),
(about, ds_report_about, [("out_report", "in_file")]),
])
# fmt:off
# Overwrite ``out_path_base`` of smriprep's DataSinks
for node in workflow.list_node_names():
if node.split(".")[-1].startswith("ds_"):
workflow.get_node(node).interface.out_path_base = "dmriprep"
workflow.get_node(node).interface.out_path_base = ""

if anat_only:
return workflow
# fmt:off
# workflow.connect([
# (fsinputnode, anat_preproc_wf, [("subjects_dir", "inputnode.subjects_dir")]),
# (bidssrc, bids_info, [(("t1w", fix_multi_T1w_source_name), "in_file")]),
# (fsinputnode, summary, [("subjects_dir", "subjects_dir")]),
# (bidssrc, summary, [("t1w", "t1w"), ("t2w", "t2w"), ("dwi", "dwi")]),
# (bids_info, summary, [("subject", "subject_id")]),
# (bids_info, anat_preproc_wf, [(("subject", _prefix), "inputnode.subject_id")]),
# (bidssrc, anat_preproc_wf, [
# ("t1w", "inputnode.t1w"),
# ("t2w", "inputnode.t2w"),
# ("roi", "inputnode.roi"),
# ("flair", "inputnode.flair"),
# ]),
# (bidssrc, ds_report_summary, [
# (("t1w", fix_multi_T1w_source_name), "source_file"),
# ]),
# (summary, ds_report_summary, [("out_report", "in_file")]),
# (bidssrc, ds_report_about, [
# (("t1w", fix_multi_T1w_source_name), "source_file")
# ]),
# (about, ds_report_about, [("out_report", "in_file")]),
# ])
# # fmt:off
# # Overwrite ``out_path_base`` of smriprep's DataSinks
# for node in workflow.list_node_names():
# if node.split(".")[-1].startswith("ds_"):
# workflow.get_node(node).interface.out_path_base = "dmriprep"

# if anat_only:
# return workflow
return workflow

from .dwi.base import init_dwi_preproc_wf