Rename scan-group to acquisition-series
astewartau committed Nov 15, 2024
1 parent 0b1ff09 commit 58d3aa8
Showing 12 changed files with 136 additions and 137 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -27,14 +27,14 @@ The CLI provides options to specify a reference model and validate a DICOM file.
Command Syntax

```bash
-dcm-check --ref <reference-file> --type <json|dicom|pydantic> --scan <scan-type> --in <dicom-file> [--fields <field1 field2 ...>] [--out <output-file>]
+dcm-check --ref <reference-file> --type <json|dicom|pydantic> --acquisition <acquisition-type> --in <dicom-file> [--fields <field1 field2 ...>] [--out <output-file>]
```

**Arguments:**

- `--ref`: Path to the reference file (JSON, DICOM, or Python module).
- `--type`: Type of reference model (json, dicom, or pydantic).
-- `--scan`: Scan type (e.g., T1, T2w, etc.) when using JSON or Pydantic references; inferred if not given.
+- `--acquisition`: Acquisition type (e.g., T1, T2w, etc.) when using JSON or Pydantic references; inferred if not given.
- `--in`: Path to the DICOM file to validate.
- `--fields`: (Optional) List of specific DICOM fields to include in validation for DICOM reference types.
- `--out`: (Optional) Path to save the compliance report as a JSON file.
@@ -44,7 +44,7 @@ dcm-check --ref <reference-file> --type <json|dicom|pydantic> --scan <scan-type>
Validate a DICOM file using a JSON reference model:

```bash
-dcm-check --ref reference.json --scan T1 --in dicom_file.dcm
+dcm-check --ref reference.json --acquisition T1 --in dicom_file.dcm
```

Validate a DICOM file using another DICOM as a reference:
@@ -56,7 +56,7 @@ dcm-check --ref reference.dcm --in dicom_file.dcm --fields EchoTime RepetitionTi
Validate a DICOM file using a Pydantic model in a Python module:

```bash
-dcm-check --ref reference.py --scan T1_MPR --in dicom_file.dcm
+dcm-check --ref reference.py --acquisition T1_MPR --in dicom_file.dcm
```
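For orientation, a minimal JSON reference using the renamed `series` key might look like the following sketch; the acquisition name, DICOM fields, and values here are illustrative assumptions, not files shipped with the repository:

```json
{
  "acquisitions": {
    "T1": {
      "ref": "t1_reference.dcm",
      "fields": [
        { "field": "RepetitionTime", "value": 2.3, "tolerance": 0.1 },
        { "field": "SeriesDescription", "contains": "T1" }
      ],
      "series": [
        {
          "name": "Series 1",
          "fields": [ { "field": "EchoTime", "value": 2.98 } ]
        }
      ]
    }
  }
}
```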

**Output**
28 changes: 14 additions & 14 deletions dcm_check/cli.py
@@ -32,8 +32,8 @@ def main():
parser = argparse.ArgumentParser(description="Check DICOM compliance against a reference model.")
parser.add_argument("--ref", required=True, help="Reference JSON file, DICOM file, or Python module to use for compliance.")
parser.add_argument("--type", choices=["json", "dicom", "pydantic"], help="Reference type: 'json', 'dicom', or 'pydantic'.")
parser.add_argument("--scan", required=False, help="Scan type when using a JSON or Pydantic reference.")
parser.add_argument("--group", required=False, help="Specific group name within the acquisition for JSON references.")
parser.add_argument("--acquisition", required=False, help="Acquisition name when using a JSON or Pydantic reference.")
parser.add_argument("--series", required=False, help="Specific series name within the acquisition for JSON references.")
parser.add_argument("--in", dest="in_file", required=True, help="Path to the DICOM file to check.")
parser.add_argument("--fields", nargs="*", help="Optional: List of DICOM fields to include in validation for DICOM reference.")
parser.add_argument("--out", required=False, help="Path to save the compliance report in JSON format.")
@@ -43,34 +43,34 @@ def main():
ref_type = args.type or infer_type_from_extension(args.ref)

if ref_type == "json":
-# Include group if specified
-if args.group:
-reference_model = load_ref_json(args.ref, args.scan, group_name=args.group)
+# Include series if specified
+if args.series:
+reference_model = load_ref_json(args.ref, args.acquisition, series_name=args.series)
else:
-reference_model = load_ref_json(args.ref, args.scan)
+reference_model = load_ref_json(args.ref, args.acquisition)
elif ref_type == "dicom":
ref_dicom_values = load_dicom(args.ref)
reference_model = load_ref_dicom(ref_dicom_values, args.fields)
elif ref_type == "pydantic":
-# check if scan is provided
-if not args.scan:
-print("Error: Scan type is required (--scan) when using a Pydantic reference.", file=sys.stderr)
+# check if acquisition is provided
+if not args.acquisition:
+print("Error: Acquisition type is required (--acquisition) when using a Pydantic reference.", file=sys.stderr)
sys.exit(1)
-reference_model = load_ref_pydantic(args.ref, args.scan)
+reference_model = load_ref_pydantic(args.ref, args.acquisition)
else:
print(f"Error: Unsupported reference type '{ref_type}'", file=sys.stderr)
sys.exit(1)

in_dicom_values = load_dicom(args.in_file)
-results = get_compliance_summary(reference_model, in_dicom_values, args.scan, args.group)
+results = get_compliance_summary(reference_model, in_dicom_values, args.acquisition, args.series)

df = pd.DataFrame(results)

# remove "Acquisition" and/or "Group" columns if they are empty
# remove "Acquisition" and/or "Series" columns if they are empty
if "Acquisition" in df.columns and df["Acquisition"].isnull().all():
df.drop(columns=["Acquisition"], inplace=True)
if "Group" in df.columns and df["Group"].isnull().all():
df.drop(columns=["Group"], inplace=True)
if "Series" in df.columns and df["Series"].isnull().all():
df.drop(columns=["Series"], inplace=True)

if len(df) == 0:
print("DICOM file is compliant with the reference model.")
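The new `--series` flag added above pairs with `--acquisition` for JSON references; a hypothetical invocation (file and series names illustrative) might be:

```bash
dcm-check --ref reference.json --acquisition T1 --series "Series 1" --in dicom_file.dcm
```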
62 changes: 31 additions & 31 deletions dcm_check/dcm_check.py
@@ -103,13 +103,13 @@ def contains_check(cls, v):
# Create model with dynamically added validators
return create_model("ReferenceModel", **model_fields, __validators__=validators)

-def load_ref_json(json_path: str, scan_type: str, group_name: Optional[str] = None) -> BaseModel:
-"""Load a JSON configuration file and create a reference model for a specified scan type and group.
+def load_ref_json(json_path: str, acquisition: str, series_name: Optional[str] = None) -> BaseModel:
+"""Load a JSON configuration file and create a reference model for a specified acquisition and series.
Args:
json_path (str): Path to the JSON configuration file.
-scan_type (str): Acquisition scan type to load (e.g., "T1").
-group_name (Optional[str]): Specific group name to validate within the acquisition.
+acquisition (str): Acquisition to load (e.g., "T1").
+series_name (Optional[str]): Specific series name to validate within the acquisition.
Returns:
reference_model (BaseModel): A Pydantic model based on the JSON configuration.
@@ -118,9 +118,9 @@ def load_ref_json(json_path: str, scan_type: str, group_name: Optional[str] = No
config = json.load(f)

# Load acquisition configuration
-acquisition_config = config.get("acquisitions", {}).get(scan_type)
+acquisition_config = config.get("acquisitions", {}).get(acquisition)
if not acquisition_config:
raise ValueError(f"Scan type '{scan_type}' not found in JSON configuration.")
raise ValueError(f"Acquisition '{acquisition}' not found in JSON configuration.")

# Load the reference DICOM if specified
ref_file = acquisition_config.get("ref", None)
@@ -133,19 +133,19 @@ def load_ref_json(json_path: str, scan_type: str, group_name: Optional[str] = No
# Always include acquisition-level fields
reference_values.update(acquisition_reference)

-# Check if a group_name is specified and retrieve its configuration
-group_fields = []
-if group_name:
-group = next((grp for grp in acquisition_config.get("groups", []) if grp["name"] == group_name), None)
-if not group:
-raise ValueError(f"Group '{group_name}' not found in acquisition '{scan_type}'.")
+# Check if a series_name is specified and retrieve its configuration
+series_fields = []
+if series_name:
+series = next((grp for grp in acquisition_config.get("series", []) if grp["name"] == series_name), None)
+if not series:
+raise ValueError(f"Series '{series_name}' not found in acquisition '{acquisition}'.")

-group_fields = group.get("fields", [])
-group_reference = {field["field"]: field.get("value") for field in group_fields if "value" in field}
-reference_values.update(group_reference)
+series_fields = series.get("fields", [])
+series_reference = {field["field"]: field.get("value") for field in series_fields if "value" in field}
+reference_values.update(series_reference)

-# Combine acquisition and group fields for the reference model creation
-combined_fields_config = fields_config + group_fields
+# Combine acquisition and series fields for the reference model creation
+combined_fields_config = fields_config + series_fields

return create_reference_model(reference_values, combined_fields_config)
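As a usage sketch (import path and file names are assumptions, not documented API), the renamed keyword argument would be exercised like so; per the `update` calls above, series-level values are layered over acquisition-level ones and can override them:

```python
from dcm_check.dcm_check import load_ref_json  # import path assumed

# Acquisition-level fields are always included; fields from the named
# series are merged on top of them.
model = load_ref_json("reference.json", "T1", series_name="Series 1")
print(list(model.model_fields))  # Pydantic v2; use model.__fields__ on v1
```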

@@ -165,34 +165,34 @@ def load_ref_dicom(dicom_values: Dict[str, Any], fields: Optional[List[str]] = N
fields_config = [{"field": field} for field in fields] if fields else [{"field": key} for key in dicom_values]
return create_reference_model(dicom_values, fields_config)

-def load_ref_pydantic(module_path: str, scan_type: str) -> BaseModel:
-"""Load a Pydantic model from a specified Python file for the given scan type.
+def load_ref_pydantic(module_path: str, acquisition: str) -> BaseModel:
+"""Load a Pydantic model from a specified Python file for the given acquisition.
Args:
-module_path (str): Path to the Python file containing the scan models.
-scan_type (str): The scan type to retrieve (e.g., "T1_MPR").
+module_path (str): Path to the Python file containing the acquisition models.
+acquisition (str): The acquisition to retrieve (e.g., "T1_MPR").
Returns:
-reference_model (BaseModel): The Pydantic model for the specified scan type.
+reference_model (BaseModel): The Pydantic model for the specified acquisition type.
"""
# Load the module from the specified file path
spec = importlib.util.spec_from_file_location("ref_module", module_path)
ref_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ref_module)

-# Retrieve SCAN_MODELS from the loaded module
-scan_models: Dict[str, Any] = getattr(ref_module, "SCAN_MODELS", None)
-if not scan_models:
-raise ValueError(f"No SCAN_MODELS found in the module '{module_path}'.")
+# Retrieve ACQUISITION_MODELS from the loaded module
+acquisition_models: Dict[str, Any] = getattr(ref_module, "ACQUISITION_MODELS", None)
+if not acquisition_models:
+raise ValueError(f"No ACQUISITION_MODELS found in the module '{module_path}'.")

-# Retrieve the specific model for the given scan type
-reference_model = scan_models.get(scan_type)
+# Retrieve the specific model for the given acquisition
+reference_model = acquisition_models.get(acquisition)
if not reference_model:
raise ValueError(f"Scan type '{scan_type}' is not defined in SCAN_MODELS.")
raise ValueError(f"Acquisition '{acquisition}' is not defined in ACQUISITION_MODELS.")

return reference_model
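A reference module compatible with this loader might look roughly like the sketch below; the model name and fields are illustrative assumptions:

```python
# ref_models.py -- hypothetical module passed to load_ref_pydantic
from pydantic import BaseModel

class T1_MPR(BaseModel):
    # Expected DICOM values for a T1 MPRAGE acquisition (illustrative)
    RepetitionTime: float
    EchoTime: float
    SeriesDescription: str

# The loader looks acquisitions up by name in this dictionary
ACQUISITION_MODELS = {
    "T1_MPR": T1_MPR,
}
```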

-def get_compliance_summary(reference_model: BaseModel, dicom_values: Dict[str, Any], acquisition: str = None, group: str = None, raise_errors: bool = False) -> List[Dict[str, Any]]:
+def get_compliance_summary(reference_model: BaseModel, dicom_values: Dict[str, Any], acquisition: str = None, series: str = None, raise_errors: bool = False) -> List[Dict[str, Any]]:
"""Validate a DICOM file against the reference model."""
compliance_summary = []

@@ -209,7 +209,7 @@ def get_compliance_summary(reference_model: BaseModel, dicom_values: Dict[str, A
actual = dicom_values.get(param, "N/A") if param != "Model-Level Error" else "N/A"
compliance_summary.append({
"Acquisition": acquisition,
"Group": group,
"Series": series,
"Parameter": param,
"Value": actual,
"Expected": expected
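Putting the loaders and the compliance check above together, a minimal end-to-end run might read as follows; the import path and file names are assumptions, not documented API:

```python
import pandas as pd

from dcm_check.dcm_check import (  # import path assumed
    get_compliance_summary,
    load_dicom,
    load_ref_json,
)

reference_model = load_ref_json("reference.json", "T1", series_name="Series 1")
dicom_values = load_dicom("dicom_file.dcm")

# Each returned row describes one non-compliant parameter; an empty
# list means the file matches the reference.
results = get_compliance_summary(reference_model, dicom_values,
                                 acquisition="T1", series="Series 1")
print(pd.DataFrame(results))
```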
4 changes: 2 additions & 2 deletions dcm_check/generate_json_ref.py
@@ -90,7 +90,7 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
]
if group_fields:
groups.append({
"name": f"Group {group_number}", # Assign default name
"name": f"Series {group_number}", # Assign default name
"fields": group_fields,
"ref": ref_path
})
@@ -120,7 +120,7 @@ def generate_json_ref(in_session_dir, out_json_ref, acquisition_fields, referenc
acquisitions[final_series_name] = {
"ref": unique_row['dicom_path'],
"fields": acquisition_fields_list,
"groups": groups
"series": groups
}
else:
# No changing groups, so we store only the acquisition-level fields
42 changes: 21 additions & 21 deletions dcm_check/read_session.py
@@ -70,16 +70,16 @@ def find_closest_matches(session_df, acquisitions_info):
acq_name = acq_info["name"]
acq_diff_score = calculate_total_difference(acq_info, row)

if not acq_info["groups"]: # Acquisitions without groups (assign group as None)
if not acq_info["series"]: # Acquisitions without groups (assign group as None)
row_costs.append(acq_diff_score)
row_assignments.append((i, acq_name, None, acq_diff_score))
else:
-for group in acq_info["groups"]:
-group_name = group["name"]
-group_diff_score = calculate_total_difference(group, row)
-total_score = acq_diff_score + group_diff_score
+for series in acq_info["series"]:
+series_name = series["name"]
+series_diff_score = calculate_total_difference(series, row)
+total_score = acq_diff_score + series_diff_score
row_costs.append(total_score)
-row_assignments.append((i, acq_name, group_name, total_score))
+row_assignments.append((i, acq_name, series_name, total_score))

cost_matrix.append(row_costs)
possible_assignments.append(row_assignments)
@@ -88,16 +88,16 @@
row_indices, col_indices = linear_sum_assignment(cost_matrix)

best_acquisitions = [None] * len(session_df)
-best_groups = [None] * len(session_df)
+best_series = [None] * len(session_df)
best_scores = [None] * len(session_df) # Use NaN for unmatched scores

for row_idx, col_idx in zip(row_indices, col_indices):
-_, acq_name, group_name, score = possible_assignments[row_idx][col_idx]
+_, acq_name, series_name, score = possible_assignments[row_idx][col_idx]
best_acquisitions[row_idx] = acq_name
-best_groups[row_idx] = group_name
+best_series[row_idx] = series_name
best_scores[row_idx] = score if acq_name else None # Only assign score if acquisition is matched

-return best_acquisitions, best_groups, best_scores
+return best_acquisitions, best_series, best_scores
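The matching performed here is a rectangular assignment problem; a toy illustration of how `linear_sum_assignment` pairs session rows with reference candidates (the cost values are invented for the example):

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# Rows: series found in the session; columns: candidate
# (acquisition, series) pairs from the reference. Lower cost = closer match.
cost = np.array([
    [0.2, 5.0],  # session row 0 is close to candidate 0
    [4.5, 0.1],  # session row 1 is close to candidate 1
    [3.0, 2.5],  # session row 2 stays unmatched (3 rows, 2 candidates)
])
rows, cols = linear_sum_assignment(cost)  # one-to-one, minimum total cost
print([(int(r), int(c)) for r, c in zip(rows, cols)])  # [(0, 0), (1, 1)]
```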


def read_session(reference_json, session_dir):
@@ -110,21 +110,21 @@ def read_session(reference_json, session_dir):
"fields": {field["field"]: field.get("value", field.get("contains")) for field in acquisition.get("fields", [])},
"tolerance": {field["field"]: field["tolerance"] for field in acquisition.get("fields", []) if "tolerance" in field},
"contains": {field["field"]: field["contains"] for field in acquisition.get("fields", []) if "contains" in field},
"groups": [
"series": [
{
"name": group["name"],
"fields": {field["field"]: field.get("value", field.get("contains")) for field in group.get("fields", [])},
"tolerance": {field["field"]: field["tolerance"] for field in group.get("fields", []) if "tolerance" in field},
"contains": {field["field"]: field["contains"] for field in group.get("fields", []) if "contains" in field}
"name": series["name"],
"fields": {field["field"]: field.get("value", field.get("contains")) for field in series.get("fields", [])},
"tolerance": {field["field"]: field["tolerance"] for field in series.get("fields", []) if "tolerance" in field},
"contains": {field["field"]: field["contains"] for field in series.get("fields", []) if "contains" in field}
}
-for group in acquisition.get("groups", [])
+for series in acquisition.get("series", [])
]
}
for acq_name, acquisition in reference_data.get("acquisitions", {}).items()
]

all_fields = {field for acq in acquisitions_info for field in acq["fields"].keys()}
-all_fields.update({field for acq in acquisitions_info for group in acq["groups"] for field in group["fields"].keys()})
+all_fields.update({field for acq in acquisitions_info for series in acq["series"] for field in series["fields"].keys()})

session_data = []
for root, _, files in os.walk(session_dir):
@@ -160,18 +160,18 @@ def read_session(reference_json, session_dir):
.reset_index()
)

-acquisitions, groups, scores = find_closest_matches(series_count_df, acquisitions_info)
+acquisitions, series, scores = find_closest_matches(series_count_df, acquisitions_info)

series_count_df["Acquisition"] = acquisitions
series_count_df["Group"] = groups
series_count_df["Series"] = series
series_count_df["Match_Score"] = scores

-series_count_df.sort_values(["Acquisition", "Group", "Match_Score"], inplace=True)
+series_count_df.sort_values(["Acquisition", "Series", "Match_Score"], inplace=True)

return series_count_df
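After the rename, a caller would see the new column names in the returned table; a quick sketch (import path and directory hypothetical):

```python
from dcm_check.read_session import read_session  # import path assumed

df = read_session("reference.json", "/path/to/session_dir")
# The result now carries "Acquisition", "Series", and "Match_Score" columns
print(df[["Acquisition", "Series", "Match_Score"]].head())
```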

def main():
-parser = argparse.ArgumentParser(description="Map a DICOM session directory to a JSON reference file and print the closest acquisition and group matches.")
+parser = argparse.ArgumentParser(description="Map a DICOM session directory to a JSON reference file and print the closest acquisition and series matches.")
parser.add_argument("--ref", required=True, help="Path to the reference JSON file.")
parser.add_argument("--session_dir", required=True, help="Directory containing DICOM files for the session.")
args = parser.parse_args()
