Skip to content

Commit

Permalink
Update tool.py
Browse files Browse the repository at this point in the history
* Add `FeatureSelection` class
  • Loading branch information
jafetcc02 committed Mar 19, 2024
1 parent b06da1f commit 0250dac
Showing 1 changed file with 230 additions and 7 deletions.
237 changes: 230 additions & 7 deletions likelihood/tools/tools.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import math
import os
import pickle
from typing import Callable, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from numpy import ndarray
from pandas.core.frame import DataFrame

Expand Down Expand Up @@ -341,6 +344,103 @@ def get_period(dataset: ndarray) -> float:
return period


def sigmoide_inv(y: float) -> float:
"""Calculates the inverse of the sigmoid function
Args:
y (`float`): the number to evaluate the function
Returns:
`float`: value of evaluated function
"""

return math.log(y / (1 - y))


def sigmoide(x: float) -> float:

return 1 / (1 + math.exp(-x))


class LogisticRegression:
"""class implementing multiple logistic regression"""

__slots__ = ["importance", "X", "y"]

def __init__(self) -> None:
"""The class initializer"""

self.importance = []

def fit(self, dataset: ndarray, values: ndarray) -> None:
"""Performs linear multiple model training
Parameters
----------
dataset : `np.array`
An array containing the scaled data.
values : `np.ndarray`
A set of values returned by the linear function.
Returns
-------
importance : `np.array`
An array containing the importance of each feature.
"""

self.X = dataset
self.y = values

U, S, VT = np.linalg.svd(self.X, full_matrices=False)

inverse_sig = np.vectorize(sigmoide_inv)
w = (VT.T @ np.linalg.inv(np.diag(S)) @ U.T).T @ inverse_sig(self.y)

if self.y.shape[1] > 1:
for row in w:
self.importance.append(np.around(np.max(row), decimals=8))
else:
for i in range(self.X.shape[0]):
a = np.around(w[i], decimals=8)
self.importance.append(a)

def predict(self, datapoints: ndarray) -> ndarray:
"""
Performs predictions for a set of points
Parameters
----------
datapoints : `np.array`
An array containing the values of the independent variable.
"""
sig = np.vectorize(sigmoide)

return sig(np.array(self.importance) @ datapoints)

def get_importances(self, print_important_features: bool = False) -> ndarray:
"""
Returns the important features
Parameters
----------
print_important_features : `bool`
determines whether or not are printed on the screen. By default it is set to `False`.
Returns
-------
importance : `np.array`
An array containing the importance of each feature.
"""
if print_important_features:
for i, a in enumerate(self.importance):
print(f"The importance of the {i+1} feature is {a}")
return np.array(self.importance)


class LinearRegression:
"""class implementing multiple linear regression"""

Expand All @@ -351,7 +451,7 @@ def __init__(self) -> None:

self.importance = []

def fit(self, dataset: ndarray, values: ndarray) -> None:
def fit(self, dataset: ndarray, values: ndarray, verbose: bool = False) -> None:
"""Performs linear multiple model training
Parameters
Expand All @@ -378,10 +478,11 @@ def fit(self, dataset: ndarray, values: ndarray) -> None:
a = np.around(w[i], decimals=8)
self.importance.append(a)

print("\nSummary:")
print("--------")
print("\nParameters:", np.array(self.importance).shape)
print("RMSE: {:.4f}".format(mean_square_error(self.y, self.predict(self.X))))
if verbose:
print("\nSummary:")
print("--------")
print("\nParameters:", np.array(self.importance).shape)
print("RMSE: {:.4f}".format(mean_square_error(self.y, self.predict(self.X))))

def predict(self, datapoints: ndarray) -> ndarray:
"""
Expand Down Expand Up @@ -443,7 +544,7 @@ def cal_average(y, alpha: float = 1):
class DataScaler:
"""numpy array `scaler` and `rescaler`"""

__slots__ = ["dataset_", "_n", "data_scaled", "values"]
__slots__ = ["dataset_", "_n", "data_scaled", "values", "transpose"]

def __init__(self, dataset: ndarray, n: int = 1) -> None:
"""Initializes the parameters required for scaling the data"""
Expand All @@ -470,15 +571,20 @@ def rescale(self) -> ndarray:
msg = "Trying to access an item at an invalid index."
print(f"{error_type}: {msg}")
return None
if self.dataset_.shape[0] > self.dataset_.shape[1]:
self.dataset_ = self.dataset_.T
self.transpose = True
else:
self.transpose = False
for i in range(self.dataset_.shape[0]):
if self._n != None:
fit = np.polyfit(xaxis, self.dataset_[i, :], self._n)
f = np.poly1d(fit)
poly = f(xaxis)
fitting.append(f)
self.data_scaled[i, :] += -poly
else:
fitting.append(0.0)
self.data_scaled[i, :] += -poly
mu.append(np.min(self.data_scaled[i, :]))
if np.max(self.data_scaled[i, :]) != 0:
sigma.append(np.max(self.data_scaled[i, :]) - mu[i])
Expand All @@ -504,6 +610,8 @@ def scale(self, dataset_: ndarray) -> ndarray:
dataset_ : `np.array`
An array containing the rescaled data.
"""
if self.transpose:
dataset_ = dataset_.T
for i in range(dataset_.shape[0]):
dataset_[i, :] += 1
dataset_[i, :] /= 2
Expand Down Expand Up @@ -775,6 +883,121 @@ def _confu_mat(self, y_true: ndarray, y_pred: ndarray, labels: list) -> ndarray:
return count_mat


class FeatureSelection:
"""
Class with method to obtain feature selection of a dataset. Returns string
"""

__slots__ = ["not_features", "X"]

def __init__(self, not_features: list = []) -> None:
"""The class initializer. The initial parameter is a string with variables"""
self.not_features = not_features

def feature_selection(self, dataset: DataFrame, n_importances: int) -> str:

# Asignar y limpiar dataset
self.load_data(dataset)

curr_dataset = self.X
columns = list(curr_dataset.columns)

# Construimos string de causal_graph
feature_string = " digraph { "
for column in columns:
feature_string += column + "; "

numeric_df = curr_dataset.select_dtypes(include="number")
scaler = DataScaler(numeric_df.copy().to_numpy(), n=None)
numeric_scaled = scaler.rescale()
numeric_df = pd.DataFrame(numeric_scaled, columns=numeric_df.columns)
curr_dataset[numeric_df.columns] = numeric_df

# Iteramos sobre todas las columnas para obtener sus importances
for column in columns:

# Variable a predecir
Y = curr_dataset[column]

# Verificamos si es numerica o es categorica
column_type = Y.dtype
if column_type != "object":
# Modelo de regresion lineal
Model = LinearRegression()

# Dataset auxiliar sin la columna en cuestion
X_aux = curr_dataset.drop([column], axis=1)

# Codificamos
dfe = DataFrameEncoder(X_aux)
encoded_df = dfe.encode(save_mode=False)
# Entrenamos

Model.fit(encoded_df.to_numpy().T, Y.to_numpy().T)
# Obtenemos importance
importance = Model.get_importances()
else:
Model = LogisticRegression()
num_unique_entries = curr_dataset[column].nunique()

quick_encoder = DataFrameEncoder(Y.to_frame())
encoded_Y = quick_encoder.encode(save_mode=False)
# Mapeamos a one-hot
train_y = (tf.one_hot(encoded_Y[column], num_unique_entries)).numpy()
# PASAMOS 0 -> 0.5 y 1 -> 0.73105
for i in range(len(train_y)):
for j in range(num_unique_entries):
if train_y[i][j] == 1.0:
train_y[i][j] = 0.73105
else:
train_y[i][j] = 0.5

# Eliminamos la columna en cuestión
X_aux = curr_dataset.drop([column], axis=1)

# Codificamos
dfe = DataFrameEncoder(X_aux)
encoded_df = dfe.encode(save_mode=False)

# Entrenamos
Model.fit(encoded_df.to_numpy().T, train_y)

# Obtenemos importancias
importance = Model.get_importances()

# Obtenemos las n más importantes
top_n_indexes = sorted(
range(len(importance)), key=lambda i: importance[i], reverse=True
)[:n_importances]

# Construimos el string de la columna en cuestión
names_cols = list(X_aux.columns)

# Lo formateamos
for i in top_n_indexes:
feature_string += names_cols[i] + " -> "

feature_string += column + "; "

return feature_string + "} "

def load_data(self, dataset: DataFrame):
# Asignamos datos y limpiamos dataset de columnas no requeridas

if len(self.not_features) > 0:
# Quitamos columnas innecesarias
self.X = dataset.drop(self.not_features, axis=1)

else:
self.X = dataset

self.X.replace([np.inf, -np.inf], np.nan, inplace=True)
self.X.replace(" ", np.nan, inplace=True)
self.X.dropna(inplace=True)
self.X = self.X.reset_index()
self.X = self.X.drop(columns=["index"])


# -------------------------------------------------------------------------
if __name__ == "__main__":
y_true = np.array([1, 2, 2, 1, 1])
Expand Down

0 comments on commit 0250dac

Please sign in to comment.