Skip to content

Commit

Permalink
Partly finished age_region tests
Browse files Browse the repository at this point in the history
  • Loading branch information
YunliQi committed Nov 25, 2023
1 parent 1bc73d0 commit 0f8fd3f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 13 deletions.
36 changes: 24 additions & 12 deletions epios/age_region.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def multinomial_draw(self, n: int, prob: list):
# The following block trasform the probability to a list of barriers between 0 and 1
# So we can use np.rand to generate a random number between 0 and 1 to compare with the barriers to determine which group it is
df = self.data
prob = np.array(prob)
if n > len(df):
raise ValueError('Sample size should not be greater than population size')

# The following code generate the cap for each age-region group, since there is a maximum the number of people in one age group in a region
# The cap list will have shape (number of region, number of age groups)
Expand Down Expand Up @@ -180,6 +183,8 @@ def multinomial_draw(self, n: int, prob: list):
if current_block[pos_region, pos_age] + 1 > cap_block[pos_region, pos_age]:
# reduce the corresponding prob to 0, and distribute its prob to the rest of blocks
prob_exceed = prob[pos_region, pos_age]
if prob_exceed == 1:
raise KeyError('Probability provided not supported for the sample size')
prob[pos_region, pos_age] = 0
prob = prob / (1 - prob_exceed)
prob = prob.reshape((1, -1))[0]
Expand All @@ -190,16 +195,19 @@ def multinomial_draw(self, n: int, prob: list):
threshold.append(threshold[-1] + prob[k - 1])
except:
threshold.append(0)
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
if len(threshold) > 0:
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
prob = prob.reshape((-1, len_age))

# Testing whether it hits age cap
if current_age[pos_age] + 1 > cap_age[0][pos_age]:
# Similarly, reduce all prob for this age group to 0, and re-distribute
prob_exceed = prob[:, pos_age].sum()
if prob_exceed == 1:
raise KeyError('Probability provided not supported for the sample size')
prob = np.delete(prob, pos_age, 1)
cap_block = np.delete(cap_block, pos_age, 1)
current_block = np.delete(current_block, pos_age, 1)
Expand All @@ -214,16 +222,19 @@ def multinomial_draw(self, n: int, prob: list):
threshold.append(threshold[-1] + prob[k - 1])
except:
threshold.append(0)
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
if len(threshold) > 0:
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
prob = prob.reshape((-1, len_age))

# Testing whether it hits region cap
if current_region[pos_region] + 1 > cap_region[0][pos_region]:
# Similar to the above
prob_exceed = prob[pos_region, :].sum()
if prob_exceed == 1:
raise KeyError('Probability provided not supported for the sample size')
prob = np.delete(prob, pos_region, 0)
cap_block = np.delete(cap_block, pos_region, 0)
current_block = np.delete(current_block, pos_region, 0)
Expand All @@ -237,10 +248,11 @@ def multinomial_draw(self, n: int, prob: list):
threshold.append(threshold[-1] + prob[k - 1])
except:
threshold.append(0)
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
if len(threshold) > 0:
if threshold[-1] >= 1:
threshold.append(threshold[-1])
else:
threshold.append(1)
prob = prob.reshape((-1, len_age))
break
return res, res_cap_block
Expand Down
75 changes: 75 additions & 0 deletions epios/tests/test_age_region.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import numpy as np
import pandas as pd
import unittest
from unittest import TestCase
from epios import DataProcess
from epios import Sampler
import os
from numpy.testing import assert_array_equal
# from pandas.testing import assert_frame_equal


class TestDataProcess(TestCase):

def setUp(self) -> None:
self.path = './testing_ageregion/'
try:
os.mkdir(self.path[2:-1])
except:
raise KeyError('Directory already exists, terminated not to overwrite anything!')
self.data = pd.DataFrame({'ID': ['0.0.0.0', '0.0.0.1', '0.0.1.0', '0.1.0.0', '0.2.0.0', '1.0.0.0'], 'age': [1, 81, 45, 33, 20, 60]})
self.processor = DataProcess(self.data)
self.processor.pre_process(path=self.path)

self.sampler = Sampler(geoinfo_path=self.path + 'microcells.csv', ageinfo_path=self.path + 'pop_dist.json', data_path=self.path + 'data.csv')

self.expected_age_dist = [1 / 6, 0.0, 0.0, 0.0, 1 / 6, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 0.0, 1 / 6]
self.expected_region_dist = [5 / 6, 1 / 6]
# self.expected_json = [1 / 6, 0.0, 0.0, 0.0, 1 / 6, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 0.0, 1 / 6]
# self.expected_df_microcell = pd.DataFrame({'cell': [0, 0, 0, 0, 1], 'microcell': [0, 0, 1, 2, 0], 'household': [0, 1, 0, 0, 0], 'Susceptible': [2, 1, 1, 1, 1]})
# self.expected_df_population = pd.DataFrame({'ID': ['0.0.0.0', '0.0.0.1', '0.0.1.0', '0.1.0.0', '0.2.0.0', '1.0.0.0'], 'age': [1, 81, 45, 33, 20, 60], 'cell': [0, 0, 0, 0, 0, 1], 'microcell': [0, 0, 0, 1, 2, 0], 'household': [0, 0, 1, 0, 0, 0]})

def test_get_age_dist(self):
self.assertEqual(self.sampler.get_age_dist(), self.expected_age_dist)

def test_get_region_dist(self):
self.assertEqual(self.sampler.get_region_dist(), self.expected_region_dist)

def test_bool_exceed(self):
self.assertEqual(self.sampler.bool_exceed(1, 0, 0, 2, 2, 2), False)
self.assertEqual(self.sampler.bool_exceed(0, 1, 0, 2, 2, 2), False)
self.assertEqual(self.sampler.bool_exceed(0, 0, 1, 2, 2, 2), False)
self.assertEqual(self.sampler.bool_exceed(0, 0, 0, 2, 2, 2), True)

def test_multinomial_draw(self):
np.random.seed(1)
age_dist = self.sampler.get_age_dist()
region_dist = self.sampler.get_region_dist()
ar_dist = np.array(age_dist) * np.array(region_dist).reshape((-1, 1))
ar_dist = ar_dist.reshape((1, -1))[0]
with self.assertRaises(ValueError):
self.sampler.multinomial_draw(len(self.sampler.data) + 1, ar_dist)

with self.assertRaises(KeyError):
self.sampler.multinomial_draw(len(self.sampler.data), [1] + [0] * (len(age_dist) * len(region_dist) - 1))

res, cap = self.sampler.multinomial_draw(len(self.sampler.data), ar_dist)
try:
assert_array_equal(res, np.array(cap).reshape((1, -1))[0])
except:
self.fail('not draw as expected')

def tearDown(self) -> None:
if os.path.exists(self.path):
if os.path.exists(self.path + 'pop_dist.json'):
os.remove(self.path + 'pop_dist.json')
if os.path.exists(self.path + 'microcells.csv'):
os.remove(self.path + 'microcells.csv')
if os.path.exists(self.path + 'data.csv'):
os.remove(self.path + 'data.csv')
os.rmdir(self.path)


if __name__ == '__main__':

unittest.main()
2 changes: 1 addition & 1 deletion epios/tests/test_data_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class TestDataProcess(TestCase):

def setUp(self) -> None:
self.path = './testing_files/'
self.path = './testing_dataprocess/'
self.data = pd.DataFrame({'ID': ['0.0.0.0', '0.0.0.1', '0.0.1.0', '0.1.0.0', '0.2.0.0', '1.0.0.0'], 'age': [1, 81, 45, 33, 20, 60]})
self.processor = DataProcess(self.data)
self.expected_json = [1 / 6, 0.0, 0.0, 0.0, 1 / 6, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 1 / 6, 0.0, 0.0, 0.0, 1 / 6]
Expand Down

0 comments on commit 0f8fd3f

Please sign in to comment.