From 25e8f0ee67a0cbedb3841a1991e012c0e4627f76 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:53:02 -0700 Subject: [PATCH 01/13] Updated pylint to work with new X variables --- .pylintrc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index 22ffb8a..36dbe65 100644 --- a/.pylintrc +++ b/.pylintrc @@ -42,6 +42,6 @@ suggestion-mode=yes disable= # Default set of "always good" names -good-names=_,X_train,X_test +good-names=_,X_train,X_test,X,X_val,X_train_scaled,X_test_scaled,X_val_scaled recursive=y From ea77d71405413137b41fc25a0de4dfa6eca3cdf5 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:53:47 -0700 Subject: [PATCH 02/13] Added neural net predictor --- src/prsdk/data/cao_mapping.py | 14 ++ src/prsdk/data/torch_data.py | 27 +++ .../neural_network/neural_net_predictor.py | 210 ++++++++++++++++++ .../neural_network/torch_neural_net.py | 55 +++++ 4 files changed, 306 insertions(+) create mode 100644 src/prsdk/data/cao_mapping.py create mode 100644 src/prsdk/data/torch_data.py create mode 100644 src/prsdk/predictors/neural_network/neural_net_predictor.py create mode 100644 src/prsdk/predictors/neural_network/torch_neural_net.py diff --git a/src/prsdk/data/cao_mapping.py b/src/prsdk/data/cao_mapping.py new file mode 100644 index 0000000..2709547 --- /dev/null +++ b/src/prsdk/data/cao_mapping.py @@ -0,0 +1,14 @@ +""" +Immutable NamedTuple for storing the context, actions, and outcomes for a given project. +Note: We choose to use NamedTuple over dataclasses because NamedTuple is immutable. +""" +from typing import NamedTuple + + +class CAOMapping(NamedTuple): + """ + Class defining the context, actions, and outcomes for a given project. + """ + context: list[str] + actions: list[str] + outcomes: list[str] diff --git a/src/prsdk/data/torch_data.py b/src/prsdk/data/torch_data.py new file mode 100644 index 0000000..25ca8f6 --- /dev/null +++ b/src/prsdk/data/torch_data.py @@ -0,0 +1,27 @@ +""" +A simple custom PyTorch dataset is created here. This is used to keep our +datasets standard between models. It is used in both Torch prescription +and Neural Network training. +""" +import numpy as np +import torch +from torch.utils.data.dataset import Dataset + + +class TorchDataset(Dataset): + """ + Simple custom torch dataset. + :param X: data + :param y: labels + """ + def __init__(self, X: np.ndarray, y: np.ndarray, device="cpu"): + super().__init__() + self.X = torch.tensor(X, dtype=torch.float32, device=device) + self.y = torch.tensor(y, device=device) + assert len(self.X) == len(self.y), "X and y must have the same length" + + def __len__(self): + return len(self.X) + + def __getitem__(self, idx: int) -> tuple: + return self.X[idx], self.y[idx] diff --git a/src/prsdk/predictors/neural_network/neural_net_predictor.py b/src/prsdk/predictors/neural_network/neural_net_predictor.py new file mode 100644 index 0000000..d8a341f --- /dev/null +++ b/src/prsdk/predictors/neural_network/neural_net_predictor.py @@ -0,0 +1,210 @@ +""" +Implementation of predictor.py using a simple feed-forward NeuralNetwork +implemented in PyTorch. 
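+
+Illustrative usage (hyperparameter values here are arbitrary examples, not defaults):
+    predictor = NeuralNetPredictor(cao, {"hidden_sizes": [64], "epochs": 2, "device": "cpu"})
+    predictor.fit(X_train, y_train)
+    predictions_df = predictor.predict(X_test)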
+""" +import copy +import time + +import numpy as np +import pandas as pd +from sklearn.preprocessing import StandardScaler +from tqdm import tqdm + +import torch +from torch.utils.data import DataLoader +from torch.utils.tensorboard import SummaryWriter + +from data.cao_mapping import CAOMapping +from data.torch_data import TorchDataset +from predictors.predictor import Predictor +from predictors.neural_network.torch_neural_net import TorchNeuralNet + + +# pylint: disable=too-many-instance-attributes +class NeuralNetPredictor(Predictor): + """ + Simple feed-forward neural network predictor implemented in PyTorch. + Has the option to use wide and deep, concatenating the input to the output of the hidden layers + in order to take advantage of the linear relationship in the data. + Data is automatically standardized and the scaler is saved with the model. + TODO: We want to be able to have custom scaling in the future. + """ + def __init__(self, cao: CAOMapping, model_config: dict): + """ + :param context: list of context features. + :param actions: list of action features. + :param outcomes: list of outcomes to predict. + :param model_config: dictionary of model configuration parameters. + Model config should contain the following: + features: list of features to use in the model (optional, defaults to all context + actions) + label: name of the label column (optional, defaults to passed label in fit) + hidden_sizes: list of hidden layer sizes (defaults to single layer of size 4096) + linear_skip: whether to concatenate input to hidden layer output (defaults to True) + dropout: dropout probability (defaults to 0) + device: device to run the model on (defaults to "cpu") + epochs: number of epochs to train for (defaults to 3) + batch_size: batch size for training (defaults to 2048) + optim_params: dictionary of parameters to pass to the optimizer (defaults to PyTorch default) + train_pct: percentage of training data to use (defaults to 1) + step_lr_params: dictionary of parameters to pass to the step learning rate scheduler (defaults to 1, 0.1) + """ + super().__init__(cao) + self.features = model_config.get("features", None) + self.label = model_config.get("label", None) + + self.hidden_sizes = model_config.get("hidden_sizes", [4096]) + self.linear_skip = model_config.get("linear_skip", True) + self.dropout = model_config.get("dropout", 0) + self.device = model_config.get("device", "cpu") + self.epochs = model_config.get("epochs", 3) + self.batch_size = model_config.get("batch_size", 2048) + self.optim_params = model_config.get("optim_params", {}) + self.train_pct = model_config.get("train_pct", 1) + self.step_lr_params = model_config.get("step_lr_params", {"step_size": 1, "gamma": 0.1}) + + self.model = None + self.scaler = StandardScaler() + + # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements + def fit(self, X_train: pd.DataFrame, y_train: pd.Series, + X_val=None, y_val=None, + X_test=None, y_test=None, + log_path=None, verbose=False) -> dict: + """ + Fits neural network to given data using predefined parameters and hyperparameters. + If no features were specified we use all the columns in X_train. + We scale based on the training data and apply it to validation and test data. + AdamW optimizer is used with L1 loss. + TODO: We want to be able to customize the loss function in the future. + :param X_train: training data, may be unscaled and have excess features. + :param y_train: training labels. 
+ :param X_val: validation data, may be unscaled and have excess features. + :param y_val: validation labels. + :param X_test: test data, may be unscaled and have excess features. + :param y_test: test labels. + :param log_path: path to log training data to tensorboard. + :param verbose: whether to print progress bars. + :return: dictionary of results from training containing time taken, best epoch, best loss, + and test loss if applicable. + """ + if not self.features: + self.features = X_train.columns.tolist() + self.label = y_train.name + + self.model = TorchNeuralNet(len(self.features), self.hidden_sizes, self.linear_skip, self.dropout) + self.model.to(self.device) + self.model.train() + + start = time.time() + + # Set up train set + X_train = self.scaler.fit_transform(X_train[self.features]) + y_train = y_train.values + train_ds = TorchDataset(X_train, y_train) + sampler = torch.utils.data.RandomSampler(train_ds, num_samples=int(len(train_ds) * self.train_pct)) + train_dl = DataLoader(train_ds, self.batch_size, sampler=sampler) + + # If we pass in a validation set, use them + if X_val is not None and y_val is not None: + X_val = self.scaler.transform(X_val[self.features]) + y_val = y_val.values + val_ds = TorchDataset(X_val, y_val) + val_dl = DataLoader(val_ds, self.batch_size, shuffle=False) + + # Optimization parameters + optimizer = torch.optim.AdamW(self.model.parameters(), **self.optim_params) + loss_fn = torch.nn.L1Loss() + if self.step_lr_params: + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, **self.step_lr_params) + + if log_path: + writer = SummaryWriter(log_path) + + # Keeping track of best performance for validation + result_dict = {} + best_model = None + best_loss = np.inf + end = 0 + + step = 0 + for epoch in range(self.epochs): + self.model.train() + # Standard training loop + train_iter = tqdm(train_dl) if verbose else train_dl + for X, y in train_iter: + X, y = X.to(self.device), y.to(self.device) + optimizer.zero_grad() + out = self.model(X) + loss = loss_fn(out.squeeze(), y.squeeze()) + if log_path: + writer.add_scalar("loss", loss.item(), step) + step += 1 + loss.backward() + optimizer.step() + + # LR Decay + if self.step_lr_params: + scheduler.step() + + # Evaluate epoch + if X_val is not None and y_val is not None: + total = 0 + self.model.eval() + with torch.no_grad(): + for X, y in tqdm(val_dl): + X, y = X.to(self.device), y.to(self.device) + out = self.model(X) + loss = loss_fn(out.squeeze(), y.squeeze()) + total += loss.item() * y.shape[0] + + if log_path: + writer.add_scalar("val_loss", total / len(val_ds), step) + + if total < best_loss: + best_model = copy.deepcopy(self.model.state_dict()) + best_loss = total + end = time.time() + result_dict["best_epoch"] = epoch + result_dict["best_loss"] = total / len(val_ds) + result_dict["time"] = end - start + + print(f"epoch {epoch} mae {total / len(val_ds)}") + + if best_model: + self.model.load_state_dict(best_model) + else: + end = time.time() + result_dict["time"] = end - start + + # If we provide a test dataset + if X_test is not None and y_test is not None: + y_pred = self.predict(X_test) + y_true = y_test.values + mae = np.mean(np.abs(y_pred - y_true)) + result_dict["test_loss"] = mae + + return result_dict + # pylint: enable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements + + def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: + """ + Generates prediction from model for given test data. + :param context_actions_df: test data to predict on. 
+ :return: DataFrame of predictions properly labeled and indexed. + """ + X_test_scaled = self.scaler.transform(context_actions_df[self.features]) + test_ds = TorchDataset(X_test_scaled, np.zeros(len(X_test_scaled))) + test_dl = DataLoader(test_ds, self.batch_size, shuffle=False) + pred_list = [] + with torch.no_grad(): + self.model.eval() + for X, _ in test_dl: + X = X.to(self.device) + pred_list.append(self.model(X)) + + # Flatten into a single numpy array if we have multiple batches + if len(pred_list) > 1: + y_pred = torch.concatenate(pred_list, dim=0).cpu().numpy() + else: + y_pred = pred_list[0].cpu().numpy() + return pd.DataFrame(y_pred, index=context_actions_df.index, columns=[self.label]) diff --git a/src/prsdk/predictors/neural_network/torch_neural_net.py b/src/prsdk/predictors/neural_network/torch_neural_net.py new file mode 100644 index 0000000..ce15a0d --- /dev/null +++ b/src/prsdk/predictors/neural_network/torch_neural_net.py @@ -0,0 +1,55 @@ +""" +Simple feed-forward neural network to be used in the Neural Network Predictor. +""" +import torch + + +class TorchNeuralNet(torch.nn.Module): + """ + Custom torch neural network module. + :param in_size: number of input features + :param hidden_sizes: list of hidden layer sizes + :param linear_skip: whether to concatenate input to hidden layer output + :param dropout: dropout probability + """ + class EncBlock(torch.nn.Module): + """ + Encoding block for neural network. + Simple feed forward layer with ReLU activation and optional dropout. + """ + def __init__(self, in_size: int, out_size: int, dropout: float): + super().__init__() + self.model = torch.nn.Sequential( + torch.nn.Linear(in_size, out_size), + torch.nn.ReLU(), + torch.nn.Dropout(p=dropout) + ) + + def forward(self, X: torch.FloatTensor) -> torch.FloatTensor: + """ + Passes input through the block. + """ + return self.model(X) + + def __init__(self, in_size: int, hidden_sizes: list[str], linear_skip: bool, dropout: float): + super().__init__() + self.linear_skip = linear_skip + hidden_sizes = [in_size] + hidden_sizes + enc_blocks = [self.EncBlock(hidden_sizes[i], hidden_sizes[i+1], dropout) for i in range(len(hidden_sizes) - 1)] + self.enc = torch.nn.Sequential(*enc_blocks) + # If we are using linear skip, we concatenate the input to the output of the hidden layers + out_size = hidden_sizes[-1] + in_size if linear_skip else hidden_sizes[-1] + self.linear = torch.nn.Linear(out_size, 1) + + def forward(self, X: torch.FloatTensor) -> torch.FloatTensor: + """ + Performs a forward pass of the neural net. + If linear_skip is True, we concatenate the input to the output of the hidden layers. 
+ :param X: input data + :return: output of the neural net + """ + hid = self.enc(X) + if self.linear_skip: + hid = torch.concatenate([hid, X], dim=1) + out = self.linear(hid) + return out From 408ae5a11a9b0307f102edf28d7810bc93ec5ac4 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:54:33 -0700 Subject: [PATCH 03/13] Added sklearn predictors --- .../linear_regression_predictor.py | 25 +++++++++ .../random_forest_predictor.py | 23 ++++++++ .../sklearn_predictors/sklearn_predictor.py | 53 +++++++++++++++++++ 3 files changed, 101 insertions(+) create mode 100644 src/prsdk/predictors/sklearn_predictors/linear_regression_predictor.py create mode 100644 src/prsdk/predictors/sklearn_predictors/random_forest_predictor.py create mode 100644 src/prsdk/predictors/sklearn_predictors/sklearn_predictor.py diff --git a/src/prsdk/predictors/sklearn_predictors/linear_regression_predictor.py b/src/prsdk/predictors/sklearn_predictors/linear_regression_predictor.py new file mode 100644 index 0000000..490d878 --- /dev/null +++ b/src/prsdk/predictors/sklearn_predictors/linear_regression_predictor.py @@ -0,0 +1,25 @@ +""" +Implementation of SKLearnPredictor as a LinearRegressor. +""" +from sklearn.linear_model import LinearRegression + +from data.cao_mapping import CAOMapping +from predictors.sklearn_predictors.sklearn_predictor import SKLearnPredictor + + +class LinearRegressionPredictor(SKLearnPredictor): + """ + Simple linear regression predictor. + See SKLearnPredictor for more details. + """ + def __init__(self, cao: CAOMapping, model_config: dict): + """ + :param cao: CAOMapping object with context, actions, and outcomes for super constructor. + :param model_config: Configuration to pass into the SKLearn constructor. Also contains the keys "features" and + "label" to keep track of the features and label to predict. + """ + if not model_config: + model_config = {} + lr_config = {key: value for key, value in model_config.items() if key not in ["features", "label"]} + model = LinearRegression(**lr_config) + super().__init__(cao, model, model_config) diff --git a/src/prsdk/predictors/sklearn_predictors/random_forest_predictor.py b/src/prsdk/predictors/sklearn_predictors/random_forest_predictor.py new file mode 100644 index 0000000..37e46c0 --- /dev/null +++ b/src/prsdk/predictors/sklearn_predictors/random_forest_predictor.py @@ -0,0 +1,23 @@ +""" +Implementation of SKLearnPredictor as a RandomForestRegressor. +""" +from sklearn.ensemble import RandomForestRegressor + +from data.cao_mapping import CAOMapping +from predictors.sklearn_predictors.sklearn_predictor import SKLearnPredictor + + +class RandomForestPredictor(SKLearnPredictor): + """ + Simple random forest predictor. + See SKLearnPredictor for more details. + """ + def __init__(self, cao: CAOMapping, model_config: dict): + """ + :param cao: CAOMapping object with context, actions, and outcomes for super constructor. + :param model_config: Configuration to pass into the SKLearn constructor. Also contains the keys "features" and + "label" to keep track of the features and label to predict. 
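+        Illustrative config (values are hypothetical):
+            {"n_estimators": 10, "max_depth": 2, "features": ["a", "b"], "label": "label"}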
+ """ + rf_config = {key: value for key, value in model_config.items() if key not in ["features", "label"]} + model = RandomForestRegressor(**rf_config) + super().__init__(cao, model, model_config) diff --git a/src/prsdk/predictors/sklearn_predictors/sklearn_predictor.py b/src/prsdk/predictors/sklearn_predictors/sklearn_predictor.py new file mode 100644 index 0000000..6e64d27 --- /dev/null +++ b/src/prsdk/predictors/sklearn_predictors/sklearn_predictor.py @@ -0,0 +1,53 @@ +""" +Abstract SKLearn predictor. +Since the SKLearn library is standardized we can easily make more. +""" +from abc import ABC + +import pandas as pd + +from data.cao_mapping import CAOMapping +from predictors.predictor import Predictor + + +class SKLearnPredictor(Predictor, ABC): + """ + Simple abstract class for sklearn predictors. + Keeps track of features fit on and label to predict. + """ + def __init__(self, cao: CAOMapping, model, model_config: dict): + """ + Model config contains the following: + features: list of features to use for prediction (optional, defaults to all features) + label: name of the label to predict (optional, defaults to passed label during fit) + Any other parameters are passed to the model. + """ + super().__init__(cao) + self.config = model_config + self.model = model + + def fit(self, X_train: pd.DataFrame, y_train: pd.Series): + """ + Fits SKLearn model with standard sklearn fit method. + If we passed in features, use those. Otherwise use all columns. + :param X_train: DataFrame with input data + :param y_train: series with target data + """ + if "features" in self.config: + X_train = X_train[self.config["features"]] + else: + self.config["features"] = list(X_train.columns) + self.config["label"] = y_train.name + self.model.fit(X_train, y_train) + + def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: + """ + Standard sklearn predict method. + Makes sure to use the same features as were used in fit. + Ensures index and label are properly applied to the output. + :param context_actions_df: DataFrame with input data + :return: properly labeled DataFrame with predictions and matching index. + """ + context_actions_df = context_actions_df[self.config["features"]] + y_pred = self.model.predict(context_actions_df) + return pd.DataFrame(y_pred, index=context_actions_df.index, columns=[self.config["label"]]) From a156a166a037533156ff4835cd484e3d57c89063 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:55:06 -0700 Subject: [PATCH 04/13] Updated models to use new CAO mapping --- src/prsdk/predictors/predictor.py | 12 +++++------- src/prsdk/prescriptors/prescriptor.py | 13 +++++++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/prsdk/predictors/predictor.py b/src/prsdk/predictors/predictor.py index 31b8b74..486cb93 100644 --- a/src/prsdk/predictors/predictor.py +++ b/src/prsdk/predictors/predictor.py @@ -5,6 +5,8 @@ import pandas as pd +from data.cao_mapping import CAOMapping + class Predictor(ABC): """ @@ -12,16 +14,12 @@ class Predictor(ABC): Predictors must be able to be fit and predict on a DataFrame. It is up to the Predictor to keep track of the proper label to label the output DataFrame. """ - def __init__(self, context: list[str], actions: list[str], outcomes: list[str]): + def __init__(self, cao: CAOMapping): """ Initializes the Predictor with the context, actions, and outcomes. 
- :param context: list of context columns - :param actions: list of action columns - :param outcomes: list of outcome columns + :param cao: CAOMapping object with context, actions, and outcomes. """ - self.context = context - self.actions = actions - self.outcomes = outcomes + self.cao = cao @abstractmethod def fit(self, X_train: pd.DataFrame, y_train: pd.Series): diff --git a/src/prsdk/prescriptors/prescriptor.py b/src/prsdk/prescriptors/prescriptor.py index d743fc8..6d06c6e 100644 --- a/src/prsdk/prescriptors/prescriptor.py +++ b/src/prsdk/prescriptors/prescriptor.py @@ -5,16 +5,21 @@ import pandas as pd +from data.cao_mapping import CAOMapping + # pylint: disable=too-few-public-methods class Prescriptor(ABC): """ Abstract class for prescriptors to allow us to experiment with different implementations. """ - def __init__(self, context: list[str], actions: list[str]): - # We keep track of the context and actions to ensure that the prescriptor is compatible with the environment. - self.context = context - self.actions = actions + def __init__(self, cao: CAOMapping): + """ + We keep track of the context, actions, and outcomes in the CAO mapping to ensure the prescriptor is compatible + with the project it's in. + :param cao: CAOMapping object with context, actions, and outcomes. + """ + self.cao = cao @abstractmethod def prescribe(self, context_df: pd.DataFrame) -> pd.DataFrame: From 2cdca674fd93172631636bb05baaf6640b0ee21a Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:55:23 -0700 Subject: [PATCH 05/13] Transferred over unit tests and added some for hf persistence --- tests/persistence/__init__.py | 0 tests/persistence/test_hf_persistence.py | 54 +++++++++++ .../test_predictor_serialization.py | 91 +++++++++++++++++++ tests/predictors/__init__.py | 0 tests/predictors/test_neural_net.py | 56 ++++++++++++ 5 files changed, 201 insertions(+) create mode 100644 tests/persistence/__init__.py create mode 100644 tests/persistence/test_hf_persistence.py create mode 100644 tests/persistence/test_predictor_serialization.py create mode 100644 tests/predictors/__init__.py create mode 100644 tests/predictors/test_neural_net.py diff --git a/tests/persistence/__init__.py b/tests/persistence/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/persistence/test_hf_persistence.py b/tests/persistence/test_hf_persistence.py new file mode 100644 index 0000000..9ef829e --- /dev/null +++ b/tests/persistence/test_hf_persistence.py @@ -0,0 +1,54 @@ +""" +Unit tests for the HuggingFace persistor. +""" +from pathlib import Path +import shutil +import unittest + +import numpy as np +import pandas as pd + +from persistence.persistors.hf_persistor import HuggingFacePersistor +from persistence.serializers.neural_network_serializer import NeuralNetSerializer +from predictors.neural_network.neural_net_predictor import NeuralNetPredictor + + +class TestHuggingFacePersistence(unittest.TestCase): + """ + Tests the HuggingFace Persistor. We can't test the actual upload but we can test the download with an + arbitrary model from HuggingFace. + TODO: We have to update our models to match the new configs that save CAO + """ + def setUp(self): + self.temp_dir = Path("tests/temp") + + def test_download_model(self): + """ + Tests downloading a model from HuggingFace. 
+ """ + url = "danyoung/eluc-global-nn" + serializer = NeuralNetSerializer() + persistor = HuggingFacePersistor(serializer) + + model = persistor.from_pretrained(url, local_dir=str(self.temp_dir / url.replace("/", "--"))) + self.assertTrue(isinstance(model, NeuralNetPredictor)) + self.assertTrue((self.temp_dir / url.replace("/", "--") / "config.json").exists()) + self.assertTrue((self.temp_dir / url.replace("/", "--") / "model.pt").exists()) + self.assertTrue((self.temp_dir / url.replace("/", "--") / "scaler.joblib").exists()) + + def test_predict_model(self): + """ + Tests that you can predict with a downloaded model. + """ + url = "danyoung/eluc-global-nn" + serializer = NeuralNetSerializer() + persistor = HuggingFacePersistor(serializer) + + model = persistor.from_pretrained(url, local_dir=str(self.temp_dir / url.replace("/", "--"))) + test_data = pd.DataFrame({cont: np.random.rand(5) for cont in model.features}) + out = model.predict(test_data) + self.assertEqual(out.shape, (5, 1)) + + def tearDown(self): + if self.temp_dir.exists(): + shutil.rmtree(self.temp_dir) diff --git a/tests/persistence/test_predictor_serialization.py b/tests/persistence/test_predictor_serialization.py new file mode 100644 index 0000000..01dcf43 --- /dev/null +++ b/tests/persistence/test_predictor_serialization.py @@ -0,0 +1,91 @@ +""" +Unit tests for the predictors. +""" +import unittest +import shutil +from pathlib import Path + +import pandas as pd + +from data.cao_mapping import CAOMapping +from persistence.serializers.neural_network_serializer import NeuralNetSerializer +from persistence.serializers.sklearn_serializer import SKLearnSerializer +from predictors.neural_network.neural_net_predictor import NeuralNetPredictor +from predictors.sklearn_predictors.linear_regression_predictor import LinearRegressionPredictor +from predictors.sklearn_predictors.random_forest_predictor import RandomForestPredictor + + +class TestPredictorSerialization(unittest.TestCase): + """ + Tests the 3 base predictor implementations' saving and loading behavior. + """ + def setUp(self): + """ + We set the models up like this so that in test_loaded_same we can instantiate + 2 models with the same parameters, load one from the other's save, and check if + their predictions are the same. + """ + self.cao = CAOMapping(["a", "b"], ["c"], ["label"]) + self.models = [ + NeuralNetPredictor, + LinearRegressionPredictor, + RandomForestPredictor + ] + self.serializers = [ + NeuralNetSerializer(), + SKLearnSerializer(), + SKLearnSerializer() + ] + self.configs = [ + {'hidden_sizes': [4], 'epochs': 1, 'batch_size': 1, 'device': 'cpu'}, + {'n_jobs': -1}, + {'n_jobs': -1, "n_estimators": 10, "max_depth": 2} + ] + self.dummy_data = pd.DataFrame({"a": [1, 2, 3, 4], "b": [4, 5, 6, 4], "c": [7, 8, 9, 4]}) + self.dummy_target = pd.Series([1, 2, 3, 4], name="label") + self.temp_path = Path("tests/temp") + + def test_save_file_names(self): + """ + Checks to make sure the model's save method creates the correct files. 
+ """ + save_file_names = [ + ["model.pt", "config.json", "scaler.joblib"], + ["model.joblib", "config.json"], + ["model.joblib", "config.json"] + ] + for model, serializer, config, test_names in zip(self.models, self.serializers, self.configs, save_file_names): + with self.subTest(model=model): + predictor = model(self.cao, config) + predictor.fit(self.dummy_data, self.dummy_target) + serializer.save(predictor, self.temp_path) + files = [f.name for f in self.temp_path.glob("**/*") if f.is_file()] + self.assertEqual(set(files), set(test_names)) + shutil.rmtree(self.temp_path) + self.assertFalse(self.temp_path.exists()) + + def test_loaded_same(self): + """ + Makes sure a predictor's predictions are consistent before and after saving/loading. + Fits a predictor then saves and loads it, then checks if the predictions are the same. + """ + for model, serializer, config in zip(self.models, self.serializers, self.configs): + with self.subTest(model=model): + predictor = model(self.cao, config) + predictor.fit(self.dummy_data.iloc[:2], self.dummy_target.iloc[:2]) + output = predictor.predict(self.dummy_data.iloc[2:]) + serializer.save(predictor, self.temp_path) + + loaded = serializer.load(self.temp_path) + loaded_output = loaded.predict(self.dummy_data.iloc[2:]) + + self.assertTrue((output == loaded_output).all().all()) + shutil.rmtree(self.temp_path) + self.assertFalse(self.temp_path.exists()) + + def tearDown(self): + """ + Removes the temp directory if it exists. + """ + if self.temp_path.exists(): + shutil.rmtree(self.temp_path) diff --git a/tests/predictors/__init__.py b/tests/predictors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/predictors/test_neural_net.py b/tests/predictors/test_neural_net.py new file mode 100644 index 0000000..c1bf23e --- /dev/null +++ b/tests/predictors/test_neural_net.py @@ -0,0 +1,56 @@ +""" +Unit tests for the NeuralNetPredictor class. +""" +import unittest + +import pandas as pd + +from data.cao_mapping import CAOMapping +from predictors.neural_network.neural_net_predictor import NeuralNetPredictor + + +class TestNeuralNet(unittest.TestCase): + """ + Specifically tests the neural net predictor + """ + def setUp(self): + self.cao = CAOMapping(["a", "b"], ["c"], ["label"]) + + def test_single_input(self): + """ + Tests the neural net with a single input. + """ + predictor = NeuralNetPredictor(self.cao, {"hidden_sizes": [4], "epochs": 1, "batch_size": 1, "device": "cpu"}) + + train_data = pd.DataFrame({"a": [1], "b": [2], "c": [3], "label": [4]}) + test_data = pd.DataFrame({"a": [4], "b": [5], "c": [6]}) + + predictor.fit(train_data[['a', 'b', 'c']], train_data['label']) + out = predictor.predict(test_data) + self.assertEqual(out.shape, (1, 1)) + + def test_multi_input(self): + """ + Tests the neural net with multiple inputs. + """ + predictor = NeuralNetPredictor(self.cao, {"hidden_sizes": [4], "epochs": 1, "batch_size": 1, "device": "cpu"}) + + train_data = pd.DataFrame({"a": [1, 2], "b": [2, 3], "c": [3, 4], "label": [4, 5]}) + test_data = pd.DataFrame({"a": [4, 5], "b": [5, 6], "c": [6, 7]}) + + predictor.fit(train_data[['a', 'b', 'c']], train_data['label']) + out = predictor.predict(test_data) + self.assertEqual(out.shape, (2, 1)) + + def test_batched_input(self): + """ + Tests the neural network with batched inputs. 
+ """ + predictor = NeuralNetPredictor(self.cao, {"hidden_sizes": [4], "epochs": 1, "batch_size": 2, "device": "cpu"}) + + train_data = pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4], "c": [3, 4, 5], "label": [4, 5, 6]}) + test_data = pd.DataFrame({"a": [4, 5], "b": [5, 6], "c": [6, 7]}) + + predictor.fit(train_data[['a', 'b', 'c']], train_data['label']) + out = predictor.predict(test_data) + self.assertEqual(out.shape, (2, 1)) From 2abe9788c78705c55529387952071e56532a28cf Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:55:42 -0700 Subject: [PATCH 06/13] Updated requirements --- requirements.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index c3df1f6..a16e1e7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,10 @@ coverage==7.6.0 flake8==7.1.0 huggingface_hub==0.24.3 +joblib==1.2.0 +numpy==1.23.5 pandas==1.5.3 -pylint==3.2.6 \ No newline at end of file +pylint==3.2.6 +scikit-learn==1.2.2 +tensorboard==2.13.0 +torch==2.3.1 \ No newline at end of file From 2677243544be7e06b2b9b0945bf98b916af6a1b3 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 14:56:05 -0700 Subject: [PATCH 07/13] Added serializers --- .../persistence/persistors/hf_persistor.py | 1 - .../serializers/neural_network_serializer.py | 80 +++++++++++++++++++ .../serializers/sklearn_serializer.py | 54 +++++++++++++ 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 src/prsdk/persistence/serializers/neural_network_serializer.py create mode 100644 src/prsdk/persistence/serializers/sklearn_serializer.py diff --git a/src/prsdk/persistence/persistors/hf_persistor.py b/src/prsdk/persistence/persistors/hf_persistor.py index efc0f26..2ea3994 100644 --- a/src/prsdk/persistence/persistors/hf_persistor.py +++ b/src/prsdk/persistence/persistors/hf_persistor.py @@ -64,7 +64,6 @@ def from_pretrained(self, path_or_url: str, **hf_args): url_path = path_or_url.replace("/", "--") local_dir = hf_args.get("local_dir", f"~/.cache/huggingface/project-resilience/{url_path}") - if not Path(local_dir).exists() or not Path(local_dir).is_dir(): hf_args["local_dir"] = local_dir snapshot_download(repo_id=path_or_url, **hf_args) diff --git a/src/prsdk/persistence/serializers/neural_network_serializer.py b/src/prsdk/persistence/serializers/neural_network_serializer.py new file mode 100644 index 0000000..acac90d --- /dev/null +++ b/src/prsdk/persistence/serializers/neural_network_serializer.py @@ -0,0 +1,80 @@ +""" +Serializer for the Neural Network Predictor class. +""" +import json +from pathlib import Path + +import joblib +import torch + +from data.cao_mapping import CAOMapping +from persistence.serializers.serializer import Serializer +from predictors.neural_network.torch_neural_net import TorchNeuralNet +from predictors.neural_network.neural_net_predictor import NeuralNetPredictor + + +class NeuralNetSerializer(Serializer): + """ + Serializer for the NeuralNetPredictor. + Saves config necessary to recreate the model, the model itself, and the scaler for the data to a folder. + """ + def save(self, model: NeuralNetPredictor, path: Path): + """ + Saves model, config, and scaler into format for loading. + Generates path to folder if it does not exist. + :param model: the neural network predictor to save. + :param path: path to folder to save model files. 
+ """ + if model.model is None: + raise ValueError("Model not fitted yet.") + path.mkdir(parents=True, exist_ok=True) + + config = { + "context": model.cao.context, + "actions": model.cao.actions, + "outcomes": model.cao.outcomes, + "features": model.features, + "label": model.label, + "hidden_sizes": model.hidden_sizes, + "linear_skip": model.linear_skip, + "dropout": model.dropout, + "device": model.device, + "epochs": model.epochs, + "batch_size": model.batch_size, + "optim_params": model.optim_params, + "train_pct": model.train_pct, + "step_lr_params": model.step_lr_params + } + with open(path / "config.json", "w", encoding="utf-8") as file: + json.dump(config, file) + torch.save(model.model.state_dict(), path / "model.pt") + joblib.dump(model.scaler, path / "scaler.joblib") + + def load(self, path: Path) -> NeuralNetPredictor: + """ + Loads a model from a given folder. Creates empty model with config, then loads model state dict and scaler. + :param path: path to folder containing model files. + """ + if not path.exists() or not path.is_dir(): + raise FileNotFoundError(f"Path {path} does not exist.") + if not (path / "config.json").exists() or \ + not (path / "model.pt").exists() or \ + not (path / "scaler.joblib").exists(): + raise FileNotFoundError("Model files not found in path.") + + # Initialize model with config + with open(path / "config.json", "r", encoding="utf-8") as file: + config = json.load(file) + # Grab CAO out of config + cao = CAOMapping(config.pop("context"), config.pop("actions"), config.pop("outcomes")) + nnp = NeuralNetPredictor(cao, config) + + nnp.model = TorchNeuralNet(len(config["features"]), + config["hidden_sizes"], + config["linear_skip"], + config["dropout"]) + nnp.model.load_state_dict(torch.load(path / "model.pt")) + nnp.model.to(config["device"]) + nnp.model.eval() + nnp.scaler = joblib.load(path / "scaler.joblib") + return nnp diff --git a/src/prsdk/persistence/serializers/sklearn_serializer.py b/src/prsdk/persistence/serializers/sklearn_serializer.py new file mode 100644 index 0000000..8bccc6f --- /dev/null +++ b/src/prsdk/persistence/serializers/sklearn_serializer.py @@ -0,0 +1,54 @@ +""" +Serializer for the SKLearnPredictor class. +""" +import json +from pathlib import Path + +import joblib + +from data.cao_mapping import CAOMapping +from persistence.serializers.serializer import Serializer +from predictors.sklearn_predictors.sklearn_predictor import SKLearnPredictor + + +class SKLearnSerializer(Serializer): + """ + Serializer for the SKLearnPredictor. + Uses joblib to save the model and json to save the config used to load it. + """ + def save(self, model: SKLearnPredictor, path: Path): + """ + Saves saves model and features into format for loading. + Generates path to folder if it does not exist. + :param path: path to folder to save model files. + """ + path.mkdir(parents=True, exist_ok=True) + + # Add CAO to the config + config = dict(model.config.items()) + cao_dict = {"context": model.cao.context, "actions": model.cao.actions, "outcomes": model.cao.outcomes} + config.update(cao_dict) + + with open(path / "config.json", "w", encoding="utf-8") as file: + json.dump(config, file) + joblib.dump(model.model, path / "model.joblib") + + def load(self, path: Path) -> "SKLearnPredictor": + """ + Loads saved model and config from a local folder. + :param path: path to folder to load model files from. 
+ """ + load_path = Path(path) + if not load_path.exists() or not load_path.is_dir(): + raise FileNotFoundError(f"Path {path} does not exist.") + if not (load_path / "config.json").exists() or not (load_path / "model.joblib").exists(): + raise FileNotFoundError("Model files not found in path.") + + # Extract CAO from config + with open(load_path / "config.json", "r", encoding="utf-8") as file: + config = json.load(file) + cao = CAOMapping(config.pop("context"), config.pop("actions"), config.pop("outcomes")) + + model = joblib.load(load_path / "model.joblib") + sklearn_predictor = SKLearnPredictor(cao, model, config) + return sklearn_predictor From ec589d2b3c42f35ca7a31f8597984413e70406bf Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:03:02 -0700 Subject: [PATCH 08/13] Updated workflow to properly set pythonpath --- .github/workflows/sdk.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/sdk.yml b/.github/workflows/sdk.yml index 3fcc407..6dee78a 100644 --- a/.github/workflows/sdk.yml +++ b/.github/workflows/sdk.yml @@ -17,6 +17,8 @@ jobs: uses: actions/setup-python@v3 with: python-version: "3.10" + - name: Set PYTHONPATH + run: echo "PYTHONPATH=$PWD/src/prsdk" >> $GITHUB_ENV - name: Install dependencies run: | python -m pip install --upgrade pip From 6c679d1e667553b661a4b1612deea742a53b6455 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:10:23 -0700 Subject: [PATCH 09/13] Updated nn serializer to not automatically load onto mps --- .../persistence/serializers/neural_network_serializer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/prsdk/persistence/serializers/neural_network_serializer.py b/src/prsdk/persistence/serializers/neural_network_serializer.py index acac90d..dfa9d86 100644 --- a/src/prsdk/persistence/serializers/neural_network_serializer.py +++ b/src/prsdk/persistence/serializers/neural_network_serializer.py @@ -47,12 +47,15 @@ def save(self, model: NeuralNetPredictor, path: Path): } with open(path / "config.json", "w", encoding="utf-8") as file: json.dump(config, file) + # Put model on CPU before saving + model.model.to("cpu") torch.save(model.model.state_dict(), path / "model.pt") joblib.dump(model.scaler, path / "scaler.joblib") def load(self, path: Path) -> NeuralNetPredictor: """ Loads a model from a given folder. Creates empty model with config, then loads model state dict and scaler. + NOTE: We don't put the model back on the device it was trained on. This has to be done manually. :param path: path to folder containing model files. 
""" if not path.exists() or not path.is_dir(): @@ -74,7 +77,6 @@ def load(self, path: Path) -> NeuralNetPredictor: config["linear_skip"], config["dropout"]) nnp.model.load_state_dict(torch.load(path / "model.pt")) - nnp.model.to(config["device"]) nnp.model.eval() nnp.scaler = joblib.load(path / "scaler.joblib") return nnp From 6a871e03b35b24f4c17bbb18a031abe5b47a9593 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:13:25 -0700 Subject: [PATCH 10/13] Set map location to CPU for NN loading to avoid MPS issues --- src/prsdk/persistence/serializers/neural_network_serializer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/prsdk/persistence/serializers/neural_network_serializer.py b/src/prsdk/persistence/serializers/neural_network_serializer.py index dfa9d86..977257b 100644 --- a/src/prsdk/persistence/serializers/neural_network_serializer.py +++ b/src/prsdk/persistence/serializers/neural_network_serializer.py @@ -76,7 +76,8 @@ def load(self, path: Path) -> NeuralNetPredictor: config["hidden_sizes"], config["linear_skip"], config["dropout"]) - nnp.model.load_state_dict(torch.load(path / "model.pt")) + # Set map_location to CPU to avoid issues with GPU availability + nnp.model.load_state_dict(torch.load(path / "model.pt", map_location="cpu")) nnp.model.eval() nnp.scaler = joblib.load(path / "scaler.joblib") return nnp From 1660c7747fa63418f5ad9b73efd1ba4dd8564bb4 Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:21:14 -0700 Subject: [PATCH 11/13] Move values in prediction based on model's device vs self.device which may no longer match --- src/prsdk/predictors/neural_network/neural_net_predictor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prsdk/predictors/neural_network/neural_net_predictor.py b/src/prsdk/predictors/neural_network/neural_net_predictor.py index d8a341f..982c42a 100644 --- a/src/prsdk/predictors/neural_network/neural_net_predictor.py +++ b/src/prsdk/predictors/neural_network/neural_net_predictor.py @@ -199,7 +199,7 @@ def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: with torch.no_grad(): self.model.eval() for X, _ in test_dl: - X = X.to(self.device) + X = X.to(self.model.device) pred_list.append(self.model(X)) # Flatten into a single numpy array if we have multiple batches From 7d35fd07fb668801907bf495fe9b8f8b74dca48c Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:26:21 -0700 Subject: [PATCH 12/13] Does not save device model was trained on to avoid confusion. Added method to manually set model's device. 
--- .../serializers/neural_network_serializer.py | 2 +- .../predictors/neural_network/neural_net_predictor.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/prsdk/persistence/serializers/neural_network_serializer.py b/src/prsdk/persistence/serializers/neural_network_serializer.py index 977257b..b39e03c 100644 --- a/src/prsdk/persistence/serializers/neural_network_serializer.py +++ b/src/prsdk/persistence/serializers/neural_network_serializer.py @@ -29,6 +29,7 @@ def save(self, model: NeuralNetPredictor, path: Path): raise ValueError("Model not fitted yet.") path.mkdir(parents=True, exist_ok=True) + # Note: we don't save the model's device, as it's not guaranteed to be available on load config = { "context": model.cao.context, "actions": model.cao.actions, @@ -38,7 +39,6 @@ def save(self, model: NeuralNetPredictor, path: Path): "hidden_sizes": model.hidden_sizes, "linear_skip": model.linear_skip, "dropout": model.dropout, - "device": model.device, "epochs": model.epochs, "batch_size": model.batch_size, "optim_params": model.optim_params, diff --git a/src/prsdk/predictors/neural_network/neural_net_predictor.py b/src/prsdk/predictors/neural_network/neural_net_predictor.py index 982c42a..a01f7a3 100644 --- a/src/prsdk/predictors/neural_network/neural_net_predictor.py +++ b/src/prsdk/predictors/neural_network/neural_net_predictor.py @@ -199,7 +199,7 @@ def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: with torch.no_grad(): self.model.eval() for X, _ in test_dl: - X = X.to(self.model.device) + X = X.to(self.device) pred_list.append(self.model(X)) # Flatten into a single numpy array if we have multiple batches @@ -208,3 +208,12 @@ def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: else: y_pred = pred_list[0].cpu().numpy() return pd.DataFrame(y_pred, index=context_actions_df.index, columns=[self.label]) + + def set_device(self, device: str): + """ + Sets the device to run the model on. + """ + self.device = device + if self.model: + self.model.to(device) + From 418d67611c45773ff8c3deb48d41cfa924f0358e Mon Sep 17 00:00:00 2001 From: Daniel Young Date: Tue, 30 Jul 2024 15:30:53 -0700 Subject: [PATCH 13/13] Removed erroneous white space --- src/prsdk/predictors/neural_network/neural_net_predictor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/prsdk/predictors/neural_network/neural_net_predictor.py b/src/prsdk/predictors/neural_network/neural_net_predictor.py index a01f7a3..8542ec6 100644 --- a/src/prsdk/predictors/neural_network/neural_net_predictor.py +++ b/src/prsdk/predictors/neural_network/neural_net_predictor.py @@ -208,7 +208,7 @@ def predict(self, context_actions_df: pd.DataFrame) -> pd.DataFrame: else: y_pred = pred_list[0].cpu().numpy() return pd.DataFrame(y_pred, index=context_actions_df.index, columns=[self.label]) - + def set_device(self, device: str): """ Sets the device to run the model on. @@ -216,4 +216,3 @@ def set_device(self, device: str): self.device = device if self.model: self.model.to(device) -