Skip to content

Commit

Permalink
introduce phenoStats.py
Browse files Browse the repository at this point in the history
close #25
  • Loading branch information
abearab committed Dec 26, 2023
1 parent 44ca695 commit 7fb521c
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 131 deletions.
229 changes: 98 additions & 131 deletions screenpro/phenoScore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,101 @@
import numpy as np
import pandas as pd
from pydeseq2 import preprocessing
from scipy.stats import ttest_rel
from statsmodels.stats.multitest import multipletests
from .phenoStats import matrixStat, getFDR


def calculateDelta(x, y, math, ave):
"""
Calculate log ratio of y / x.
`ave` == 'all' (i.e. averaged across all values, oligo and replicates)
`ave` == 'col' (i.e. averaged across columns, replicates)
Args:
x (np.array): array of values
y (np.array): array of values
math (str): math to use for calculating score
ave (str): average method
Returns:
np.array: array of log ratio values
"""
if ave == 'all':
# average across all values
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1))
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x))
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x))
elif ave == 'row':
# average across rows
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1), axis=0)
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x), axis=0)
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x), axis=0)
elif ave == 'col':
# average across columns
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1), axis=1)
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x), axis=1)
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x), axis=1)


def calculatePhenotypeScore(x, y, x_ctrl, y_ctrl, growth_rate, math, ave):
"""
Calculate phenotype score normalized by negative control and growth rate.
Args:
x (np.array): array of values
y (np.array): array of values
x_ctrl (np.array): array of values
y_ctrl (np.array): array of values
growth_rate (int): growth rate
math (str): math to use for calculating score
ave (str): average method
Returns:
np.array: array of scores
"""
# calculate control median and std
ctrl_median = np.median(calculateDelta(x=x_ctrl, y=y_ctrl, math=math, ave=ave))

# calculate delta
delta = calculateDelta(x=x, y=y, math=math, ave=ave)

# calculate score
return (delta - ctrl_median) / growth_rate

def matrixTest(x, y, x_ctrl, y_ctrl, math, ave_reps, test = 'ttest', growth_rate = 1):
"""
Calculate phenotype score and p-values comparing `y` vs `x` matrices.
Args:
x (np.array): array of values
y (np.array): array of values
x_ctrl (np.array): array of values
y_ctrl (np.array): array of values
math (str): math to use for calculating score
ave_reps (bool): average replicates
test (str): test to use for calculating p-value
growth_rate (int): growth rate
Returns:
np.array: array of scores
np.array: array of p-values
"""
# check if average across replicates
ave = 'col' if ave_reps else 'all'

# calculate growth score
scores = calculatePhenotypeScore(
x = x, y = y, x_ctrl = x_ctrl, y_ctrl = y_ctrl,
growth_rate = growth_rate, math = math,
ave = ave
)

# compute p-value
p_values = matrixStat(x, y, test=test, ave_reps=ave_reps)

return scores, p_values


def runPhenoScore(adata, cond1, cond2, math, test, score_level,
Expand Down Expand Up @@ -56,20 +149,16 @@ def runPhenoScore(adata, cond1, cond2, math, test, score_level,
math=math, ave_reps=True, test=test,
growth_rate=growth_rate
)
# fill na with 1
p_values[np.isnan(p_values)] = 1
# Calculate the adjusted p-values using the Benjamini-Hochberg method
if p_values is None:
raise ValueError('p_values is None')
_, adj_pvalues, _, _ = multipletests(p_values, alpha=0.05, method='fdr_bh')
# get adjusted p-values
adj_p_values = getFDR(p_values)
# get targets
targets = adata.var.index.str.split('_[-,+]_').str[0].to_list()
# combine results into a dataframe
result = pd.concat([
pd.Series(targets, index=adata.var.index, name='target'),
pd.Series(scores, index=adata.var.index, name='score'),
pd.Series(p_values, index=adata.var.index, name=f'{test} pvalue'),
pd.Series(adj_pvalues, index=adata.var.index, name='BH adj_pvalue'),
pd.Series(adj_p_values, index=adata.var.index, name='BH adj_pvalue'),
], axis=1)

return result_name, result
Expand Down Expand Up @@ -113,69 +202,6 @@ def addPseudoCount():
# 'Pseudocount behavior not recognized or not implemented')


def calculateDelta(x, y, math, ave):
"""
Calculate log ratio of y / x.
`ave` == 'all' (i.e. averaged across all values, oligo and replicates)
`ave` == 'col' (i.e. averaged across columns, replicates)
Args:
x (np.array): array of values
y (np.array): array of values
math (str): math to use for calculating score
ave (str): average method
Returns:
np.array: array of log ratio values
"""
if ave == 'all':
# average across all values
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1))
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x))
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x))
elif ave == 'row':
# average across rows
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1), axis=0)
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x), axis=0)
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x), axis=0)
elif ave == 'col':
# average across columns
if math == 'log2(x+1)':
return np.mean(np.log2(y+1) - np.log2(x+1), axis=1)
elif math == 'log10':
return np.mean(np.log10(y) - np.log10(x), axis=1)
elif math == 'log1p':
return np.mean(np.log1p(y) - np.log1p(x), axis=1)


def calculatePhenotypeScore(x, y, x_ctrl, y_ctrl, growth_rate, math, ave):
"""
Calculate phenotype score normalized by negative control and growth rate.
Args:
x (np.array): array of values
y (np.array): array of values
x_ctrl (np.array): array of values
y_ctrl (np.array): array of values
growth_rate (int): growth rate
math (str): math to use for calculating score
ave (str): average method
Returns:
np.array: array of scores
"""
# calculate control median and std
ctrl_median = np.median(calculateDelta(x=x_ctrl, y=y_ctrl, math=math, ave=ave))

# calculate delta
delta = calculateDelta(x=x, y=y, math=math, ave=ave)

# calculate score
return (delta - ctrl_median) / growth_rate


def calculateZScorePhenotypeScore(x, y, x_ctrl, y_ctrl, growth_rate, math, ave):
pass
# # calculate control median and std
Expand Down Expand Up @@ -270,62 +296,3 @@ def generatePseudoGeneLabels(adata, num_pseudogenes=None, ctrl_label='negCtrl'):
adata.var.loc[adata.var.targetType.eq('gene'), 'pseudoLabel'] = 'gene'
adata.var.loc[adata.var.pseudoLabel.eq(''), 'pseudoLabel'] = np.nan


def matrixStat(x, y, test, ave_reps):
"""
Get p-values comparing `y` vs `x` matrices.
Args:
x (np.array): array of values
y (np.array): array of values
test (str): test to use for calculating p-value
ave_reps (bool): average replicates
Returns:
np.array: array of p-values
"""
# calculate p-values
if test == 'MW':
# run Mann-Whitney U rank test
pass
elif test == 'ttest':
# run ttest
if ave_reps:
# average across replicates
p_value = ttest_rel(y, x, axis=1)[1]
else:
# average across all values
p_value = ttest_rel(y, x)[1]
return p_value
else:
raise ValueError(f'Test "{test}" not recognized')


def matrixTest(x, y, x_ctrl, y_ctrl, math, ave_reps, test = 'ttest', growth_rate = 1):
"""
Calculate phenotype score and p-values comparing `y` vs `x` matrices.
Args:
x (np.array): array of values
y (np.array): array of values
x_ctrl (np.array): array of values
y_ctrl (np.array): array of values
math (str): math to use for calculating score
ave_reps (bool): average replicates
test (str): test to use for calculating p-value
growth_rate (int): growth rate
Returns:
np.array: array of scores
np.array: array of p-values
"""
# check if average across replicates
ave = 'col' if ave_reps else 'all'

# calculate growth score
scores = calculatePhenotypeScore(
x = x, y = y, x_ctrl = x_ctrl, y_ctrl = y_ctrl,
growth_rate = growth_rate, math = math,
ave = ave
)

# compute p-value
p_values = matrixStat(x, y, test=test, ave_reps=ave_reps)

return scores, p_values
49 changes: 49 additions & 0 deletions screenpro/phenoStats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
phenoStats module
"""

from scipy.stats import ttest_rel
import numpy as np
from statsmodels.stats.multitest import multipletests


def matrixStat(x, y, test, ave_reps):
"""
Get p-values comparing `y` vs `x` matrices.
Args:
x (np.array): array of values
y (np.array): array of values
test (str): test to use for calculating p-value
ave_reps (bool): average replicates
Returns:
np.array: array of p-values
"""
# calculate p-values
if test == 'MW':
# run Mann-Whitney U rank test
pass
elif test == 'ttest':
# run ttest
if ave_reps:
# average across replicates
p_value = ttest_rel(y, x, axis=1)[1]
else:
# average across all values
p_value = ttest_rel(y, x)[1]
return p_value
else:
raise ValueError(f'Test "{test}" not recognized')


def getFDR(p_values, method='fdr_bh'):
"""
Calculate FDR.
"""
# fill na with 1
p_values[np.isnan(p_values)] = 1
# Calculate the adjusted p-values using the Benjamini-Hochberg method
if p_values is None:
raise ValueError('p_values is None')
_, adj_p_values, _, _ = multipletests(p_values, alpha=0.05, method='fdr_bh')

return adj_p_values

0 comments on commit 7fb521c

Please sign in to comment.