Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infe prof and same diff #23

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
88d6df8
documentation
AntonioVitoMastromarino Dec 6, 2023
09353a5
Merge branch 'infe_prof_and_same_diff' of github.com:SABS-R3-Epidemio…
AntonioVitoMastromarino Dec 6, 2023
d4ee7c4
test_re_scaler
AntonioVitoMastromarino Jan 17, 2024
6373775
notation_consistence
AntonioVitoMastromarino Jan 17, 2024
ce4c145
added smoothing feature
AntonioVitoMastromarino Jan 31, 2024
3556ca0
fixed something
AntonioVitoMastromarino Jan 31, 2024
59b474e
Merge branch 'dev' into infe_prof_and_same_diff
AntonioVitoMastromarino Jan 31, 2024
78800dc
fixed something
AntonioVitoMastromarino Jan 31, 2024
85e9f4a
fixed something
AntonioVitoMastromarino Jan 31, 2024
e7184f9
fixed style
AntonioVitoMastromarino Jan 31, 2024
069f7dc
fixed style
AntonioVitoMastromarino Jan 31, 2024
8d43c7e
fixed style
AntonioVitoMastromarino Jan 31, 2024
f1ee3df
fixed style
AntonioVitoMastromarino Jan 31, 2024
8ec5339
fixed style
AntonioVitoMastromarino Jan 31, 2024
e56be52
fixed style
AntonioVitoMastromarino Jan 31, 2024
1b6e120
fixed style
AntonioVitoMastromarino Jan 31, 2024
57bb852
fixed style
AntonioVitoMastromarino Jan 31, 2024
a88e2b2
fixed style
AntonioVitoMastromarino Jan 31, 2024
18799ff
fixed style
AntonioVitoMastromarino Jan 31, 2024
a8fbd51
Merge branch 'dev' into infe_prof_and_same_diff
YunliQi Feb 14, 2024
0596734
Tests fix
YunliQi Feb 14, 2024
b9d5c61
smoothing added
AntonioVitoMastromarino Feb 26, 2024
77d220b
smoothing added
AntonioVitoMastromarino Feb 26, 2024
592cadc
smoothing added
AntonioVitoMastromarino Feb 27, 2024
7fa8766
smoothing added
AntonioVitoMastromarino Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions epios/post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def _wrapper_Region_AgeRegion(self, sampling_method, sample_size, time_sample, n
people = sampler_class.sample(sample_size=sample_size)

# Get the results of people sampled
X = SamplingMaker(non_resp_rate=non_resp_rate, keeptrack=True, TheData=self.time_data,
X = SamplingMaker(non_resp_rate=non_resp_rate, keep_track=True, data=self.time_data,
false_positive=0, false_negative=0, threshold=None)
ite = X([time_sample[i]], people)

Expand Down Expand Up @@ -621,7 +621,7 @@ def _wrapper_Region_AgeRegion(self, sampling_method, sample_size, time_sample, n
people = sampler_class.sample(sample_size=sample_size)

# Get results of each people sampled
X = SamplingMaker(non_resp_rate=0, keeptrack=True, TheData=self.time_data,
X = SamplingMaker(non_resp_rate=0, keep_track=True, data=self.time_data,
false_positive=0, false_negative=0, threshold=None)
ite = X(time_sample, people)

Expand Down Expand Up @@ -649,7 +649,7 @@ def _wrapper_Region_AgeRegion(self, sampling_method, sample_size, time_sample, n
people = sampler_class.sample(sample_size=sample_size)

# Get the results of each people sampled
X = SamplingMaker(non_resp_rate=0, keeptrack=True, TheData=self.time_data,
X = SamplingMaker(non_resp_rate=0, keep_track=True, data=self.time_data,
false_positive=0, false_negative=0, threshold=None)
ite = X([time_sample[i]], people)

Expand Down Expand Up @@ -707,7 +707,7 @@ def _wrapper_Age_Base(self, sampling_method, sample_size, time_sample,
people = sampler_class.sample(sample_size=sample_size)

# Get results of each people sampled
X = SamplingMaker(non_resp_rate=0, keeptrack=True, TheData=self.time_data,
X = SamplingMaker(non_resp_rate=0, keep_track=True, data=self.time_data,
false_positive=0, false_negative=0, threshold=None)
ite = X(time_sample, people)

Expand All @@ -732,7 +732,7 @@ def _wrapper_Age_Base(self, sampling_method, sample_size, time_sample,
people = sampler_class.sample(sample_size=sample_size)

# Get the results of each people sampled
X = SamplingMaker(non_resp_rate=0, keeptrack=True, TheData=self.time_data,
X = SamplingMaker(non_resp_rate=0, keep_track=True, data=self.time_data,
false_positive=0, false_negative=0, threshold=None)
ite = X([time_sample[i]], people)

Expand Down
75 changes: 75 additions & 0 deletions epios/re_scaler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import numpy
from numpy import array


class ReScaler():

'''
Class to de-bias the observed prevalence by affine transformation. The day
when the nonresprate will depend on the status this class will take this
into account. Observation is meant to be a list, each entry being the rate of
positive tests. If smoothin has been set, then obsetvation is a list of lists.

If smoothing has been set, then the estimates are further manipulated
the problem is, given arrays T[k],Y[k] for k in range(n),
and given a function w then one wants to minimize the cost

m0[n], m1[n] = argmin(sum(w[n,k] * (Y[k] - m0 - m1 * T[k])**2 for k in range(n + 1)))
w[n,k] = obs[n,k] * (1 - obs[n,k]) * num[n,k] * smoothing(T[n] - T[k]).

This corresponds to solve
m0 * a + m1 * b = A
m1 * b + m2 * c = B
with
a = sum(w(T[n] - T[k]) for k in range(n + 1))
b = sum(w(T[n] - T[k]) * T[k] for k in range(n + 1))
c = sum(w(T[n] - T[k]) * T[k]**2 for k in range(n + 1))
A = sum(w(T[n] - T[k]) * Y[k] for k in range(n + 1))
B = sum(w(T[n] - T[k]) * Y[k] * T[k] for k in range(n + 1))
that is
m0 = (A * c - B * b) / (a * c - b**2)
m1 = (B * a - A * b) / (a * c - b**2).

This is a weighted least square difference probrem where the solution
is a line approximating the prevalence of the infection focusing
on the more recent estimates, that are more reliable. However it
is not clear which smoothing could be better for this purpose.
As well the resulting estimate can be negative, which might be undesirable.

'''

def __init__(self, false_positive=0, false_negative=0, smoothing=None):
if false_negative + false_positive != 1:
self.false_positive = false_positive
self.false_negative = false_negative
self.smoothing = smoothing
else:
raise Exception('useless test')

def __call__(self, observation, times=None, tested=None):
if self.smoothing is None:
estimates = (array(observation) - self.false_positive) / (1 - self.false_negative - self.false_positive)
return estimates
elif times is None:
raise Exception('please insert times of sampling')
elif tested is None:
raise Exception('please insert number of sampled')
else:
smooth_estimate = []
for n, (obs, num) in enumerate(zip(observation, tested)):
estimates = (array(obs) - self.false_positive) / (1 - self.false_negative - self.false_positive)
temp = array([self.smoothing(times[n] - times[k]) for k in range(n + 1)], dtype=numpy.double)
temp *= array(obs) * (1 - array(obs)) * array(num)
try:
a = temp.sum()
b = (temp * times[0: n + 1]).sum()
c = (temp * (times[0: n + 1]**2)).sum()
assert a * c != b**2
A = (temp * estimates[0: n + 1]).sum()
B = (temp * times[0: n + 1] * estimates[0: n + 1]).sum()
m0 = (A * c - B * b) / (a * c - b**2)
m1 = (B * a - A * b) / (a * c - b**2)
smooth_estimate.append(m0 + m1 * times[n])
except:
smooth_estimate.append(estimates[n])
return array(smooth_estimate)
131 changes: 90 additions & 41 deletions epios/sampling_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,108 @@


class SamplingMaker():
'''Class to return the results of sampling
'''
Class to return the results of sampling

Parameters:
-----------
non_resp_rate : float, between 0 and 1
The probability that the result of a test is 'NonResponder',
despite infectious status and viral load.

Default is zero.
threshold : float or None
If the viral load is higher then the threshold,
then the result of the test will be positive,
otherwise it will be negative.

non_resp_rate : float, between 0 and 1
The probability of a person that do not respond
keeptrack : bool
Whether or not to change people sampled at each time point
TheData : pandas.DataFrame
The infection data of the population at different time points
false_positive : float, between 0 and 1
The possibility of a normal person to get a positive test result
false_negative : float, between 0 and 1
The possibility of a infected person to get a negative test result
threshold : NoneType
(Tbh, I also do not know what is this for)
Default is None (see recognised below)
false_positive : float, between 0 and 1
If the result is supposed to be negative,
then it will be positive with probability false_positive.

Default is zero.
false_negative : float, between 0 and 1
If the result is supposed to be positive,
then it will be positive with probability false_negative.

Default is zero.
keep_track : bool
If this is True, the same group of people is tested at each timestep.
Otherwise (default), at each timestep a new group of peaople is selected for testing.
data : pandas.DataFrame
index is the list of times the simulation ran.
columns is the list of IDs of the entire populations.
If threshold is None this contains the infectious statuses of the entire population.
Otherwise this contains the viral loads of the entire population.
'''

def __init__(self, non_resp_rate=0, keeptrack=False, TheData=None,
def __init__(self, non_resp_rate=0, keep_track=False, data=None,
false_positive=0, false_negative=0, threshold=None):
self.recognised = [3, 4, 5, 6, 7, 8]
self.non_resp_rate = non_resp_rate
self.keeptrack = keeptrack
self.TheData = TheData
self.recognised = [3, 4, 5, 6, 7, 8]
self.threshold = threshold
self.false_positive = false_positive
self.false_negative = false_negative
self.threshold = threshold
self.keep_track = keep_track
self.data = data

def __call__(self, sampling_times, people, post_proc = False):

'''
Method to return the results for all the planned tests

Inputs:
-------
sampling_times : list
List of the planned times for tests in the same format as data.index.
people : list
If keep_track is True this is a list of IDs in the same
format as columns. Otherwise this is a list of the same
length as sampling_times. In this case all elements are
lists of IDs in the same format as columns.

Output:
A pandas.DataFrame if keep_track is True. Otherwise a list
of pandas.DataFrame objects and their averages and size if
not post_proc. Otherwise a list of lists of DataFrames and
their average and size. These are no strictly needed here.
'''

if self.keep_track:
STATUSES = self.data.loc[sampling_times, people]
return STATUSES.apply(lambda x: list(map(self._testresult, x)))
else:
times_people = zip(sampling_times, people)
STATUSES = map(lambda t: self.data.loc[[t[0]], t[1]], times_people)
res = list(map(lambda x: x.apply(lambda x: list(map(self._testresult, x))), STATUSES))
# computing of the lists observ and number can be non-necessary in this place
if post_proc:
result = []
observ = []
number = []
temp = []
for x in res:
for n, y in enumerate(temp):
temp[n] = y.drop(columns = x.columns, errors = 'ignore')
temp.append(x)
result.append(temp.copy())
observ.append(list(map(lambda x: x.mean().mean(), temp)))
number.append(list(map(lambda x: len(x.columns), temp)))
else:
observ = list(map(lambda x: x.mean().mean(), res))
number = list(map(lambda x: len(x.columns), res))
return result, (observ, number)

def _testresult(self, load):
'''
Method to return the result for one test

If threshold is None, then load is the infectiousness status of the testes person
Otherwise, it is the viral load of the tested person.
Possible outputs are 'NonResponder', 'Positive', 'Negative'.
'''

def testresult(self, load):
if bool(binomial(1, self.non_resp_rate)):
return 'NonResponder'
if self.threshold is None:
Expand All @@ -49,25 +120,3 @@ def testresult(self, load):
return 'Positive'
else:
return 'Negative'

def __call__(self, sampling_times, people):
'''
This will return the test result for samples provided

Parameters:
-----------

sampling_times : list
A list of time points to sample
people : list
A list of ID of people sampled

'''
if self.keeptrack:
STATUSES = self.TheData.loc[sampling_times, people]
return STATUSES.apply(lambda x: list(map(self.testresult, x)))
else:
times_people = zip(sampling_times, people)
STATUSES = map(lambda t: self.TheData.loc[[t[0]], t[1]], times_people)
RESULTS = map(lambda x: x.apply(lambda x: list(map(self.testresult, x))), STATUSES)
return list(RESULTS)
26 changes: 26 additions & 0 deletions epios/tests/test_re_scaler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from unittest import TestCase
from numpy.random import rand
from numpy import array
from epios.re_scaler import ReScaler


class TestRS(TestCase):

def test_call(self):
x = rand()
try:
ReScaler(false_positive=x, false_negative=1 - x)
raise Exception('shall not work')
except:
self.assertEqual(ReScaler()(x), x)
self.assertEqual(ReScaler(false_positive=1, false_negative=1)(x), 1 - x)

def test_smooth(self):
x = [[1.0], [1.0, 2.0], [1.0, 2.0, 3.0]]
with self.assertRaises(Exception):
ReScaler(smoothing=lambda x: 1)(x, tested=[[1],[1,1],[1,1,1]])
with self.assertRaises(Exception):
ReScaler(smoothing=lambda x: 1)(x, times=array([0.0, 1.0, 2.0]))
self.assertEqual(ReScaler(smoothing=lambda x: 1)(x, times=array([0.0, 1.0, 2.0]), tested=[[1],[1,1],[1,1,1]])[0], 1.0)
self.assertEqual(ReScaler(smoothing=lambda x: 1)(x, times=array([0.0, 1.0, 2.0]), tested=[[1],[1,1],[1,1,1]])[1], 2.0)
self.assertEqual(ReScaler(smoothing=lambda x: 1)(x, times=array([0.0, 1.0, 2.0]), tested=[[1],[1,1],[1,1,1]])[2], 3.0)
46 changes: 26 additions & 20 deletions epios/tests/test_sampling_maker.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,48 @@
import pandas as pd
from unittest import TestCase
from sampling_maker import SamplingMaker

from numpy import array

class TestSM(TestCase):

def test_positive(self):
self.assertEqual(SamplingMaker(threshold=1).testresult(1.1), 'Positive')
self.assertEqual(SamplingMaker(threshold=1)._testresult(1.1), 'Positive')
for a in [3, 4, 5, 6, 7, 8]:
self.assertEqual(SamplingMaker().testresult(a), 'Positive')
self.assertEqual(SamplingMaker()._testresult(a), 'Positive')

def test_negative(self):
self.assertEqual(SamplingMaker(threshold=1).testresult(0.9), 'Negative')
self.assertEqual(SamplingMaker(threshold=1)._testresult(0.9), 'Negative')
for a in [1, 2, 9, 10, 11]:
self.assertEqual(SamplingMaker().testresult(a), 'Negative')
self.assertEqual(SamplingMaker()._testresult(a), 'Negative')

def test_false_positive(self):
self.assertEqual(SamplingMaker(threshold=1, false_positive=1).testresult(0.9), 'Positive')
self.assertEqual(SamplingMaker(threshold=1, false_positive=1)._testresult(0.9), 'Positive')
for a in [1, 2, 9, 10, 11]:
self.assertEqual(SamplingMaker(false_positive=1).testresult(a), 'Positive')
self.assertEqual(SamplingMaker(false_positive=1)._testresult(a), 'Positive')

def test_false_negative(self):
self.assertEqual(SamplingMaker(threshold=1, false_negative=1).testresult(1.1), 'Negative')
self.assertEqual(SamplingMaker(threshold=1, false_negative=1)._testresult(1.1), 'Negative')
for a in [3, 4, 5, 6, 7, 8]:
self.assertEqual(SamplingMaker(false_negative=1).testresult(a), 'Negative')
self.assertEqual(SamplingMaker(false_negative=1)._testresult(a), 'Negative')

def test_nonresponders(self):
self.assertEqual(SamplingMaker(non_resp_rate=1).testresult(None), 'NonResponder')
self.assertEqual(SamplingMaker(non_resp_rate=1)._testresult(None), 'NonResponder')

def test__call__(self):
t = [0, 2]
X = SamplingMaker(keeptrack=True, threshold=0.5, TheData=pd.DataFrame({0: [0, 0, 1],
1: [1, 1, 0], 2: [2, 2, 2]}))
self.assertFalse((X(t, [0, 1]) != pd.DataFrame({0: ['Negative', 'Positive'],
1: ['Positive', 'Negative']}, index=[0, 2])).any(axis=None))
X = SamplingMaker(TheData=pd.DataFrame({0: [1, 1, 3],
1: [3, 3, 1], 2: [2, 2, 2]}))
self.assertFalse((X(t, [[0, 1], [0, 1]])[0] != pd.DataFrame({0: 'Negative',
1: 'Positive'}, index=[0])).any(axis=None))
self.assertFalse((X(t, [[0, 1], [0, 1]])[1] != pd.DataFrame({0: 'Positive',
1: 'Negative'}, index=[2])).any(axis=None))
X = SamplingMaker(keep_track=True, threshold=0.5, data=pd.DataFrame({0: [0, 0, 1],
1: [1, 1, 0],
2: [2, 2, 2]}))
self.assertTrue((X(t, [0, 1]) == pd.DataFrame({0: ['Negative', 'Positive'],
1: ['Positive', 'Negative']}, index=[0, 2])).all(axis=None))
X = SamplingMaker(data=pd.DataFrame({0: [1, 1, 3],
1: [3, 3, 1],
2: [2, 2, 2]}))
self.assertTrue((X(t, [[0, 1], [0, 1]])[0] == pd.DataFrame({0: ['Negative'],
1: ['Positive']}, index=[0])).all(axis=None))
self.assertTrue((X(t, [[0, 1], [0, 1]])[1] == pd.DataFrame({0: ['Positive'],
1: ['Negative']}, index=[2])).all(axis=None))
result, number = X(t, [[0, 1], [1]], post_proc=True)
self.assertEqual(number[0][0], 2)
self.assertEqual(number[1][0], 1)
self.assertEqual(number[1][1], 1)
Loading