diff --git a/MedicalDiagnosis/Main_AFedSV.py b/MedicalDiagnosis/AFedSV.py
similarity index 95%
rename from MedicalDiagnosis/Main_AFedSV.py
rename to MedicalDiagnosis/AFedSV.py
index 44a8084..d73917b 100644
--- a/MedicalDiagnosis/Main_AFedSV.py
+++ b/MedicalDiagnosis/AFedSV.py
@@ -1,6 +1,3 @@
-import sys
-sys.path.insert(0,"/home/lixiang/FLamby-main")
-
 import torch
 from MedicalDiagnosis.utils import evaluate_model_on_tests
 
@@ -19,7 +16,7 @@ from MedicalDiagnosis.datasets.fed_isic2019 import FedIsic2019 as FedDataset
 # 1st line of code to change to switch to another strategy
-from MedicalDiagnosis.strategies.fed_lsv import FedLSV as strat
+from MedicalDiagnosis.strategies.afedsv import FedOptSV as strat
 
 # We loop on all the clients of the distributed dataset and instantiate associated data loaders
 train_dataloaders = [
diff --git a/MedicalDiagnosis/FedAvg.py b/MedicalDiagnosis/FedAvg.py
new file mode 100644
index 0000000..d538eeb
--- /dev/null
+++ b/MedicalDiagnosis/FedAvg.py
@@ -0,0 +1,77 @@
+import torch
+from MedicalDiagnosis.utils import evaluate_model_on_tests
+
+print(torch.Tensor([1, 2]).cuda())  # quick sanity check that a CUDA device is usable
+# 2 lines of code to change to switch to another dataset
+from MedicalDiagnosis.datasets.fed_isic2019 import (
+    BATCH_SIZE,
+    LR,
+    NUM_EPOCHS_POOLED,
+    Baseline,
+    BaselineLoss,
+    metric,
+    NUM_CLIENTS,
+    get_nb_max_rounds
+)
+from MedicalDiagnosis.datasets.fed_isic2019 import FedIsic2019 as FedDataset
+
+# 1st line of code to change to switch to another strategy
+from MedicalDiagnosis.strategies.fed_avg import FedAvg as strat
+
+# We loop on all the clients of the distributed dataset and instantiate associated data loaders
+train_dataloaders = [
+    torch.utils.data.DataLoader(
+        FedDataset(center=i, train=True, pooled=False),
+        batch_size=BATCH_SIZE,
+        shuffle=True,
+        num_workers=0,
+    )
+    for i in range(NUM_CLIENTS)
+]
+# Split the pooled test set into a validation part (25%) and a test part (75%)
+full_dataset = FedDataset(train=False, pooled=True)
+valid_size = int(0.25 * len(full_dataset))
+test_size = len(full_dataset) - valid_size
+valid_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [valid_size, test_size])
+print(len(valid_dataset), len(test_dataset))
+test_dataloaders = [
+    torch.utils.data.DataLoader(
+        test_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+valid_dataloaders = [
+    torch.utils.data.DataLoader(
+        valid_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+lossfunc = BaselineLoss()
+m = Baseline()
+
+# Federated Learning loop
+# 2nd line of code to change to switch to another strategy (feed the FL strategy the right HPs)
+args = {
+    "training_dataloaders": train_dataloaders,
+    "valid_dataloaders": valid_dataloaders,
+    "test_dataloaders": test_dataloaders,
+    "model": m,
+    "loss": lossfunc,
+    "optimizer_class": torch.optim.SGD,
+    "learning_rate": 0.01,
+    # get_nb_max_rounds would return the number of rounds needed to perform approximately
+    # as many epochs on each local dataset as with the pooled training; fixed to 25 here
+    "num_updates": 100,
+    "nrounds": 25,
+}
+s = strat(**args)
+seeds = [20, 21, 22, 23, 24]
+for seed in seeds:
+    m = s.run(seed)[0]
+
+# Evaluation
+# We only instantiate one test set in this particular case: the pooled one
diff --git a/MedicalDiagnosis/Main_Baselines.py b/MedicalDiagnosis/FedProx.py
similarity index 100%
rename from MedicalDiagnosis/Main_Baselines.py
rename to MedicalDiagnosis/FedProx.py
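FedAvg.py above only wires the dataloaders into the `FedAvg` strategy class; the aggregation itself lives in `MedicalDiagnosis.strategies.fed_avg`, which weighted-averages the client models by their sample counts. As a reference for what that strategy computes, here is a minimal sketch of sample-size-weighted state-dict averaging (the helper name `weighted_average` is ours, not part of the repo):

```python
import copy
import torch

def weighted_average(state_dicts, sizes):
    """Sample-size-weighted average of client state dicts (FedAvg-style)."""
    total = float(sum(sizes))
    avg = copy.deepcopy(state_dicts[0])
    for key in avg:
        # each client contributes proportionally to its number of samples
        avg[key] = sum((n / total) * sd[key] for sd, n in zip(state_dicts, sizes))
    return avg

# Toy usage: two "clients" holding single-parameter models
a = {"w": torch.tensor([0.0])}
b = {"w": torch.tensor([3.0])}
print(weighted_average([a, b], sizes=[1, 2]))  # {'w': tensor([2.])}
```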
diff --git a/MedicalDiagnosis/FedSV.py b/MedicalDiagnosis/FedSV.py
new file mode 100644
index 0000000..fef05df
--- /dev/null
+++ b/MedicalDiagnosis/FedSV.py
@@ -0,0 +1,77 @@
+import torch
+from MedicalDiagnosis.utils import evaluate_model_on_tests
+
+print(torch.Tensor([1, 2]).cuda())  # quick sanity check that a CUDA device is usable
+# 2 lines of code to change to switch to another dataset
+from MedicalDiagnosis.datasets.fed_isic2019 import (
+    BATCH_SIZE,
+    LR,
+    NUM_EPOCHS_POOLED,
+    Baseline,
+    BaselineLoss,
+    metric,
+    NUM_CLIENTS,
+    get_nb_max_rounds
+)
+from MedicalDiagnosis.datasets.fed_isic2019 import FedIsic2019 as FedDataset
+
+# 1st line of code to change to switch to another strategy
+from MedicalDiagnosis.strategies.fed_lsv import FedLSV as strat
+
+# We loop on all the clients of the distributed dataset and instantiate associated data loaders
+train_dataloaders = [
+    torch.utils.data.DataLoader(
+        FedDataset(center=i, train=True, pooled=False),
+        batch_size=BATCH_SIZE,
+        shuffle=True,
+        num_workers=0,
+    )
+    for i in range(NUM_CLIENTS)
+]
+# Split the pooled test set into a validation part (25%) and a test part (75%)
+full_dataset = FedDataset(train=False, pooled=True)
+valid_size = int(0.25 * len(full_dataset))
+test_size = len(full_dataset) - valid_size
+valid_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [valid_size, test_size])
+print(len(valid_dataset), len(test_dataset))
+test_dataloaders = [
+    torch.utils.data.DataLoader(
+        test_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+valid_dataloaders = [
+    torch.utils.data.DataLoader(
+        valid_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+lossfunc = BaselineLoss()
+m = Baseline()
+
+# Federated Learning loop
+# 2nd line of code to change to switch to another strategy (feed the FL strategy the right HPs)
+args = {
+    "training_dataloaders": train_dataloaders,
+    "valid_dataloaders": valid_dataloaders,
+    "test_dataloaders": test_dataloaders,
+    "model": m,
+    "loss": lossfunc,
+    "optimizer_class": torch.optim.SGD,
+    "learning_rate": 0.01,
+    # get_nb_max_rounds would return the number of rounds needed to perform approximately
+    # as many epochs on each local dataset as with the pooled training; fixed to 25 here
+    "num_updates": 100,
+    "nrounds": 25,
+}
+s = strat(**args)
+seeds = [20, 21, 22, 23, 24]
+for seed in seeds:
+    m = s.run(seed)[0]
+
+# Evaluation
+# We only instantiate one test set in this particular case: the pooled one
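Note that these runner scripts carve the validation split out of the pooled test set with `random_split` and no fixed generator, so the valid/test partition differs between runs. A minimal sketch of a reproducible variant, using a stand-in dataset (the seed value 42 is an arbitrary example, not taken from the repo):

```python
import torch
from torch.utils.data import TensorDataset, random_split

# Stand-in for FedDataset(train=False, pooled=True); any map-style dataset works.
full_dataset = TensorDataset(torch.arange(100))
valid_size = int(0.25 * len(full_dataset))
test_size = len(full_dataset) - valid_size
# Passing an explicitly seeded generator keeps the valid/test partition
# identical across runs.
valid_dataset, test_dataset = random_split(
    full_dataset, [valid_size, test_size],
    generator=torch.Generator().manual_seed(42),
)
print(len(valid_dataset), len(test_dataset))  # 25 75
```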
diff --git a/MedicalDiagnosis/RFA.py b/MedicalDiagnosis/RFA.py
new file mode 100644
index 0000000..9631007
--- /dev/null
+++ b/MedicalDiagnosis/RFA.py
@@ -0,0 +1,77 @@
+import torch
+from MedicalDiagnosis.utils import evaluate_model_on_tests
+
+print(torch.Tensor([1, 2]).cuda())  # quick sanity check that a CUDA device is usable
+# 2 lines of code to change to switch to another dataset
+from MedicalDiagnosis.datasets.fed_isic2019 import (
+    BATCH_SIZE,
+    LR,
+    NUM_EPOCHS_POOLED,
+    Baseline,
+    BaselineLoss,
+    metric,
+    NUM_CLIENTS,
+    get_nb_max_rounds
+)
+from MedicalDiagnosis.datasets.fed_isic2019 import FedIsic2019 as FedDataset
+
+# 1st line of code to change to switch to another strategy
+from MedicalDiagnosis.strategies.rfa import RFA as strat
+
+# We loop on all the clients of the distributed dataset and instantiate associated data loaders
+train_dataloaders = [
+    torch.utils.data.DataLoader(
+        FedDataset(center=i, train=True, pooled=False),
+        batch_size=BATCH_SIZE,
+        shuffle=True,
+        num_workers=0,
+    )
+    for i in range(NUM_CLIENTS)
+]
+# Split the pooled test set into a validation part (25%) and a test part (75%)
+full_dataset = FedDataset(train=False, pooled=True)
+valid_size = int(0.25 * len(full_dataset))
+test_size = len(full_dataset) - valid_size
+valid_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [valid_size, test_size])
+print(len(valid_dataset), len(test_dataset))
+test_dataloaders = [
+    torch.utils.data.DataLoader(
+        test_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+valid_dataloaders = [
+    torch.utils.data.DataLoader(
+        valid_dataset,
+        batch_size=BATCH_SIZE,
+        shuffle=False,
+        num_workers=0,
+    )
+]
+lossfunc = BaselineLoss()
+m = Baseline()
+
+# Federated Learning loop
+# 2nd line of code to change to switch to another strategy (feed the FL strategy the right HPs)
+args = {
+    "training_dataloaders": train_dataloaders,
+    "valid_dataloaders": valid_dataloaders,
+    "test_dataloaders": test_dataloaders,
+    "model": m,
+    "loss": lossfunc,
+    "optimizer_class": torch.optim.SGD,
+    "learning_rate": 0.01,
+    # get_nb_max_rounds would return the number of rounds needed to perform approximately
+    # as many epochs on each local dataset as with the pooled training; fixed to 25 here
+    "num_updates": 100,
+    "nrounds": 25,
+}
+s = strat(**args)
+seeds = [20, 21, 22, 23, 24]
+for seed in seeds:
+    m = s.run(seed)[0]
+
+# Evaluation
+# We only instantiate one test set in this particular case: the pooled one
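RFA.py delegates to the `RFA` strategy which, following Pillutla et al. ("Robust Aggregation for Federated Learning"), replaces the weighted mean with an approximate geometric median computed by smoothed Weiszfeld iterations. A minimal sketch of that aggregation on flattened parameter vectors, under our own simplifications rather than the strategy's actual implementation:

```python
import torch

def geometric_median(points, iters=10, eps=1e-8):
    """Approximate geometric median of 1-D tensors via smoothed Weiszfeld iterations."""
    median = torch.stack(points).mean(dim=0)  # start from the arithmetic mean
    for _ in range(iters):
        # inverse-distance weights, clamped to avoid division by zero
        weights = torch.stack(
            [1.0 / torch.norm(p - median).clamp(min=eps) for p in points]
        )
        weights = weights / weights.sum()
        median = sum(w * p for w, p in zip(weights, points))
    return median

# An outlier drags the mean far away but barely moves the geometric median
clients = [torch.tensor([0.0, 0.0]), torch.tensor([0.1, 0.0]), torch.tensor([100.0, 100.0])]
print(geometric_median(clients))
```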
diff --git a/MedicalDiagnosis/strategies/fed_lsv.py b/MedicalDiagnosis/strategies/fed_lsv.py
new file mode 100644
index 0000000..0ae6593
--- /dev/null
+++ b/MedicalDiagnosis/strategies/fed_lsv.py
@@ -0,0 +1,235 @@
+import copy
+import json
+import time
+from typing import List
+
+import numpy as np
+import torch
+from tqdm import tqdm
+
+from Shapley import Shapley
+from noise import GradientNoise
+from flamby.strategies.utils import DataLoaderWithMemory, _Model
+from flamby.utils import evaluate_model_on_tests
+from flamby.datasets.fed_isic2019 import (
+    BATCH_SIZE,
+    LR,
+    NUM_EPOCHS_POOLED,
+    Baseline,
+    BaselineLoss,
+    metric,
+    NUM_CLIENTS,
+    get_nb_max_rounds
+)
+
+
+def softmax(a):
+    # shift by the max before exponentiating to avoid overflow
+    exp_a = np.exp(a - np.max(a))
+    return exp_a / np.sum(exp_a)
+
+
+class FedLSV:
+    """Federated learning strategy with Shapley-value-based aggregation.
+
+    As in Federated Averaging, each client first trains its version of a
+    global model locally on its own data. The client updates are then
+    aggregated with weights derived from the clients' estimated Shapley
+    values, instead of plain sample-size weights, and the resulting global
+    model is returned to each client for further training.
+
+    References
+    ----------
+    - https://arxiv.org/abs/1602.05629
+
+    Parameters
+    ----------
+    training_dataloaders : List
+        The list of training dataloaders from multiple training centers.
+    model : torch.nn.Module
+        An initialized torch model.
+    loss : torch.nn.modules.loss._Loss
+        The loss to minimize between the predictions of the model and the
+        ground truth.
+    optimizer_class : torch.optim.Optimizer
+        The class of the torch model optimizer to use at each step.
+    learning_rate : float
+        The learning rate to be given to the optimizer_class.
+    num_updates : int
+        The number of updates to do on each client at each round.
+    nrounds : int
+        The number of communication rounds to do.
+    dp_target_epsilon : float
+        The target epsilon for the (epsilon, delta)-differential
+        privacy guarantee. Defaults to None.
+    dp_target_delta : float
+        The target delta for the (epsilon, delta)-differential privacy
+        guarantee. Defaults to None.
+    dp_max_grad_norm : float
+        The maximum L2 norm of per-sample gradients; used to enforce
+        differential privacy. Defaults to None.
+    log : bool, optional
+        Whether or not to store logs in tensorboard. Defaults to False.
+    log_period : int, optional
+        If log is True then log the loss every log_period batch updates.
+        Defaults to 100.
+    bits_counting_function : Union[callable, None], optional
+        A function making sure exchanges respect the rules; this function
+        can be obtained by decorating check_exchange_compliance in
+        flamby.utils. Should have the signature List[Tensor] -> int.
+        Defaults to None.
+    logdir : str, optional
+        Where logs are stored. Defaults to ./runs.
+    log_basename : str, optional
+        The basename of the created log file. Defaults to fed_avg.
+    """
+
+    def __init__(
+        self,
+        training_dataloaders: List,
+        test_dataloaders: List,
+        valid_dataloaders: List,
+        model: torch.nn.Module,
+        loss: torch.nn.modules.loss._Loss,
+        optimizer_class: torch.optim.Optimizer,
+        learning_rate: float,
+        num_updates: int,
+        nrounds: int,
+        dp_target_epsilon: float = None,
+        dp_target_delta: float = None,
+        dp_max_grad_norm: float = None,
+        log: bool = False,
+        log_period: int = 100,
+        bits_counting_function: callable = None,
+        logdir: str = "./runs",
+        log_basename: str = "fed_avg",
+        seed=None,
+    ):
+        """Cf class docstring."""
+        self._seed = seed if seed is not None else int(time.time())
+        self.test_dataloaders = test_dataloaders
+        self.valid_dataloaders = valid_dataloaders
+        self.training_dataloaders_with_memory = [
+            DataLoaderWithMemory(e) for e in training_dataloaders
+        ]
+        self.training_sizes = [len(e) for e in self.training_dataloaders_with_memory]
+        self.total_number_of_samples = sum(self.training_sizes)
+
+        self.dp_target_epsilon = dp_target_epsilon
+        self.dp_target_delta = dp_target_delta
+        self.dp_max_grad_norm = dp_max_grad_norm
+
+        self.log = log
+        self.log_period = log_period
+        self.log_basename = log_basename
+        self.logdir = logdir
+
+        self.models_list = [
+            _Model(
+                model=model,
+                optimizer_class=optimizer_class,
+                lr=learning_rate,
+                train_dl=_train_dl,
+                dp_target_epsilon=self.dp_target_epsilon,
+                dp_target_delta=self.dp_target_delta,
+                dp_max_grad_norm=self.dp_max_grad_norm,
+                loss=loss,
+                nrounds=nrounds,
+                log=self.log,
+                client_id=i,
+                log_period=self.log_period,
+                log_basename=self.log_basename,
+                logdir=self.logdir,
+                seed=self._seed,
+            )
+            for i, _train_dl in enumerate(training_dataloaders)
+        ]
+        self.nrounds = nrounds
+        self.num_updates = num_updates
+
+        self.num_clients = len(self.training_sizes)
+        self.bits_counting_function = bits_counting_function
+
+    def _local_optimization(self, _model: _Model, dataloader_with_memory):
+        """Carry out the local optimization step.
+
+        Parameters
+        ----------
+        _model : _Model
+            The model on the local device used by the optimization step.
+        dataloader_with_memory : DataLoaderWithMemory
+            A dataloader that can be called infinitely using its get_samples()
+            method.
+        """
+        _model._local_train(dataloader_with_memory, self.num_updates)
+
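+    # The round below proceeds in four stages: (1) each client trains locally
+    # for num_updates batches; (2) the collected client states are perturbed
+    # by GradientNoise to simulate poisoned updates; (3) client Shapley values
+    # are estimated on the validation set and softmax-normalized into
+    # aggregation weights; (4) the weighted update is blended 50/50 with the
+    # previous global model and broadcast back to every client.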
+    def perform_round(self):
+        """Performs a single federated round with Shapley-value-weighted
+        averaging. The following steps will be performed:
+
+        - each model will be trained locally for num_updates batches.
+        - the local states will be perturbed with gradient noise and the
+          clients' Shapley values estimated on the validation set.
+        - the client updates will be averaged with softmax(Shapley) weights
+          and blended with the previous global model.
+        - the blended model will be used to update every local model.
+        """
+        local_states = list()
+        # hardcoded noise configuration handed to GradientNoise below
+        noise = 5
+        noiselevel = 0.1
+        org = copy.deepcopy(self.models_list[0].model.state_dict())
+        for _model, dataloader_with_memory, size in zip(
+            self.models_list, self.training_dataloaders_with_memory, self.training_sizes
+        ):
+            # Local optimization
+            _local_previous_state = _model._get_current_params()
+            self._local_optimization(_model, dataloader_with_memory)
+            local_states.append(copy.deepcopy(_model.model.state_dict()))
+
+            # Reset the local model to its state before local training
+            for p_new, p_old in zip(_model.model.parameters(), _local_previous_state):
+                p_new.data = torch.from_numpy(p_old).to(p_new.device)
+                self.device = p_new.device
+            del _local_previous_state
+
+            if self.bits_counting_function is not None:
+                # NOTE: the original code referenced an undefined `updates`;
+                # pass the collected local states instead
+                self.bits_counting_function(local_states)
+
+        local_states = GradientNoise(local_states, noise, noiselevel, self.device, self.noiseseed)
+
+        # Aggregation step: estimate client Shapley values on the validation
+        # set and turn them into aggregation weights
+        Fed_sv = Shapley(local_states, self.models_list[0].model, self.valid_dataloaders, 0)
+        shapley = Fed_sv.eval_ccshap_stratified(5)
+        tmpshapley = softmax(shapley)
+
+        w_avg = copy.deepcopy(org)
+        for key in w_avg.keys():
+            for i in range(0, len(local_states)):
+                w_avg[key] = w_avg[key] + (local_states[i][key] - org[key]) * tmpshapley[i]
+            # blend the weighted update 50/50 with the previous global model
+            w_avg[key] = torch.div(w_avg[key], 2) + torch.div(org[key], 2)
+
+        # Update models
+        for _model in self.models_list:
+            _model.model.load_state_dict(w_avg)
+
+        dict_cindex = evaluate_model_on_tests(self.models_list[0].model, self.test_dataloaders, metric)
+        print(dict_cindex)
+        path = "fedlsv{}_{}_{}.txt".format(self.noiseseed, noise, noiselevel)
+        with open(path, "a+") as f:
+            f.write(json.dumps(dict_cindex))
+            f.write('\r\n')
+
+    def run(self, noiseseed):
+        """Performs self.nrounds rounds of averaging and returns the list
+        of models.
+        """
+        self.noiseseed = noiseseed
+        for _ in tqdm(range(self.nrounds)):
+            self.perform_round()
+        return [m.model for m in self.models_list]
\ No newline at end of file
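For clarity, here is the aggregation rule inside `perform_round` in isolation: client deltas are weighted by softmax-normalized Shapley values and the result is blended 50/50 with the previous global weights. A self-contained sketch with dummy inputs (in the real code the `shapley` scores come from `Shapley.eval_ccshap_stratified`):

```python
import copy
import numpy as np
import torch

def softmax(a):
    exp_a = np.exp(a - np.max(a))  # max-shifted for numerical stability
    return exp_a / np.sum(exp_a)

def shapley_weighted_update(org, local_states, shapley):
    """Blend softmax(Shapley)-weighted client updates with the old global model."""
    w = softmax(np.asarray(shapley))
    new = copy.deepcopy(org)
    for key in new:
        for i, state in enumerate(local_states):
            new[key] = new[key] + (state[key] - org[key]) * w[i]
        new[key] = new[key] / 2 + org[key] / 2  # 50/50 blend with the old weights
    return new

# Toy usage: two clients pulling in opposite directions; the client with the
# higher Shapley score dominates the aggregated update.
org = {"w": torch.zeros(2)}
locals_ = [{"w": torch.ones(2)}, {"w": -torch.ones(2)}]
print(shapley_weighted_update(org, locals_, shapley=[1.0, 0.0]))
```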
diff --git a/README.md b/README.md
index 726a044..d1228a5 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,71 @@
 # ShapleyFL: Robust Federated Learning Based on Shapley Value
-Source code for 《ShapleyFL: Robust Federated Learning Based on Shapley Value》
+Implementation of the FL paper: ShapleyFL: Robust Federated Learning Based on Shapley Value
+
+**The image classification experiments** are conducted on MNIST, Fashion-MNIST and CIFAR-10. Since the purpose of these experiments is to illustrate the effectiveness of the federated learning paradigm, only simple CNN models are used.
+
+We study 5 popular data and model poisoning scenarios based on the non-IID data setting:
+
+1. imbalanced data with long-tailed distribution
+2. irrelevant data with open-set label noise
+3. malicious clients with closed-set label noise
+4. malicious clients with data noise
+5. attacks with gradient poisoning
+
+**The medical diagnosis experiments** are conducted on Fed-ISIC2019, a realistic cross-silo healthcare dataset. The best-performing EfficientNet architecture is used as the central model. The code is based on FLamby (https://github.com/owkin/FLamby/tree/main/flamby/datasets); please see its documentation for more details.
+
+## Requirements
+
+Install all the packages from requirements.txt.
+
+## Data
+
+#### **Image Classification**
+
+Download the train and test datasets manually, or they will be downloaded automatically from torchvision datasets.
+
+#### **Medical Diagnosis**
+
+We keep the original data creation and image preprocessing scripts from FLamby/fed_isic2019 (https://github.com/owkin/FLamby/tree/main/flamby/datasets/fed_isic2019). Please see its documentation for more details.
+
+## Running the experiments
+
+#### **Image Classification**
+
+E.g., the Fashion-MNIST dataset with long-tailed data distribution noise on GPU no. 1:
+
+```shell
+cd ImageClassification/src_opt
+python AfedSV+.py --model=cnn --dataset=fmnist --epochs=100 --num_users=100 --frac=0.1 --gamma_sv=0.3 --noise=1 --gpu=1
+```
+
+**Options**
+
+* ```--dataset:``` Default: 'cifar'. Options: 'fmnist', 'cifar'
+* ```--model:``` Default: 'mlp'. Options: 'mlp', 'cnn'
+* ```--gpu:``` Default: None (runs on CPU). Can also be set to a specific GPU id.
+* ```--epochs:``` Number of rounds of training.
+* ```--num_users:``` Number of users. Default is 100.
+* ```--frac:``` Fraction of users to be used for federated updates. Default is 0.1.
+* ```--gamma_sv:``` SV update ratio. Default is 0.3.
+* ```--noise:``` Type of noise injected. Default is 0.
+  * 0 - Non-IID
+  * 1 - Long-tailed distribution
+  * 2 - Open-set label noise
+  * 3 - Closed-set label noise
+  * 4 - Data noise
+  * 5 - Gradient poisoning
+* ```--noiselevel:``` Level of noise injected. Default is 0.
+
+#### **Medical Diagnosis**
+
+```shell
+cd MedicalDiagnosis
+python AfedSV.py
+```
+
+## References
+
+[1] FL framework (FedAvg): https://github.com/AshwinRJ/Federated-Learning-PyTorch
+
+[2] Fed-ISIC2019 dataset: https://github.com/owkin/FLamby/tree/main/flamby/datasets
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..dbae268
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,15 @@
+albumentations==1.3.0
+efficientnet_pytorch==0.7.1
+joblib==1.1.0
+matplotlib==3.3.3
+numpy==1.21.4
+pandas==1.2.4
+Pillow==9.4.0
+PyYAML==6.0
+scikit_learn==1.2.1
+seaborn==0.11.2
+tensorboardX==2.5.1
+torch==1.11.0
+torchvision==0.12.0
+tqdm==4.50.2
+umap==0.1.1