From 2c32f382d8a2bdb09ae33e62ed5fc77c7c9393e7 Mon Sep 17 00:00:00 2001 From: Ashley Stewart Date: Mon, 18 Nov 2024 16:21:06 -0600 Subject: [PATCH] v0.1.3 --- dcm_check/__init__.py | 2 +- dcm_check/compliance_check.py | 22 +++++++------ dcm_check/dcm_gen_session.py | 60 ++++++++++++++++++++++++----------- 3 files changed, 55 insertions(+), 29 deletions(-) diff --git a/dcm_check/__init__.py b/dcm_check/__init__.py index 9c2099f..8a9e3b4 100644 --- a/dcm_check/__init__.py +++ b/dcm_check/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.3" +__version__ = "0.1.4" from .compliance_check import \ get_dicom_values, \ diff --git a/dcm_check/compliance_check.py b/dcm_check/compliance_check.py index 328046d..bd83b57 100644 --- a/dcm_check/compliance_check.py +++ b/dcm_check/compliance_check.py @@ -9,6 +9,7 @@ from pydicom.uid import UID from pydicom.valuerep import PersonName, DSfloat, IS from pydantic_core import PydanticUndefined +from io import BytesIO def get_dicom_values(ds: pydicom.dataset.FileDataset) -> Dict[str, Any]: """Convert a DICOM dataset to a dictionary, handling sequences and DICOM-specific data types. @@ -23,7 +24,7 @@ def get_dicom_values(ds: pydicom.dataset.FileDataset) -> Dict[str, Any]: def process_element(element): if element.VR == 'SQ': - return [get_dicom_values(item) for item in element] # TODO TEST THIS + return [get_dicom_values(item) for item in element] elif isinstance(element.value, MultiValue): return list(element.value) elif isinstance(element.value, (UID, PersonName)): @@ -35,26 +36,29 @@ def process_element(element): elif isinstance(element.value, (int, float)): return element.value else: - return str(element.value[:50]) + return str(element.value)[:50] for element in ds: - # skip pixel data - if element.tag == 0x7fe00010: + if element.tag == 0x7fe00010: # skip pixel data continue dicom_dict[element.keyword] = process_element(element) - + return dicom_dict -def load_dicom(dicom_file: str) -> Dict[str, Any]: - """Load a DICOM file and extract the values as a dictionary. +def load_dicom(dicom_file: Union[str, bytes]) -> Dict[str, Any]: + """Load a DICOM file from a path or bytes and extract values as a dictionary. Args: - dicom_file (str): Path to the DICOM file to load. + dicom_file (Union[str, bytes]): Path to the DICOM file or file content as bytes. Returns: dicom_values (Dict[str, Any]): A dictionary of DICOM values. """ - ds = pydicom.dcmread(dicom_file) + if isinstance(dicom_file, bytes): + ds = pydicom.dcmread(BytesIO(dicom_file)) + else: + ds = pydicom.dcmread(dicom_file) + return get_dicom_values(ds) def create_reference_model(reference_values: Dict[str, Any], fields_config: List[Union[str, Dict[str, Any]]]) -> BaseModel: diff --git a/dcm_check/dcm_gen_session.py b/dcm_check/dcm_gen_session.py index 17ec27d..601d0ca 100755 --- a/dcm_check/dcm_gen_session.py +++ b/dcm_check/dcm_gen_session.py @@ -4,8 +4,8 @@ import json import os import sys +from typing import Optional, Dict from dcm_check import load_dicom -from collections import defaultdict import pandas as pd class MissingFieldDict(dict): @@ -13,27 +13,49 @@ class MissingFieldDict(dict): def __missing__(self, key): return "N/A" -def generate_json_ref(in_session_dir, acquisition_fields, reference_fields, name_template): +def generate_json_ref( + in_session_dir: Optional[str] = None, + acquisition_fields=None, + reference_fields=None, + name_template="{ProtocolName}-{SeriesDescription}", + dicom_files: Optional[Dict[str, bytes]] = None +): + """Generate a JSON reference for DICOM compliance. + + Args: + in_session_dir (Optional[str]): Directory containing DICOM files for the session. + acquisition_fields (list): Fields to uniquely identify each acquisition. + reference_fields (list): Fields to include in JSON reference with their values. + name_template (str): Naming template for each acquisition series. + dicom_files (Optional[Dict[str, bytes]]): In-memory dictionary of DICOM files. + + Returns: + output (dict): JSON structure with acquisition data. + """ acquisitions = {} dicom_data = [] - print(f"Generating JSON reference for DICOM files in {in_session_dir}") - - # Walk through all files in the specified session directory - for root, _, files in os.walk(in_session_dir): - for file in files: - if not (file.endswith(".dcm") or file.endswith(".IMA")): - continue # Skip non-DICOM files - - dicom_path = os.path.join(root, file) - dicom_values = load_dicom(dicom_path) - - # Store the data for easier handling with pandas - dicom_entry = {field: dicom_values.get(field, "N/A") for field in acquisition_fields + reference_fields} - dicom_entry['dicom_path'] = dicom_path - dicom_data.append(dicom_entry) - - # Convert collected DICOM data to a DataFrame + # Process either in_session_dir or dicom_files + if dicom_files is not None: + print(f"Generating JSON reference for provided DICOM files") + files_to_process = dicom_files.items() + elif in_session_dir: + print(f"Generating JSON reference for DICOM files in {in_session_dir}") + files_to_process = [ + (os.path.join(root, file), None) for root, _, files in os.walk(in_session_dir) + for file in files if file.endswith((".dcm", ".IMA")) + ] + else: + raise ValueError("Either in_session_dir or dicom_files must be provided.") + + # Load and process each DICOM file + for dicom_path, dicom_content in files_to_process: + dicom_values = load_dicom(dicom_content or dicom_path) + dicom_entry = {field: dicom_values.get(field, "N/A") for field in acquisition_fields + reference_fields} + dicom_entry['dicom_path'] = dicom_path + dicom_data.append(dicom_entry) + + # Convert collected DICOM data to a DataFrame and proceed as before dicom_df = pd.DataFrame(dicom_data) # Handle list-type entries for duplicate detection