From 991757520f6d67335ba1c66e17356deff29440ab Mon Sep 17 00:00:00 2001 From: Brad Macdonald <52762200+BWMac@users.noreply.github.com> Date: Tue, 28 May 2024 11:21:52 -0600 Subject: [PATCH] [ORCA-291] LENS DAG (#30) * updates LENS DAG * updates s3_prefix passing * updates s3_prefix * updates synstage and synindex configs * updates lens dag * updates LENS DAG with config * formatting --- local/iatlas/lens/lens.py | 117 +++++++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 21 deletions(-) diff --git a/local/iatlas/lens/lens.py b/local/iatlas/lens/lens.py index 6343097..dd23a25 100644 --- a/local/iatlas/lens/lens.py +++ b/local/iatlas/lens/lens.py @@ -1,14 +1,26 @@ #!/usr/bin/env python3 """ -LENS workflow metaflow DAG ***This recipe is a WIP*** +LENS workflow metaflow DAG. + +Takes two parameters: + + dataset_id: Synapse ID of dataset file to be processed. + s3_prefix: S3 prefix for parent S3 bucket where workflow files will be stored. + +Example usage: + +``` + python3 local/iatlas/lens/lens.py run \ + --dataset_id syn58366876 \ + --s3_prefix s3://iatlas-project-tower-bucket/LENS \ +``` based on https://github.com/Sage-Bionetworks-Workflows/py-orca/blob/main/demo.py """ import asyncio from dataclasses import dataclass -from pathlib import PurePosixPath import s3fs import yaml @@ -24,27 +36,65 @@ class LENSDataset: Attributes: id: Unique dataset identifier. - samplesheet: Synapse ID for nf-core/rnaseq CSV samplesheet. + samplesheet: Synapse ID for CSV samplesheet to be used for Synstage and LENS. + output_folder: Synapse ID for Synindex output folder. """ id: str samplesheet: str + output_folder: str def get_run_name(self, suffix: str) -> str: """Generate run name with given suffix.""" return f"{self.id}_{suffix}" - def synstage_info(self, samplesheet_uri: str) -> LaunchInfo: + def synstage_info( + self, samplesheet_uri: str, s3_prefix: str, stage_key: str + ) -> LaunchInfo: """Generate LaunchInfo for nf-synstage.""" run_name = self.get_run_name("synstage") return LaunchInfo( run_name=run_name, - pipeline="Sage-Bionetworks-Workflows/nf-synstage", + pipeline="Sage-Bionetworks-Workflows/nf-synapse", revision="main", - profiles=["sage"], + entry_name="NF_SYNSTAGE", params={ "input": samplesheet_uri, - "outdir": "s3://iatlas-project-tower-bucket/LENS/synstage", # manually load to persistent bucket for now + "save_strategy": "flat", + "outdir": f"{s3_prefix}/{stage_key}", + }, + workspace_secrets=["SYNAPSE_AUTH_TOKEN"], + ) + + def lens_info( + self, samplesheet_uri: str, s3_prefix: str, stage_key: str + ) -> LaunchInfo: + """Generate LaunchInfo for LENS.""" + run_name = self.get_run_name("LENS") + return LaunchInfo( + run_name=run_name, + pipeline="https://gitlab.com/landscape-of-effective-neoantigens-software/lens_for_nf_tower", + revision="sage", + params={ + "lens_dir": s3_prefix, + "fq_dir": f"{s3_prefix}/{stage_key}", + "global_fq_dir": f"{s3_prefix}/{stage_key}", + "output_dir": f"{s3_prefix}/{stage_key}_outputs", + "manifest_path": samplesheet_uri, + }, + workspace_secrets=["SYNAPSE_AUTH_TOKEN"], + ) + + def synindex_info(self, s3_prefix: str, stage_key: str) -> LaunchInfo: + """Generate LaunchInfo for nf-synindex.""" + return LaunchInfo( + run_name=self.get_run_name("synindex"), + pipeline="Sage-Bionetworks-Workflows/nf-synapse", + revision="main", + entry_name="NF_SYNINDEX", + params={ + "s3_prefix": f"{s3_prefix}/{stage_key}_outputs", + "parent_id": self.output_folder, }, workspace_secrets=["SYNAPSE_AUTH_TOKEN"], ) @@ -57,17 +107,16 @@ class TowerLENSFlow(FlowSpec): synapse = SynapseOps() s3 = s3fs.S3FileSystem() - # Parameters dataset_id = Parameter( "dataset_id", type=str, - help="Synapse ID for a YAML file describing an RNA-seq dataset", + help="Synapse ID for a YAML file describing the dataset", ) s3_prefix = Parameter( "s3_prefix", type=str, - help="S3 prefix for storing output files from different runs", + help="S3 prefix for parent S3 bucket where workflow files will be stored.", ) def monitor_workflow(self, workflow_id): @@ -90,19 +139,14 @@ def load_dataset(self): with self.synapse.fs.open(self.dataset_id, "r") as fp: kwargs = yaml.safe_load(fp) self.dataset = LENSDataset(**kwargs) - self.next(self.get_lens_outdir) - - @step - def get_lens_outdir(self): - """Generate output directory for LENS.""" - run_name = self.dataset.get_run_name("LENS") - self.lens_outdir = f"{self.s3_prefix}/{run_name}" self.next(self.transfer_samplesheet_to_s3) @step def transfer_samplesheet_to_s3(self): """Transfer raw samplesheet from Synapse to Tower S3 bucket.""" - self.samplesheet_uri = f"{self.s3_prefix}/{self.dataset.id}.csv" + self.samplesheet_uri = ( + f"{self.s3_prefix}/{self.dataset.id}/{self.dataset.id}.csv" + ) sheet_contents = self.synapse.fs.readtext(self.dataset.samplesheet) self.s3.write_text(self.samplesheet_uri, sheet_contents) self.next(self.launch_synstage) @@ -110,7 +154,10 @@ def transfer_samplesheet_to_s3(self): @step def launch_synstage(self): """Launch nf-synstage to stage Synapse files in samplesheet.""" - launch_info = self.dataset.synstage_info(self.samplesheet_uri) + launch_info = self.dataset.synstage_info( + self.samplesheet_uri, self.s3_prefix, self.dataset_id + ) + self.lens_manifest_uri = f"{self.s3_prefix}/{self.dataset.id}/{launch_info.run_name}/{self.dataset.id}.csv" self.synstage_id = self.tower.launch_workflow(launch_info, "spot") self.next(self.monitor_synstage) @@ -118,6 +165,34 @@ def launch_synstage(self): def monitor_synstage(self): """Monitor nf-synstage workflow run (wait until done).""" self.monitor_workflow(self.synstage_id) + self.next(self.launch_lens) + + @step + def launch_lens(self): + """Launch LENS workflow.""" + launch_info = self.dataset.lens_info( + self.lens_manifest_uri, self.s3_prefix, self.dataset_id + ) + self.LENS_id = self.tower.launch_workflow(launch_info, "spot") + self.next(self.monitor_lens) + + @step + def monitor_lens(self): + """Monitor LENS workflow run (wait until done).""" + self.monitor_workflow(self.LENS_id) + self.next(self.launch_synindex) + + @step + def launch_synindex(self): + """Launch nf-synindex to index S3 files back into Synapse.""" + launch_info = self.dataset.synindex_info(self.s3_prefix, self.dataset_id) + self.synindex_id = self.tower.launch_workflow(launch_info, "spot") + self.next(self.monitor_synindex) + + @step + def monitor_synindex(self): + """Monitor nf-synindex workflow run (wait until done).""" + self.monitor_workflow(self.synindex_id) self.next(self.end) @step @@ -125,9 +200,9 @@ def end(self): """End point.""" print(f"Completed processing {self.dataset}") print(f"synstage workflow ID: {self.synstage_id}") + print(f"LENS workflow ID: {self.LENS_id}") + print(f"synindex workflow ID: {self.synindex_id}") if __name__ == "__main__": TowerLENSFlow() - -# run with: python3 local/lens.py run --dataset_id syn51753796 --s3_prefix s3://iatlas-project-tower-bucket/LENS