From 0c04e94d10484499f13c90cf35fa806fbeb69c23 Mon Sep 17 00:00:00 2001
From: Holger Roth
Date: Fri, 21 Feb 2025 15:58:31 -0500
Subject: [PATCH] formatting

---
 .../bionemo/downstream/bionemo_filters.py    | 31 +++----
 .../bionemo/downstream/finetune_esm2.py      | 73 ++++++++---------
 .../downstream/sabdab/prepare_sabdab_data.py |  2 +-
 .../downstream/sabdab/run_sim_sabdab.py      | 70 ++++++++--------
 .../bionemo/downstream/scl/run_sim_scl.py    | 68 +++++++++-------
 .../downstream/tap/prepare_tap_data.py       |  2 +-
 .../bionemo/downstream/tap/run_sim_tap.py    | 81 +++++++++++--------
 .../src/bionemo_inference_processor.py       |  7 +-
 .../task_fitting/src/bionemo_launcher.py     | 38 +++++----
 .../task_fitting/src/bionemo_mlp_job.py      | 11 +--
 .../task_fitting/src/bionemo_mlp_learner.py  |  7 +-
 .../src/bionemo_mlp_model_persistor.py       |  2 +-
 12 files changed, 193 insertions(+), 199 deletions(-)

diff --git a/examples/advanced/bionemo/downstream/bionemo_filters.py b/examples/advanced/bionemo/downstream/bionemo_filters.py
index d207c1f3b0..92f4ff71e9 100644
--- a/examples/advanced/bionemo/downstream/bionemo_filters.py
+++ b/examples/advanced/bionemo/downstream/bionemo_filters.py
@@ -13,19 +13,15 @@
 # limitations under the License.

 from typing import Union

-from torch import Tensor
-
-from nvflare.apis.dxo import DXO, DataKind, MetaKey
+from nvflare.apis.dxo import DXO, DataKind
 from nvflare.apis.dxo_filter import DXOFilter
 from nvflare.apis.fl_context import FLContext
 from nvflare.apis.shareable import Shareable


 class BioNeMoParamsFilter(DXOFilter):
-    def __init__(
-        self,
-        precision="bf16-mixed"
-    ):
+    def __init__(self, precision="bf16-mixed"):
         """Filter to add a prefix to global state dict to avoid key mismatches between global and local state dictionaries.
         This is needed because of NeMo training framework adding module wrappers depending on the used training precision.
@@ -60,18 +56,15 @@ def process_dxo(self, dxo: DXO, shareable: Shareable, fl_ctx: FLContext) -> Unio
         params = dxo.data
         new_params = {}
         for k, v in params.items():
-            new_key = self._prefix + k
-            new_params[new_key] = v
-
+            new_key = self._prefix + k
+            new_params[new_key] = v
+
         dxo.data = new_params
         return dxo


 class BioNeMoExcludeParamsFilter(DXOFilter):
-    def __init__(
-        self,
-        exclude_vars="head"
-    ):
+    def __init__(self, exclude_vars="head"):
         """Filter to remove parameters from state dictionary that shouldn't be shared with other party.

         Args:
@@ -84,7 +77,6 @@ def __init__(
         self.exclude_vars = exclude_vars
-

     def process_dxo(self, dxo: DXO, shareable: Shareable, fl_ctx: FLContext) -> Union[None, DXO]:
         """Filter process apply to the Shareable object.
@@ -100,14 +92,13 @@ def process_dxo(self, dxo: DXO, shareable: Shareable, fl_ctx: FLContext) -> Unio
         params = dxo.data
         new_params = {}
         for k, v in params.items():
-            if self.exclude_vars not in k:
-                new_params[k] = v
-
+            if self.exclude_vars not in k:
+                new_params[k] = v
+
         if len(new_params) < len(params):
             self.log_info(fl_ctx, f"Excluded {len(params)-len(new_params)} parameters matching '{self.exclude_vars}'")
         else:
-            raise ValueError(f"State dictionary did not match any exclude keys that matched '{self.exclude_vars}'")
-
+            raise ValueError(f"State dictionary did not match any exclude keys that matched '{self.exclude_vars}'")
+
         dxo.data = new_params
         return dxo
-
diff --git a/examples/advanced/bionemo/downstream/finetune_esm2.py b/examples/advanced/bionemo/downstream/finetune_esm2.py
index a9556f569e..7ca0acea98 100644
--- a/examples/advanced/bionemo/downstream/finetune_esm2.py
+++ b/examples/advanced/bionemo/downstream/finetune_esm2.py
@@ -15,39 +15,29 @@
 # Copied and adapted for NVFlare from https://github.com/NVIDIA/bionemo-framework/blob/main/sub-packages/bionemo-esm2/src/bionemo/esm2/scripts/finetune_esm2.py

 import shutil
-import argparse
-import random
 from pathlib import Path
-from lightning import seed_everything
-from typing import Dict, List, Optional, Sequence, Tuple, Type, get_args
-
-from lightning.pytorch.callbacks import Callback, LearningRateMonitor, RichModelSummary
-from megatron.core.distributed import DistributedDataParallelConfig
-from megatron.core.optimizer import OptimizerConfig
-from nemo import lightning as nl
-from nemo.collections import llm
-from nemo.lightning import resume
-from nemo.lightning.pytorch import callbacks as nl_callbacks
-from nemo.lightning.pytorch.optim import MegatronOptimizerModule
+from typing import List, Optional, Tuple, Type

 from bionemo.core.utils.dtypes import PrecisionTypes, get_autocast_dtype
 from bionemo.esm2.data.tokenizer import get_tokenizer
 from bionemo.esm2.model.finetune.datamodule import ESM2FineTuneDataModule
-from bionemo.esm2.model.finetune.dataset import (
-    InMemoryPerTokenValueDataset,
-    InMemoryProteinDataset,
-    InMemorySingleValueDataset,
-)
+from bionemo.esm2.model.finetune.dataset import InMemoryProteinDataset, InMemorySingleValueDataset
 from bionemo.esm2.model.finetune.sequence_model import ESM2FineTuneSeqConfig
-from bionemo.esm2.model.finetune.token_model import ESM2FineTuneTokenConfig
+
+# Reuse parser and config constants from bionemo
+from bionemo.esm2.scripts.finetune_esm2 import get_parser
 from bionemo.llm.model.biobert.lightning import biobert_lightning_module
 from bionemo.llm.model.biobert.model import BioBertConfig
 from bionemo.llm.model.config import TorchmetricsConfig
-from bionemo.llm.utils.datamodule_utils import float_or_int_or_none, infer_global_batch_size
+from bionemo.llm.utils.datamodule_utils import infer_global_batch_size
 from bionemo.llm.utils.logger_utils import WandbConfig, setup_nemo_lightning_logger
-
-# Resue parser and config constants from bionemo
-from bionemo.esm2.scripts.finetune_esm2 import get_parser, SUPPORTED_CONFIGS, SUPPORTED_DATASETS
+from lightning.pytorch.callbacks import Callback, LearningRateMonitor, RichModelSummary
+from megatron.core.distributed import DistributedDataParallelConfig
+from megatron.core.optimizer import OptimizerConfig
+from nemo import lightning as nl
+from nemo.collections import llm
+from nemo.lightning.pytorch import callbacks as nl_callbacks
+from nemo.lightning.pytorch.optim import MegatronOptimizerModule

 # (1) import nvflare lightning client API
import nvflare.client.lightning as flare @@ -111,7 +101,7 @@ def train_model( average_in_collective: bool = True, grad_reduce_in_fp32: bool = False, label_column: str = "labels", - classes: List[str] = None + classes: List[str] = None, ) -> Tuple[Path, Callback | None, nl.Trainer]: """Train an ESM2 model on UR data. @@ -265,7 +255,9 @@ def train_model( # because after flare.patch the trainer.fit/validate will get the # global model internally input_model = flare.receive() - print(f"\n[Current Round={input_model.current_round}, Site = {flare.get_site_name()}, Global model = {input_model} ({len(input_model.params)} params)]\n") + print( + f"\n[Current Round={input_model.current_round}, Site = {flare.get_site_name()}, Global model = {input_model} ({len(input_model.params)} params)]\n" + ) # use a unique result directory for each round # Remove previous checkpoints to preserve disk space @@ -274,21 +266,21 @@ def train_model( previous_ckpt_dir = result_dir / f"round{input_model.current_round-1}" / experiment_name / "dev" / "checkpoints" if previous_ckpt_dir.is_dir(): print(f"Removing previous checkpoint directory {previous_ckpt_dir}") - shutil.rmtree(previous_ckpt_dir) + shutil.rmtree(previous_ckpt_dir) # create output folder for this round result_dir = result_dir / f"round{input_model.current_round}" - + # add a learning rate decay for each round if input_model.current_round > 0: lr_step_reduce = 1.05 # TODO: make lr_step_reduce configurable - new_lr = lr/(input_model.current_round*lr_step_reduce) - new_lr_multiplier = lr_multiplier/(input_model.current_round*lr_step_reduce) + new_lr = lr / (input_model.current_round * lr_step_reduce) + new_lr_multiplier = lr_multiplier / (input_model.current_round * lr_step_reduce) print(f"Reduce lr {lr} by {input_model.current_round*lr_step_reduce}: {new_lr}") else: new_lr = lr - new_lr_multiplier = lr_multiplier - + new_lr_multiplier = lr_multiplier + # remaining bionemo training code tokenizer = get_tokenizer() @@ -302,7 +294,7 @@ def train_model( train_dataset.label_tokenizer.build_vocab([classes]) print(f"Build custom label tokenizer based on label classes: {classes}") valid_dataset.label_tokenizer = train_dataset.label_tokenizer - + data_module = ESM2FineTuneDataModule( train_dataset=train_dataset, valid_dataset=valid_dataset, @@ -379,7 +371,7 @@ def train_model( module = biobert_lightning_module(config=config, tokenizer=tokenizer, optimizer=optimizer) - #If client should save best local checkpoints, set to `save_local_ckpt=True`, + # If client should save best local checkpoints, set to `save_local_ckpt=True`, save_local_ckpt = False if save_local_ckpt: # Configure our custom Checkpointer @@ -393,7 +385,7 @@ def train_model( ) else: checkpoint_callback = None - + # Setup the logger and train the model nemo_logger = setup_nemo_lightning_logger( root_dir=result_dir, @@ -402,7 +394,7 @@ def train_model( wandb_config=wandb_config, ckpt_callback=checkpoint_callback, ) - + # perform local training starting with the received global model llm.train( model=module, @@ -410,7 +402,7 @@ def train_model( trainer=trainer, log=nemo_logger, resume=None, - ) + ) if checkpoint_callback: ckpt_path = Path(checkpoint_callback.last_model_path.replace(".ckpt", "")) @@ -431,7 +423,7 @@ def finetune_esm2_entrypoint(): required=False, default=None, help="Unique strings describing the classes for classification. Used to build the same label vocabulary on each client. Should be comma separate list of strings, e.g. 
'pos,neg'", - ) + ) args = parser.parse_args() if args.classes: @@ -440,7 +432,6 @@ def finetune_esm2_entrypoint(): classes = args.classes.split(",") else: classes = None - # to avoid padding for single value labels: if args.min_seq_length is not None and args.datset_class is InMemorySingleValueDataset: @@ -503,10 +494,10 @@ def finetune_esm2_entrypoint(): average_in_collective=not args.no_average_in_collective, grad_reduce_in_fp32=args.grad_reduce_in_fp32, label_column=args.label_column, - classes=classes + classes=classes, ) - + + if __name__ == "__main__": finetune_esm2_entrypoint() flare.shutdown() - diff --git a/examples/advanced/bionemo/downstream/sabdab/prepare_sabdab_data.py b/examples/advanced/bionemo/downstream/sabdab/prepare_sabdab_data.py index 1c76420c3c..ccfc822e2b 100644 --- a/examples/advanced/bionemo/downstream/sabdab/prepare_sabdab_data.py +++ b/examples/advanced/bionemo/downstream/sabdab/prepare_sabdab_data.py @@ -118,7 +118,7 @@ def main(): if do_clean_chains: train_df = clean_chains(train_df) test_df = clean_chains(test_df) - + _split_dir = os.path.join(split_dir, "train") if not os.path.isdir(_split_dir): os.makedirs(_split_dir) diff --git a/examples/advanced/bionemo/downstream/sabdab/run_sim_sabdab.py b/examples/advanced/bionemo/downstream/sabdab/run_sim_sabdab.py index 37610bbc83..090775b1bf 100644 --- a/examples/advanced/bionemo/downstream/sabdab/run_sim_sabdab.py +++ b/examples/advanced/bionemo/downstream/sabdab/run_sim_sabdab.py @@ -13,28 +13,24 @@ # limitations under the License. import argparse -import logging +import os +import sys -from nvflare import FedJob, FilterType from bionemo.core.data.load import load + from nvflare import FilterType +from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob -from nvflare.job_config.script_runner import ScriptRunner, BaseScriptRunner -from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher +from nvflare.job_config.script_runner import BaseScriptRunner -import os -import pandas as pd -import sys -sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path +sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path from bionemo_filters import BioNeMoParamsFilter def main(args): # Create BaseFedJob with initial model - job = BaseFedJob( - name=f"{args.exp_name}_sabdab_esm2_{args.model}" - ) + job = BaseFedJob(name=f"{args.exp_name}_sabdab_esm2_{args.model}") # Define the controller and send to server controller = FedAvg( @@ -48,11 +44,11 @@ def main(args): # Define unique strings describing the classes for classification so we can use the same label vocabulary on each client. classes = "pos,neg" - + # Add clients for i in range(args.num_clients): client_name = f"site-{i+1}" - + # define data paths # We use the same validation set for each client to make their metrics comparable val_data_path = "/tmp/data/sabdab_chen/val/sabdab_chen_valid.csv" @@ -61,29 +57,32 @@ def main(args): assert args.num_clients == 1, "Use num_clients=1 for simulating 'central' training setting." assert args.num_rounds == 1, "Use num_rounds=1 for simulating 'central' training setting." 
train_data_path = "/tmp/data/sabdab_chen/train/sabdab_chen_full_train.csv" - val_check_interval = int(args.local_steps/20) # 20 times per training - else: # local or fedavg setting - train_data_path = f"/tmp/data/sabdab_chen/train/sabdab_chen_{client_name}_train.csv" + val_check_interval = int(args.local_steps / 20) # 20 times per training + else: # local or fedavg setting + train_data_path = f"/tmp/data/sabdab_chen/train/sabdab_chen_{client_name}_train.csv" if args.num_rounds > 1: val_check_interval = args.local_steps else: - val_check_interval = int(args.local_steps/20) # 20 times per training - + val_check_interval = int(args.local_steps / 20) # 20 times per training + # define training script arguments - #precision = "bf16-mixed" + # precision = "bf16-mixed" precision = "fp32" script_args = f"--restore-from-checkpoint-path {checkpoint_path} --train-data-path {train_data_path} --valid-data-path {val_data_path} --config-class ESM2FineTuneSeqConfig --dataset-class InMemorySingleValueDataset --task-type classification --mlp-ft-dropout 0.1 --mlp-hidden-size 256 --mlp-target-size 2 --experiment-name {job.name} --num-steps {args.local_steps} --num-gpus 1 --val-check-interval {val_check_interval} --log-every-n-steps 10 --lr 5e-4 --lr-multiplier 1e3 --scale-lr-layer classification_head --result-dir bionemo --micro-batch-size 64 --precision {precision} --save-top-k 1 --limit-val-batches 1.0 --classes {classes}" print(f"Running {args.train_script} with args: {script_args}") - + # Define training script runner - runner = BaseScriptRunner(script=args.train_script, - launch_external_process=True, - framework="pytorch", - params_exchange_format="pytorch", - launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", - launch_once=False)) + runner = BaseScriptRunner( + script=args.train_script, + launch_external_process=True, + framework="pytorch", + params_exchange_format="pytorch", + launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", launch_once=False), + ) job.to(runner, client_name) - job.to(BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA) + job.to( + BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA + ) job.export_job("./exported_jobs") job.simulator_run(f"/tmp/nvflare/bionemo/sabdab/{job.name}", gpu=args.sim_gpus) @@ -94,12 +93,19 @@ def main(args): parser.add_argument("--num_clients", type=int, help="Number of clients", required=False, default=1) parser.add_argument("--num_rounds", type=int, help="Number of rounds", required=False, default=30) parser.add_argument("--local_steps", type=int, help="Number of rounds", required=False, default=10) - parser.add_argument("--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py") + parser.add_argument( + "--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py" + ) parser.add_argument("--exp_name", type=str, help="Job name prefix", required=False, default="fedavg") parser.add_argument("--model", choices=["8m", "650m", "3b"], help="ESM2 model", required=False, default="8m") - parser.add_argument("--sim_gpus", type=str, help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a separate GPU. 
By default run all clients on the same GPU 0.", required=False, default="0") + parser.add_argument( + "--sim_gpus", + type=str, + help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a separate GPU. By default run all clients on the same GPU 0.", + required=False, + default="0", + ) + + args = parser.parse_args() - args = parser.parse_args() - main(args) - \ No newline at end of file diff --git a/examples/advanced/bionemo/downstream/scl/run_sim_scl.py b/examples/advanced/bionemo/downstream/scl/run_sim_scl.py index f140a364ef..8171fde75a 100644 --- a/examples/advanced/bionemo/downstream/scl/run_sim_scl.py +++ b/examples/advanced/bionemo/downstream/scl/run_sim_scl.py @@ -13,28 +13,24 @@ # limitations under the License. import argparse -import logging +import os +import sys -from nvflare import FedJob, FilterType from bionemo.core.data.load import load + from nvflare import FilterType +from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob -from nvflare.job_config.script_runner import ScriptRunner, BaseScriptRunner -from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher +from nvflare.job_config.script_runner import BaseScriptRunner -import os -import pandas as pd -import sys -sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path +sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path from bionemo_filters import BioNeMoParamsFilter def main(args): # Create BaseFedJob with initial model - job = BaseFedJob( - name=f"{args.exp_name}_scl_esm2_{args.model}" - ) + job = BaseFedJob(name=f"{args.exp_name}_scl_esm2_{args.model}") # Define the controller and send to server controller = FedAvg( @@ -48,36 +44,39 @@ def main(args): # Define unique strings describing the classes for classification so we can use the same label vocabulary on each client. 
classes = "Cell_membrane,Cytoplasm,Endoplasmic_reticulum,Extracellular,Golgi_apparatus,Lysosome,Mitochondrion,Nucleus,Peroxisome,Plastid" - + # Add clients for i in range(args.num_clients): client_name = f"site-{i+1}" - + # define data paths # We use the same validation set for each client to make their metrics comparable - train_data_path = f"/tmp/data/mixed_soft/train/data_train_{client_name}.csv" + train_data_path = f"/tmp/data/mixed_soft/train/data_train_{client_name}.csv" val_data_path = f"/tmp/data/mixed_soft/val/data_val_{client_name}.csv" - if args.num_rounds > 1: # assume FL and set validation only at the end of round + if args.num_rounds > 1: # assume FL and set validation only at the end of round val_check_interval = args.local_steps else: - val_check_interval = int(args.local_steps/20) # 20 times per training - + val_check_interval = int(args.local_steps / 20) # 20 times per training + # define training script arguments - #precision = "bf16-mixed" + # precision = "bf16-mixed" precision = "fp32" script_args = f"--restore-from-checkpoint-path {checkpoint_path} --train-data-path {train_data_path} --valid-data-path {val_data_path} --config-class ESM2FineTuneSeqConfig --dataset-class InMemorySingleValueDataset --task-type classification --mlp-ft-dropout 0.1 --mlp-hidden-size 256 --mlp-target-size 10 --experiment-name {job.name} --num-steps {args.local_steps} --num-gpus 1 --val-check-interval {val_check_interval} --log-every-n-steps 10 --lr 5e-4 --result-dir bionemo --micro-batch-size 64 --precision {precision} --save-top-k 1 --encoder-frozen --limit-val-batches 1.0 --classes {classes}" print(f"Running {args.train_script} with args: {script_args}") - + # Define training script runner - runner = BaseScriptRunner(script=args.train_script, - launch_external_process=True, - framework="pytorch", - params_exchange_format="pytorch", - launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", - launch_once=False)) + runner = BaseScriptRunner( + script=args.train_script, + launch_external_process=True, + framework="pytorch", + params_exchange_format="pytorch", + launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", launch_once=False), + ) job.to(runner, client_name) - job.to(BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA) + job.to( + BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA + ) job.export_job("./exported_jobs") job.simulator_run(f"/tmp/nvflare/bionemo/scl/{job.name}", gpu=args.sim_gpus) @@ -88,12 +87,19 @@ def main(args): parser.add_argument("--num_clients", type=int, help="Number of clients", required=False, default=1) parser.add_argument("--num_rounds", type=int, help="Number of rounds", required=False, default=30) parser.add_argument("--local_steps", type=int, help="Number of rounds", required=False, default=10) - parser.add_argument("--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py") + parser.add_argument( + "--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py" + ) parser.add_argument("--exp_name", type=str, help="Job name prefix", required=False, default="fedavg") parser.add_argument("--model", choices=["8m", "650m", "3b"], help="ESM2 model", required=False, default="8m") - parser.add_argument("--sim_gpus", type=str, help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a 
separate GPU. By default run all clients on the same GPU 0.", required=False, default="0") + parser.add_argument( + "--sim_gpus", + type=str, + help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a separate GPU. By default run all clients on the same GPU 0.", + required=False, + default="0", + ) + + args = parser.parse_args() - args = parser.parse_args() - main(args) - \ No newline at end of file diff --git a/examples/advanced/bionemo/downstream/tap/prepare_tap_data.py b/examples/advanced/bionemo/downstream/tap/prepare_tap_data.py index 785334233a..d5fe497513 100644 --- a/examples/advanced/bionemo/downstream/tap/prepare_tap_data.py +++ b/examples/advanced/bionemo/downstream/tap/prepare_tap_data.py @@ -84,7 +84,7 @@ def main(): # rename columns to fit BioNeMo convention of "sequences" and "labels" for s in ["train", "valid", "test"]: split[s] = split[s].rename(columns={"Antibody": "sequences"}) - + train_split = pd.concat([split["train"], split["valid"]]) if train_df is None: train_df = train_split diff --git a/examples/advanced/bionemo/downstream/tap/run_sim_tap.py b/examples/advanced/bionemo/downstream/tap/run_sim_tap.py index c04fd62bd6..baf65d3662 100644 --- a/examples/advanced/bionemo/downstream/tap/run_sim_tap.py +++ b/examples/advanced/bionemo/downstream/tap/run_sim_tap.py @@ -13,28 +13,24 @@ # limitations under the License. import argparse -import logging +import os +import sys -from nvflare import FedJob, FilterType from bionemo.core.data.load import load + from nvflare import FilterType +from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob -from nvflare.job_config.script_runner import ScriptRunner, BaseScriptRunner -from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher +from nvflare.job_config.script_runner import BaseScriptRunner -import os -import pandas as pd -import sys -sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path -from bionemo_filters import BioNeMoParamsFilter, BioNeMoExcludeParamsFilter +sys.path.append(os.path.join(os.getcwd(), "..")) # include parent folder in path +from bionemo_filters import BioNeMoExcludeParamsFilter, BioNeMoParamsFilter def main(args): # Create BaseFedJob with initial model - job = BaseFedJob( - name=f"{args.exp_name}_tap_esm2_{args.model}" - ) + job = BaseFedJob(name=f"{args.exp_name}_tap_esm2_{args.model}") # Define the controller and send to server controller = FedAvg( @@ -47,11 +43,11 @@ def main(args): print(f"Downloaded {args.model} to {checkpoint_path}") label_columns = ["PSH", "PPC", "PNC", "SFvCSP"] - + # Add clients - for i, label_column in zip(range(args.num_clients),label_columns): + for i, label_column in zip(range(args.num_clients), label_columns): client_name = f"site-{i+1}" - + # define data paths # We use the same validation set for each client to make their metrics comparable val_data_path = "/tmp/data/tap/val/tap_valid.csv" @@ -59,30 +55,38 @@ def main(args): print("Simulating central training...") assert args.num_rounds == 1, "Use num_rounds=1 for simulating 'central' training setting." 
train_data_path = "/tmp/data/tap/train/tap_full_train.csv" - val_check_interval = int(args.local_steps/20) # 20 times per training - else: # local or fedavg setting - train_data_path = f"/tmp/data/tap/train/tap_{client_name}_train.csv" + val_check_interval = int(args.local_steps / 20) # 20 times per training + else: # local or fedavg setting + train_data_path = f"/tmp/data/tap/train/tap_{client_name}_train.csv" if args.num_rounds > 1: val_check_interval = args.local_steps else: - val_check_interval = int(args.local_steps/20) # 20 times per training - + val_check_interval = int(args.local_steps / 20) # 20 times per training + # define training script arguments - #precision = "bf16-mixed" + # precision = "bf16-mixed" precision = "fp32" script_args = f"--restore-from-checkpoint-path {checkpoint_path} --train-data-path {train_data_path} --valid-data-path {val_data_path} --config-class ESM2FineTuneSeqConfig --dataset-class InMemorySingleValueDataset --task-type regression --mlp-ft-dropout 0.1 --mlp-hidden-size 256 --mlp-target-size 1 --experiment-name {job.name} --num-steps {args.local_steps} --num-gpus 1 --val-check-interval {val_check_interval} --log-every-n-steps 10 --lr 5e-4 --lr-multiplier 1e3 --scale-lr-layer regression_head --result-dir bionemo --micro-batch-size 8 --precision {precision} --save-top-k 1 --limit-val-batches 1.0 --label-column {label_column}" print(f"Running {args.train_script} with args: {script_args}") - + # Define training script runner - runner = BaseScriptRunner(script=args.train_script, - launch_external_process=True, - framework="pytorch", - params_exchange_format="pytorch", - launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", - launch_once=False)) + runner = BaseScriptRunner( + script=args.train_script, + launch_external_process=True, + framework="pytorch", + params_exchange_format="pytorch", + launcher=SubprocessLauncher(script=f"python3 custom/{args.train_script} {script_args}", launch_once=False), + ) job.to(runner, client_name) - job.to(BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA) - job.to(BioNeMoExcludeParamsFilter(exclude_vars="regression_head"), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_RESULT) # do not share the regression head with the server; each client will train their personal endpoint in this example + job.to( + BioNeMoParamsFilter(precision), client_name, tasks=["train", "validate"], filter_type=FilterType.TASK_DATA + ) + job.to( + BioNeMoExcludeParamsFilter(exclude_vars="regression_head"), + client_name, + tasks=["train", "validate"], + filter_type=FilterType.TASK_RESULT, + ) # do not share the regression head with the server; each client will train their personal endpoint in this example job.export_job("./exported_jobs") job.simulator_run(f"/tmp/nvflare/bionemo/tap/{job.name}", gpu=args.sim_gpus) @@ -93,12 +97,19 @@ def main(args): parser.add_argument("--num_clients", type=int, help="Number of clients", required=False, default=1) parser.add_argument("--num_rounds", type=int, help="Number of rounds", required=False, default=30) parser.add_argument("--local_steps", type=int, help="Number of rounds", required=False, default=10) - parser.add_argument("--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py") + parser.add_argument( + "--train_script", type=str, help="Training script", required=False, default="../finetune_esm2.py" + ) parser.add_argument("--exp_name", type=str, help="Job 
name prefix", required=False, default="fedavg") parser.add_argument("--model", choices=["8m", "650m", "3b"], help="ESM2 model", required=False, default="8m") - parser.add_argument("--sim_gpus", type=str, help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a separate GPU. By default run all clients on the same GPU 0.", required=False, default="0") + parser.add_argument( + "--sim_gpus", + type=str, + help="GPU indexes to simulate clients, e.g., '0,1,2,3' if you want to run 4 clients, each on a separate GPU. By default run all clients on the same GPU 0.", + required=False, + default="0", + ) + + args = parser.parse_args() - args = parser.parse_args() - main(args) - \ No newline at end of file diff --git a/examples/advanced/bionemo/task_fitting/src/bionemo_inference_processor.py b/examples/advanced/bionemo/task_fitting/src/bionemo_inference_processor.py index 860252e559..e113b9cc0a 100644 --- a/examples/advanced/bionemo/task_fitting/src/bionemo_inference_processor.py +++ b/examples/advanced/bionemo/task_fitting/src/bionemo_inference_processor.py @@ -12,12 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import pprint -from bionemo_constants import BioNeMoConstants -from omegaconf import OmegaConf - from nvflare.apis.client import Client from nvflare.apis.dxo import DXO, from_shareable from nvflare.apis.fl_constant import FLContextKey, ReturnCode @@ -30,8 +26,7 @@ class BioNeMoInferenceProcessor(ResponseProcessor): def __init__( self, ): - """Creates task data, runs BioNeMo model inference, and summarizes results. - """ + """Creates task data, runs BioNeMo model inference, and summarizes results.""" super().__init__() self._inference_responses = {} diff --git a/examples/advanced/bionemo/task_fitting/src/bionemo_launcher.py b/examples/advanced/bionemo/task_fitting/src/bionemo_launcher.py index dbed7b6a47..14856ef2be 100644 --- a/examples/advanced/bionemo/task_fitting/src/bionemo_launcher.py +++ b/examples/advanced/bionemo/task_fitting/src/bionemo_launcher.py @@ -14,25 +14,19 @@ import os import time + import torch +from bionemo_constants import BioNeMoConstants -from nvflare.apis.dxo import MetaKey, DXO, from_shareable -from nvflare.apis.event_type import EventType +from nvflare.apis.dxo import DXO from nvflare.apis.executor import Executor from nvflare.apis.fl_constant import ReturnCode from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable, make_reply from nvflare.apis.signal import Signal -from nvflare.app_common.abstract.learner_spec import Learner -from nvflare.app_common.app_constant import AppConstants, ValidateType -from nvflare.security.logging import secure_format_exception -from nvflare.app_common.workflows.model_controller import ModelController from nvflare.app_common.abstract.launcher import Launcher, LauncherRunStatus -from nvflare.apis.fl_context import FLContext -from nvflare.apis.shareable import Shareable from nvflare.fuel.utils.validation_utils import check_object_type - -from bionemo_constants import BioNeMoConstants +from nvflare.security.logging import secure_format_exception class BioNeMoLauncher(Executor): @@ -40,7 +34,7 @@ def __init__( self, launcher_id: str = "launcher", task_name: str = BioNeMoConstants.TASK_INFERENCE, - check_interval: float = 10.0 + check_interval: float = 10.0, ): """Run a command on the client using a launcher. 
@@ -53,7 +47,7 @@ def __init__( self._launcher_id = launcher_id self._task_name = task_name self._check_interval = check_interval - self.is_initialized = False + self.is_initialized = False def _init_launcher(self, fl_ctx: FLContext): engine = fl_ctx.get_engine() @@ -66,16 +60,16 @@ def _init_launcher(self, fl_ctx: FLContext): def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: try: - if task_name == self._task_name: + if task_name == self._task_name: if not self.is_initialized: self._init_launcher(fl_ctx) success = self._launch_script(fl_ctx) - + if success: # Get results path from inference script arguments args = self.launcher._script.split() - results_path = args[args.index("--results-path")+1] + results_path = args[args.index("--results-path") + 1] if os.path.isfile(results_path): self.log_info(fl_ctx, f"Get result info from: {results_path}") results = torch.load(results_path) @@ -83,22 +77,27 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort result_shapes = {} for k, v in results.items(): if v is not None: - result_shapes[k] = list(v.shape) # turn torch Size type into a simple list for sharing with server + result_shapes[k] = list( + v.shape + ) # turn torch Size type into a simple list for sharing with server n_sequences = len(results["embeddings"]) else: n_sequences, result_shapes = "n/a", "n/a" # Prepare a DXO for our updated model. Create shareable and return - data_info = {BioNeMoConstants.NUMBER_SEQUENCES: n_sequences, BioNeMoConstants.RESULT_SHAPES: result_shapes} - + data_info = { + BioNeMoConstants.NUMBER_SEQUENCES: n_sequences, + BioNeMoConstants.RESULT_SHAPES: result_shapes, + } + outgoing_dxo = DXO(data_kind=BioNeMoConstants.DATA_INFO, data=data_info) return outgoing_dxo.to_shareable() else: return make_reply(ReturnCode.EXECUTION_EXCEPTION) else: # If unknown task name, set RC accordingly. - return make_reply(ReturnCode.TASK_UNKNOWN) + return make_reply(ReturnCode.TASK_UNKNOWN) except Exception as e: self.log_exception(fl_ctx, f"Exception in execute: {secure_format_exception(e)}.") return make_reply(ReturnCode.EXECUTION_EXCEPTION) @@ -124,4 +123,3 @@ def _launch_script(self, fl_ctx: FLContext): self.launcher.finalize(fl_ctx=fl_ctx) self.log_info(fl_ctx, "Stop Executor Launcher.") return success - diff --git a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_job.py b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_job.py index 65ab1aa74f..2322c0a66c 100644 --- a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_job.py +++ b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_job.py @@ -13,19 +13,16 @@ # limitations under the License. 
from typing import List, Optional -from torch import nn as nn from bionemo_mlp_model_persistor import BioNeMoMLPModelPersistor +from torch import nn as nn -from nvflare.app_opt.pt.file_model_locator import PTFileModelLocator -from nvflare.app_common.abstract.model_locator import ModelLocator -from nvflare.app_common.abstract.model_persistor import ModelPersistor from nvflare.app_common.tracking.tracker_types import ANALYTIC_EVENT_TYPE from nvflare.app_common.widgets.convert_to_fed_event import ConvertToFedEvent from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector from nvflare.app_common.widgets.streaming import AnalyticsReceiver from nvflare.app_common.widgets.validation_json_generator import ValidationJsonGenerator -from nvflare.app_opt.pt.job_config.model import PTModel +from nvflare.app_opt.pt.file_model_locator import PTFileModelLocator from nvflare.app_opt.tracking.tb.tb_receiver import TBAnalyticsReceiver from nvflare.job_config.api import FedJob, validate_object_for_job @@ -42,7 +39,7 @@ def __init__( intime_model_selector: Optional[IntimeModelSelector] = None, convert_to_fed_event: Optional[ConvertToFedEvent] = None, analytics_receiver: Optional[AnalyticsReceiver] = None, - embedding_dimensions: int = 320 # embedding dimensions of ESM2-8m + embedding_dimensions: int = 320, # embedding dimensions of ESM2-8m ): """PyTorch BaseFedJob. @@ -106,7 +103,7 @@ def __init__( ) self.to_server(id="persistor", obj=BioNeMoMLPModelPersistor(embedding_dimensions=embedding_dimensions)) - + self.to_server(id="locator", obj=PTFileModelLocator(pt_persistor_id="persistor")) def set_up_client(self, target: str): diff --git a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_learner.py b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_learner.py index 31f66d81f8..9852060dcf 100644 --- a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_learner.py +++ b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_learner.py @@ -18,12 +18,11 @@ import pickle from distutils.util import strtobool from typing import Union -import torch - import numpy as np import pandas as pd import sklearn +import torch from sklearn.metrics import accuracy_score from sklearn.neural_network import MLPClassifier from torch.utils.tensorboard import SummaryWriter @@ -46,7 +45,7 @@ def __init__( analytic_sender_id: str = "analytic_sender", batch_size: int = 128, num_workers: int = 0, - embedding_dimensions: int = 320 # embedding dimensions of ESM2-8m + embedding_dimensions: int = 320, # embedding dimensions of ESM2-8m ): """BioNeMo MLP Trainer. 
@@ -121,7 +120,7 @@ def initialize(self): # Read embeddings results = torch.load(self.inference_result) - protein_embeddings = results['embeddings'] + protein_embeddings = results["embeddings"] self.info(f"Loaded {len(protein_embeddings)} embeddings") # Read labels diff --git a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_model_persistor.py b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_model_persistor.py index 577b120961..cf90eca7d8 100644 --- a/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_model_persistor.py +++ b/examples/advanced/bionemo/task_fitting/src/bionemo_mlp_model_persistor.py @@ -41,7 +41,7 @@ def __init__( best_global_model_file_name=DefaultCheckpointFileName.BEST_GLOBAL_MODEL, source_ckpt_file_full_name=None, filter_id: str = None, - embedding_dimensions: int = 320 # embedding dimensions of ESM2-8m + embedding_dimensions: int = 320, # embedding dimensions of ESM2-8m ): """Persist sklearn-based model to/from file system.