Use with real data and add integration with submitit
lauraporta committed Nov 26, 2024
1 parent ca90cfe commit 4720086
Showing 3 changed files with 115 additions and 72 deletions.
10 changes: 8 additions & 2 deletions calcium_imaging_automation/core/app.py
@@ -1,7 +1,7 @@
import argparse
from pathlib import Path

from calcium_imaging_automation.core.pipeline import pipeline
from calcium_imaging_automation.core.pipeline import mlflow_orchestrator

if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -32,13 +32,19 @@
help="Name of the experiment.",
default="pipeline_test",
)
parser.add_argument(
"--compute_metric",
type=Path,
help="Path to the suite2p ops file.",
)

args = parser.parse_args()

pipeline(
mlflow_orchestrator(
args.raw_data_path,
args.output_path,
args.folder_read_pattern,
args.file_read_pattern,
        experiment_name=args.experiment_name,
        compute_metric=args.compute_metric,
)
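
The new signature takes two callables, preprocessing_function and compute_metric, ahead of experiment_name, and the CLI entry point above does not yet supply a preprocessing_function. A minimal sketch of wiring the orchestrator up programmatically instead; suite2p_preprocess, stability_metric, and all paths and patterns below are hypothetical placeholders, not part of this commit:

from pathlib import Path

import numpy as np

from calcium_imaging_automation.core.pipeline import mlflow_orchestrator


def suite2p_preprocess(dataset: Path, output_path: Path) -> np.ndarray:
    # Hypothetical preprocessing stage: register/segment the session and
    # return the array the metric is computed on.
    return np.random.rand(100, 100)


def stability_metric(data: np.ndarray) -> float:
    # Hypothetical metric: reduce the preprocessed data to the single
    # scalar that the orchestrator logs as "stability".
    return float(np.mean(data))


mlflow_orchestrator(
    raw_data_path=Path("/path/to/rawdata"),
    output_path=Path("/path/to/derivatives"),
    folder_read_pattern="sub-*",
    file_read_pattern=["*.tif"],
    preprocessing_function=suite2p_preprocess,
    compute_metric=stability_metric,
    experiment_name="pipeline_test",
)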
176 changes: 106 additions & 70 deletions calcium_imaging_automation/core/pipeline.py
@@ -1,27 +1,34 @@
import datetime
import logging
import time
from pathlib import Path
from typing import List
from typing import Callable, List

import mlflow
import numpy as np
import setuptools_scm
import submitit
from submitit import AutoExecutor

from calcium_imaging_automation.core.reader import ReadAquiredData
from calcium_imaging_automation.core.writer import DatashuttleWrapper


def pipeline(
def mlflow_orchestrator(
raw_data_path: Path,
output_path: Path,
folder_read_pattern: str,
file_read_pattern: List[str],
preprocessing_function: Callable,
compute_metric: Callable,
experiment_name: str = "pipeline_test",
):
# --- Setup logging and MLflow ---
logging_setup(output_path)
mlflow_setup(output_path)

    # Create the directory where submitit writes its job logs
(output_path / "submitit").mkdir(exist_ok=True)

# --- Read folders and files ---
reader = ReadAquiredData(
raw_data_path,
@@ -39,68 +46,97 @@ def pipeline(
writer.create_folders(reader.dataset_names, session_number=number_of_tiffs)

# --- Start processing ---
for dataset in reader.datasets_paths:
dataset_name = dataset.stem

for session in range(0, number_of_tiffs):
mlflow_set_experiment(experiment_name, dataset_name, session)

# Generate mock data
data = np.random.rand(100, 100)

# Start a new MLflow experiment for each dataset-session
with mlflow.start_run(): # this is the parent run
mlflow_parent_run_logs(
dataset_name,
session,
raw_data_path,
output_path,
folder_read_pattern,
file_read_pattern,
)

logging.info(
f"Starting MLflow experiment for dataset {dataset_name} "
+ f"session {session}..."
)

# Mock processing for different runs within the experiment
for i in range(0, 10): # n runs with varying parameters
# Start a child run under the main dataset-session run
with mlflow.start_run(nested=True):
# Mock metric calculation
metric_measured = np.mean(data) * i

# Log the generated data as an artifact if desired
# Here, simulate an image or data file save path
image_path = writer.save_image(
image=data,
dataset_name=dataset_name,
session_number=session,
filename=f"image_{mlflow.active_run().info.run_id}.png",
)

mlflow_log_run(
i,
dataset_name,
session,
metric_measured,
image_path,
)

logging.info(
f"Completed MLflow run iteration {i} for dataset "
+ f"{dataset_name} session {session}"
)

logging.info(
f"Completed MLflow experiment for dataset {dataset_name}"
+ f" session {session}"
)
results, errors = launch_job_array(
datasets=reader.datasets_paths,
output_path=output_path,
analysis_pipeline=analysis_pipeline,
writer=writer,
preprocessing_function=preprocessing_function,
compute_metric=compute_metric,
)

# --- Log all results with MLflow ---
for dataset, result, error in zip(reader.dataset_names, results, errors):
mlflow_set_experiment(experiment_name, dataset, 0)

with mlflow.start_run():
mlflow_parent_run_logs(
dataset,
0,
raw_data_path,
output_path,
folder_read_pattern,
file_read_pattern,
)

# log error if any
if error:
mlflow.log_param("error", error)

if result:
mlflow.log_metric("stability", result)

mlflow.end_run()

logging.info("Pipeline finished.")


def launch_job_array(
datasets,
output_path,
analysis_pipeline,
writer,
preprocessing_function,
compute_metric,
):
executor = AutoExecutor(folder=output_path / "submitit")
executor.update_parameters(
timeout_min=30,
slurm_partition="fast",
cpus_per_task=1,
tasks_per_node=1,
slurm_mem="16G",
slurm_array_parallelism=20,
)

logging.info(f"Running {len(datasets)} jobs.")
jobs = executor.map_array(
analysis_pipeline,
datasets,
[writer.get_dataset_path(dataset.stem) for dataset in datasets],
[preprocessing_function] * len(datasets),
[compute_metric] * len(datasets),
)

results = []
errors = []
    for job in jobs:
        # Block until each job leaves the queue, polling every 10 seconds
        while not job.done():
time.sleep(10)
try:
results.append(job.result())
errors.append(None)
except submitit.core.utils.FailedJobError as e:
logging.error(f"Job {job.job_id} failed: {e}")
results.append(None)
errors.append(job.stderr())

return results, errors


def analysis_pipeline(
dataset, output_path_dataset, preprocessing_function, compute_metric
):
    import os

    # NOTE: each os.system call runs in its own shell, so these commands
    # do not change the environment of this Python process
    os.system("module load miniconda")
    os.system("source activate /nfs/nhome/live/lporta/.conda/envs/cimat")
output_path_dataset = output_path_dataset / "ses-0/funcimg/"
data = preprocessing_function(dataset, output_path_dataset)
metric_measured = compute_metric(data)
return metric_measured


def logging_setup(output_path: Path):
# --- Setup experiment-wide logging to file ---
(output_path / "logs").mkdir(exist_ok=True)
@@ -156,7 +192,7 @@ def mlflow_log_run(
dataset_name: str,
session: int,
metric_measured: float,
image_path: Path,
# image_path: Path,
):
# give specific name to the run
mlflow.set_tag("mlflow.runName", f"param_{i}")
@@ -165,11 +201,11 @@
mlflow.log_param("data_size", f"{i * 10}x100")
mlflow.log_param("run_iteration", i)
mlflow.log_param("run_id", mlflow.active_run().info.run_id)
mlflow.log_metric("metric_measured", metric_measured)

mlflow.log_artifact(
# where I am storing the image according to Neuroblueprint
# I think it gets copied in the mlflow data structure
image_path,
artifact_path=f"{dataset_name}/session_{session}/run_{i}",
)
mlflow.log_metric("stability", metric_measured)

# mlflow.log_artifact(
# # where I am storing the image according to Neuroblueprint
# # I think it gets copied in the mlflow data structure
# image_path,
# artifact_path=f"{dataset_name}/session_{session}/run_{i}",
# )
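
launch_job_array submits one SLURM array task per dataset through submitit's AutoExecutor; map_array zips its argument lists element-wise, which is why the writer paths and the two callables are expanded into lists of length len(datasets). A minimal self-contained sketch of the same pattern with a toy function; the partition name and resource values are assumptions, not from this commit:

from submitit import AutoExecutor

executor = AutoExecutor(folder="submitit_logs")
executor.update_parameters(
    timeout_min=5,
    slurm_partition="fast",  # cluster-specific; adjust to your scheduler
    cpus_per_task=1,
    slurm_array_parallelism=2,  # run at most 2 array tasks concurrently
)

# map_array(fn, xs, ys) runs fn(xs[i], ys[i]) as task i of one job array
jobs = executor.map_array(pow, [2, 3, 4], [2, 2, 2])
print([job.result() for job in jobs])  # result() blocks; prints [4, 9, 16]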
1 change: 1 addition & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
"setuptools_scm",
"mlflow",
"numpy",
"submitit",
]

license = {text = "BSD-3-Clause"}
