From 0e3821b5520d1e7e52731945739fe680dfca44f0 Mon Sep 17 00:00:00 2001
From: lauraporta
Date: Wed, 4 Dec 2024 18:17:31 +0000
Subject: [PATCH] Remove outdated scripts

---
 calcium_imaging_automation/core/app.py      |  50 -------
 calcium_imaging_automation/core/pipeline.py | 138 --------------------
 examples/debugging.py                       |  26 ----
 3 files changed, 214 deletions(-)
 delete mode 100644 calcium_imaging_automation/core/app.py
 delete mode 100644 calcium_imaging_automation/core/pipeline.py
 delete mode 100644 examples/debugging.py

diff --git a/calcium_imaging_automation/core/app.py b/calcium_imaging_automation/core/app.py
deleted file mode 100644
index 6ca2b60..0000000
--- a/calcium_imaging_automation/core/app.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import argparse
-from pathlib import Path
-
-from calcium_imaging_automation.core.pipeline import orchestrator
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Example usage of the pipeline manager."
-    )
-
-    parser.add_argument(
-        "raw_data_path", type=Path, help="Path to the raw data."
-    )
-    parser.add_argument(
-        "output_path", type=Path, help="Path to the output data."
-    )
-    parser.add_argument(
-        "--folder_read_pattern",
-        type=str,
-        help="Glob pattern for reading folder.",
-        default="*",
-    )
-    parser.add_argument(
-        "--file_read_pattern",
-        type=str,
-        help="List of glob patterns for reading files.",
-        action="append",
-    )
-    parser.add_argument(
-        "--experiment_name",
-        type=str,
-        help="Name of the experiment.",
-        default="pipeline_test",
-    )
-    parser.add_argument(
-        "--compute_metric",
-        type=Path,
-        help="Path to the suite2p ops file.",
-    )
-
-    args = parser.parse_args()
-
-    orchestrator(
-        args.raw_data_path,
-        args.output_path,
-        args.folder_read_pattern,
-        args.file_read_pattern,
-        args.experiment_name,
-        args.compute_metric,
-    )
diff --git a/calcium_imaging_automation/core/pipeline.py b/calcium_imaging_automation/core/pipeline.py
deleted file mode 100644
index 0270c9a..0000000
--- a/calcium_imaging_automation/core/pipeline.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import datetime
-import logging
-import time
-from pathlib import Path
-from typing import Callable, List
-
-import pandas as pd
-import submitit
-from submitit import AutoExecutor
-
-from calcium_imaging_automation.core.reader import ReadAquiredData
-from calcium_imaging_automation.core.writer import DatashuttleWrapper
-
-
-def orchestrator(
-    raw_data_path: Path,
-    output_path: Path,
-    folder_read_pattern: str,
-    file_read_pattern: List[str],
-    preprocessing_function: Callable,
-    compute_metric: Callable,
-    experiment_name: str = "pipeline_test",
-):
-    # --- Setup logging and MLflow ---
-    logging_setup(output_path)
-
-    # mkdir for submitit logs submitit / timestamp
-    (output_path / "submitit").mkdir(exist_ok=True)
-
-    # --- Read folders and files ---
-    reader = ReadAquiredData(
-        raw_data_path,
-        folder_read_pattern,
-        file_read_pattern,
-    )
-    logging.info(f"Found {len(reader.datasets_paths)} datasets.")
-    logging.info(f"Dataset names: {reader.dataset_names}")
-
-    number_of_tiffs = reader.max_session_number(filetype="tif")
-    logging.info(f"Max of tiffs found: {number_of_tiffs}")
-
-    # --- Write folders and files ---
-    writer = DatashuttleWrapper(output_path)
-    writer.create_folders(reader.dataset_names, session_number=number_of_tiffs)
-
-    # --- Start processing ---
-    results, errors = launch_job_array(
-        datasets=reader.datasets_paths,
-        output_path=output_path,
-        analysis_pipeline=analysis_pipeline,
-        writer=writer,
-        preprocessing_function=preprocessing_function,
-        compute_metric=compute_metric,
-    )
-
-    # save the results and errors as csv
-    results_df = pd.DataFrame(results)
-    results_df.to_csv(output_path / "results.csv")
-    errors_df = pd.DataFrame(errors)
-    errors_df.to_csv(output_path / "errors.csv")
-
-    logging.info("Pipeline finished.")
-
-
-def launch_job_array(
-    datasets,
-    output_path,
-    analysis_pipeline,
-    writer,
-    preprocessing_function,
-    compute_metric,
-):
-    executor = AutoExecutor(folder=output_path / "submitit")
-    executor.update_parameters(
-        timeout_min=30,
-        slurm_partition="fast",
-        cpus_per_task=1,
-        tasks_per_node=1,
-        slurm_mem="16G",
-        slurm_array_parallelism=20,
-    )
-
-    logging.info(f"Running {len(datasets)} jobs.")
-    jobs = executor.map_array(
-        analysis_pipeline,
-        datasets,
-        [writer.get_dataset_path(dataset.stem) for dataset in datasets],
-        [preprocessing_function] * len(datasets),
-        [compute_metric] * len(datasets),
-    )
-
-    results = []
-    errors = []
-    for job in jobs:
-        while not job.done():
-            time.sleep(10)
-        try:
-            results.append(job.result())
-            errors.append(None)
-        except submitit.core.utils.FailedJobError as e:
-            logging.error(f"Job {job.job_id} failed: {e}")
-            results.append(None)
-            errors.append(job.stderr())
-
-    return results, errors
-
-
-def analysis_pipeline(
-    dataset, output_path_dataset, preprocessing_function, compute_metric
-):
-    import os
-
-    os.system("module load miniconda")
-    os.system("source activate /nfs/nhome/live/lporta/.conda/envs/cimat")
-    output_path_dataset = output_path_dataset / "ses-0/funcimg/"
-    try:
-        data = preprocessing_function(dataset, output_path_dataset)
-        metric_measured = compute_metric(data)
-        with open(output_path_dataset / "metric.txt", "w") as f:
-            f.write(str(metric_measured))
-    except Exception as e:
-        with open(output_path_dataset / "error.txt", "w") as f:
-            f.write(str(e.args))
-    return metric_measured
-
-
-def logging_setup(output_path: Path):
-    # --- Setup experiment-wide logging to file ---
-    (output_path / "logs").mkdir(exist_ok=True)
-    logging.basicConfig(
-        filename=str(
-            output_path
-            / "logs"
-            / f"{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.log"
-        ),
-        level=logging.INFO,
-        format="%(asctime)s - %(message)s",
-    )
diff --git a/examples/debugging.py b/examples/debugging.py
deleted file mode 100644
index c0fca25..0000000
--- a/examples/debugging.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import shutil
-from pathlib import Path
-
-from derotation.analysis.metrics import stability_of_most_detected_blob
-from derotation.derotate_batch import derotate
-
-from calcium_imaging_automation.core.pipeline import orchestrator
-
-try:
-    shutil.rmtree("/ceph/margrie/laura/cimaut/derivatives/")
-    shutil.rmtree("/ceph/margrie/laura/cimaut/submitit/")
-except FileNotFoundError:
-    print("No derivatives folder found")
-
-orchestrator(
-    raw_data_path=Path(
-        "/nfs/winstor/margrie/SimonWeiler/RawData/Invivo_imaging/3photon_rotation/shared/"
-    ),
-    output_path=Path("/ceph/margrie/laura/cimaut/"),
-    folder_read_pattern="2*",
-    file_read_pattern=["rotation_00001.tif", "*.bin"],
-    experiment_name="submitit_04",
-    preprocessing_function=derotate,
-    compute_metric=stability_of_most_detected_blob,
-    # suite2p_ops_path="/ceph/margrie/laura/derotation/suite2p/laura_ops.npy",
-)
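
For reference: examples/debugging.py was the only in-repo caller of orchestrator, so this
patch removes the pipeline together with its entry points. If the orchestrator is ever
restored from history, a minimal invocation sketch based on the removed debugging script
follows; the paths below are hypothetical placeholders, not the original cluster locations:

    from pathlib import Path

    from derotation.analysis.metrics import stability_of_most_detected_blob
    from derotation.derotate_batch import derotate

    from calcium_imaging_automation.core.pipeline import orchestrator

    # Hypothetical paths, for illustration only; substitute real locations.
    orchestrator(
        raw_data_path=Path("/data/raw/"),
        output_path=Path("/data/derivatives/"),
        folder_read_pattern="2*",
        file_read_pattern=["*.tif", "*.bin"],
        preprocessing_function=derotate,  # Callable: (dataset, output_dir) -> data
        compute_metric=stability_of_most_detected_blob,  # Callable: data -> metric
        experiment_name="pipeline_test",
    )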