Skip to content

Commit

Permalink
Handle groups
Browse files Browse the repository at this point in the history
  • Loading branch information
astewartau committed Nov 14, 2024
1 parent 638b017 commit 5d5a752
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 225 deletions.
18 changes: 14 additions & 4 deletions dcm_check/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
import os

from tabulate import tabulate

from dcm_check.dcm_check import load_ref_json, load_ref_dicom, load_ref_pydantic, get_compliance_summary, load_dicom
from dcm_check.dcm_check import (
load_ref_json,
load_ref_dicom,
load_ref_pydantic,
get_compliance_summary,
load_dicom
)

def infer_type_from_extension(ref_path):
"""Infer the reference type based on the file extension."""
_, ext = os.path.splitext(ref_path.lower())
if ext == ".json":
return "json"
elif ext == ".dcm":
    elif ext in [".dcm", ".ima"]:
return "dicom"
elif ext == ".py":
return "pydantic"
Expand All @@ -28,6 +33,7 @@ def main():
parser.add_argument("--ref", required=True, help="Reference JSON file, DICOM file, or Python module to use for compliance.")
parser.add_argument("--type", choices=["json", "dicom", "pydantic"], help="Reference type: 'json', 'dicom', or 'pydantic'.")
parser.add_argument("--scan", required=False, help="Scan type when using a JSON or Pydantic reference.")
parser.add_argument("--group", required=False, help="Specific group name within the acquisition for JSON references.")
parser.add_argument("--in", dest="in_file", required=True, help="Path to the DICOM file to check.")
parser.add_argument("--fields", nargs="*", help="Optional: List of DICOM fields to include in validation for DICOM reference.")
parser.add_argument("--out", required=False, help="Path to save the compliance report in JSON format.")
Expand All @@ -37,7 +43,11 @@ def main():
ref_type = args.type or infer_type_from_extension(args.ref)

if ref_type == "json":
reference_model = load_ref_json(args.ref, args.scan)
# Include group if specified
if args.group:
reference_model = load_ref_json(args.ref, args.scan, group_name=args.group)
else:
reference_model = load_ref_json(args.ref, args.scan)
elif ref_type == "dicom":
ref_dicom_values = load_dicom(args.ref)
reference_model = load_ref_dicom(ref_dicom_values, args.fields)
Expand Down
73 changes: 53 additions & 20 deletions dcm_check/dcm_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pydicom.uid import UID
from pydicom.valuerep import PersonName, DSfloat, IS
from pydantic_core import PydanticUndefined
from pydantic.class_validators import validator

def get_dicom_values(ds: pydicom.dataset.FileDataset) -> Dict[str, Any]:
"""Convert a DICOM dataset to a dictionary, handling sequences and DICOM-specific data types.
Expand Down Expand Up @@ -59,62 +60,94 @@ def load_dicom(dicom_file: str) -> Dict[str, Any]:

def create_reference_model(reference_values: Dict[str, Any], fields_config: List[Union[str, Dict[str, Any]]]) -> BaseModel:
model_fields = {}
validators = {}

# Define validation functions dynamically
def contains_check_factory(field_name, contains_value):
@validator(field_name, pre=True, allow_reuse=True)
def contains_check(cls, v):
if not isinstance(v, list) or contains_value not in v:
raise ValueError(f"{field_name} must contain '{contains_value}'")
return v
return contains_check

for field in fields_config:
field_name = field["field"]
tolerance = field.get("tolerance")
pattern = field.get("value") if isinstance(field.get("value"), str) and "*" in field["value"] else None
contains = field.get("contains")
ref_value = reference_values.get(field_name, field.get("value"))

if pattern:
# Use the `pattern` parameter in Field to directly enforce the regex pattern
# Pattern matching
model_fields[field_name] = (
str,
Field(
default=PydanticUndefined,
pattern=pattern.replace("*", ".*") # Pydantic will apply this regex pattern directly
),
Field(default=PydanticUndefined, pattern=pattern.replace("*", ".*"))
)
elif tolerance is not None:
# Numeric tolerance
model_fields[field_name] = (
confloat(ge=ref_value - tolerance, le=ref_value + tolerance),
Field(default=ref_value),
Field(default=ref_value)
)
elif contains:
# Add a field expecting a list and register a custom validator for "contains"
model_fields[field_name] = (List[str], Field(default=PydanticUndefined))
validators[f"{field_name}_contains"] = contains_check_factory(field_name, contains)
else:
# Exact match
model_fields[field_name] = (
Literal[ref_value],
Field(default=PydanticUndefined),
Field(default=PydanticUndefined)
)

return create_model("ReferenceModel", **model_fields)

# Create model with dynamically added validators
return create_model("ReferenceModel", **model_fields, __validators__=validators)

def load_ref_json(json_path: str, scan_type: str) -> BaseModel:
"""Load a JSON configuration file and create a reference model for a specified scan type.
def load_ref_json(json_path: str, scan_type: str, group_name: Optional[str] = None) -> BaseModel:
"""Load a JSON configuration file and create a reference model for a specified scan type and group.
Args:
json_path (str): Path to the JSON configuration file.
scan_type (str): Scan type to load (e.g., "T1").
scan_type (str): Acquisition scan type to load (e.g., "T1").
group_name (Optional[str]): Specific group name to validate within the acquisition.
Returns:
reference_model (BaseModel): A Pydantic model based on the JSON configuration.
"""
with open(json_path, 'r') as f:
config = json.load(f)

scan_config = config.get("acquisitions", {}).get(scan_type)
if not scan_config:
# Load acquisition configuration
acquisition_config = config.get("acquisitions", {}).get(scan_type)
if not acquisition_config:
raise ValueError(f"Scan type '{scan_type}' not found in JSON configuration.")

ref_file = scan_config.get("ref", None)
fields_config = scan_config["fields"]
# Load the reference DICOM if specified
ref_file = acquisition_config.get("ref", None)
reference_values = load_dicom(ref_file) if ref_file else {}

# Add acquisition-level fields to the reference model configuration
fields_config = acquisition_config.get("fields", [])
acquisition_reference = {field["field"]: field.get("value") for field in fields_config if "value" in field}

if ref_file:
reference_values = load_dicom(ref_file)
# Check if a group_name is specified and retrieve its configuration
group_fields = []
if group_name:
group = next((grp for grp in acquisition_config.get("groups", []) if grp["name"] == group_name), None)
if not group:
raise ValueError(f"Group '{group_name}' not found in acquisition '{scan_type}'.")

group_fields = group.get("fields", [])
group_reference = {field["field"]: field.get("value") for field in group_fields if "value" in field}
reference_values.update(group_reference)
else:
reference_values = {field["field"]: field["value"] for field in fields_config if "value" in field}
reference_values.update(acquisition_reference)

# Combine acquisition and group fields for the reference model creation
combined_fields_config = fields_config + group_fields

return create_reference_model(reference_values, fields_config)
return create_reference_model(reference_values, combined_fields_config)

def load_ref_dicom(dicom_values: Dict[str, Any], fields: Optional[List[str]] = None) -> BaseModel:
"""Create a reference model based on DICOM values.
Expand Down
55 changes: 39 additions & 16 deletions dcm_check/generate_json_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
if dicom_df[field].apply(lambda x: isinstance(x, list)).any():
dicom_df[field] = dicom_df[field].apply(lambda x: tuple(x) if isinstance(x, list) else x)

# Sort the DataFrame by acquisition fields and reference fields
sort_order = acquisition_fields + reference_fields
dicom_df = dicom_df.sort_values(by=sort_order).reset_index(drop=True)

# Drop duplicates based on unique acquisition fields
unique_series_df = dicom_df.drop_duplicates(subset=acquisition_fields)

Expand All @@ -58,6 +62,9 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
# Dictionary to store unique groups of reference fields
unique_groups = {}

# Track reference fields that are constant across all groups
constant_reference_fields = {}

# Group by reference field combinations and gather representative paths
for _, group_row in series_df.drop(columns=acquisition_fields).drop_duplicates().iterrows():
# Create a tuple for the current field combination
Expand All @@ -67,6 +74,28 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
if group_values not in unique_groups:
unique_groups[group_values] = group_row['dicom_path']

# Identify constant reference fields across groups
for field in reference_fields:
unique_values = series_df[field].unique()
if len(unique_values) == 1:
constant_reference_fields[field] = unique_values[0]

# Remove constant fields from the groups and only include changing fields
groups = []
group_number = 1
for group, example_path in unique_groups.items():
group_fields = [
{"field": field, "value": value}
for field, value in group if field not in constant_reference_fields
]
if group_fields:
groups.append({
"name": f"Group {group_number}", # Assign default name
"fields": group_fields,
"example": example_path
})
group_number += 1

# Format the series name based on the template using MissingFieldDict to handle missing keys
try:
series_name = name_template.format_map(MissingFieldDict(unique_row.to_dict()))
Expand All @@ -81,28 +110,23 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
# Add acquisition-level fields and values
acquisition_fields_list = [{"field": field, "value": unique_row[field]} for field in acquisition_fields]

# Include constant reference fields in the acquisition-level fields
acquisition_fields_list.extend(
[{"field": field, "value": value} for field, value in constant_reference_fields.items()]
)

# Decide whether to include groups or inline reference fields
if len(unique_groups) == 1:
# Only one unique group, so inline its fields
single_group = list(unique_groups.items())[0]
group_fields = [{"field": field, "value": value} for field, value in single_group[0]]
if groups:
acquisitions[final_series_name] = {
"ref": unique_row['dicom_path'],
"fields": acquisition_fields_list + group_fields
"fields": acquisition_fields_list,
"groups": groups
}
else:
# Multiple groups, add them under the "groups" key
groups = [
{
"fields": [{"field": field, "value": value} for field, value in group],
"example": example_path
}
for group, example_path in unique_groups.items()
]
# No changing groups, so we store only the acquisition-level fields
acquisitions[final_series_name] = {
"ref": unique_row['dicom_path'],
"fields": acquisition_fields_list,
"groups": groups
"fields": acquisition_fields_list
}

# Build the JSON output structure
Expand All @@ -116,7 +140,6 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
print(f"JSON reference saved to {out_json_ref}")



def main():
parser = argparse.ArgumentParser(description="Generate a JSON reference for DICOM compliance.")
parser.add_argument("--in_session_dir", required=True, help="Directory containing DICOM files for the session.")
Expand Down
Binary file modified dcm_check/tests/ref_dicom.dcm
Binary file not shown.
15 changes: 11 additions & 4 deletions dcm_check/tests/ref_json.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
{
"acquisitions": {
"T1": {
"ref": "dcm_check/tests/ref_dicom.dcm",
"fields": [
{ "field": "EchoTime", "tolerance": 0.1 },
{ "field": "RepetitionTime" },
{ "field": "SeriesDescription", "value": "*t1*" }
{"field": "EchoTime", "tolerance": 0.1, "value": 3.0},
{"field": "RepetitionTime", "value": 8.0},
{"field": "SeriesDescription", "value": "*T1*"}
],
"groups": [
{
"name": "Group 1",
"fields": [
{"field": "ImageType", "contains": "M"}
]
}
]
}
}
Expand Down
Loading

0 comments on commit 5d5a752

Please sign in to comment.