Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel py3 #43

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e40dc69
parallelized synthpop and updated for python 3
mxndrwgrdnr Apr 4, 2018
e2b946d
cleaned up parallel processing code
mxndrwgrdnr Apr 4, 2018
6caa811
added tqdm to travis config
mxndrwgrdnr Apr 4, 2018
014b9d4
more packages for travis config
mxndrwgrdnr Apr 4, 2018
1d2aea6
python 3 specifications for tests
mxndrwgrdnr Apr 4, 2018
1a7de6f
more python3 fixes for tests
mxndrwgrdnr Apr 4, 2018
cae7992
update to ipu test to account for the fact that max_iterations no lon…
mxndrwgrdnr Apr 4, 2018
e2b8b2a
fixed ipu test for py3
mxndrwgrdnr Apr 4, 2018
49138a2
pep8 fix
mxndrwgrdnr Apr 4, 2018
dd648f4
script to generate 9 county bay area population in parallel
mxndrwgrdnr Apr 9, 2018
af117fd
script to generate 9 county bay area population in parallel
mxndrwgrdnr Apr 9, 2018
a8a2876
fixed relative imports for tests
mxndrwgrdnr Jun 27, 2018
feaebc4
replaced pep8 with pycodestyle per pep8 UserWarning
mxndrwgrdnr Jun 27, 2018
87a8929
travis fixes
mxndrwgrdnr Jun 27, 2018
4072946
pycodestyle does not like bare 'except' clauses
mxndrwgrdnr Jun 27, 2018
b446b96
this might take too long for travis. let's see
mxndrwgrdnr Jun 27, 2018
8416521
changed test county for starter2 to something smaller bc travis is ti…
mxndrwgrdnr Jun 27, 2018
046166c
edited travis config to try and fix the issue
mxndrwgrdnr Jun 27, 2018
d97deeb
edited travis config to try and fix the issue
mxndrwgrdnr Jun 27, 2018
8ab5cc7
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
8c94652
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
3783b02
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
8d1cc00
added unit test for census cache
mxndrwgrdnr Jun 27, 2018
39601fb
added test for parallel synthesizer
mxndrwgrdnr Jun 28, 2018
2f0e8b5
fixed indentation
mxndrwgrdnr Jun 28, 2018
32c3e75
Merge branch 'master' into parallel_py3
mxndrwgrdnr Jun 28, 2018
82a8f67
relaxed fit quality requirements for tests
mxndrwgrdnr Jun 28, 2018
fd70c64
Merge branch 'parallel_py3' of github.com:UDST/synthpop into parallel…
mxndrwgrdnr Jun 28, 2018
9922683
retain runtime error for max_iterations in IPU and add ignore_max_ite…
mxndrwgrdnr Jul 3, 2018
de52355
increase wait time for travis build
mxndrwgrdnr Jul 3, 2018
97c793b
porting latest changes from rome to oslo
mxndrwgrdnr Nov 6, 2018
6acc0b8
oslo back to rome
mxndrwgrdnr Nov 7, 2018
0c10761
use new parallel method in tests
mxndrwgrdnr Nov 7, 2018
6098f17
updated travis yaml to use specific version of tqdm that should hopef…
mxndrwgrdnr Nov 7, 2018
315bdaa
fixed style errors should pass tests now
mxndrwgrdnr Nov 7, 2018
c180116
fixed style errors should pass tests now
mxndrwgrdnr Nov 7, 2018
5ab4799
starter2 parallel test
cvanoli Jul 15, 2019
6b9b5e2
Add ignore_max_iterations var to synthesize_all functions
cvanoli Aug 6, 2019
0631425
update setup.py
cvanoli Nov 26, 2019
8c9e9ac
Correct the deleted acsyear missing in query function, add h_acs self
cvanoli Jan 27, 2020
d3a3a53
Merge branch 'parallel_py3' of https://github.com/UDST/synthpop into …
cvanoli Jan 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
language: python
sudo: false
python:
- '2.7'
- '3.5'
Expand All @@ -13,14 +12,13 @@ install:
- conda config --set always_yes yes --set changeps1 no
- conda update -q conda
- conda info -a
- |
conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION pip numexpr numpy pandas scipy pytest
- conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION pip numexpr numpy pandas scipy pytest tqdm==4.24 futures
- source activate test-environment
- pip install pytest-cov coveralls pep8
- pip install pytest-cov coveralls pycodestyle
- pip install .
script:
- pep8 synthpop
- py.test --cov synthpop --cov-report term-missing
- pycodestyle synthpop
- travis_wait 20 py.test --cov synthpop --cov-report term-missing
after_success:
- coveralls
notifications:
Expand Down
134 changes: 134 additions & 0 deletions scripts/sfbay_synth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import os
import pandas as pd
from glob import glob
import warnings
from datetime import date
from multiprocessing import freeze_support

from synthpop.census_helpers import Census
from synthpop.recipes.starter2 import Starter
from synthpop.synthesizer import synthesize_all_in_parallel, \
synthesize_all_in_parallel_mp, \
synthesize_all_in_parallel_full

# Suppress every warning for the whole run so the console output consists
# only of the banners printed below (and whatever the synthesizer prints).
warnings.filterwarnings('ignore')

# Run date as an ISO string (YYYY-MM-DD); referenced by the commented-out
# CSV export further down in this script.
today = str(date.today())

# Counties to process. Commented-out entries are skipped; uncomment a line
# to include that county in the run.
counties = [
    # "Alpine County",
    # "Napa County",
    "Santa Clara County",
    # "Solano County",
    # "San Mateo County",
    # "Marin County",
    # "San Francisco County",
    # "Sonoma County",
    # "Contra Costa County",
    # "Alameda County"
]

if __name__ == '__main__':

    # Must be called first thing under the main guard so that a frozen
    # Windows executable can start multiprocessing workers; it has no
    # effect otherwise (see the multiprocessing documentation).
    freeze_support()

    # Read the Census API key once, before any work starts, so a missing
    # CENSUS environment variable raises KeyError immediately instead of
    # part-way through the run.
    census_key = os.environ["CENSUS"]

    for county in counties:
        print('#' * 80)
        print(' Processing {0} '.format(county).center(80, '#'))
        starter = Starter(census_key, "CA", county)
        # county_dfs = synthesize_all(starter, num_geogs=1)
        # NOTE(review): the result is currently unused because the
        # post-processing and CSV export steps below are commented out.
        county_dfs = synthesize_all_in_parallel_full(
            starter,
            # max_workers=20,
            # num_geogs=100
        )
        print('#' * 80)

# hh_all = county_dfs[0]
# p_all = county_dfs[1]
# fits_all = county_dfs[2]

# hh_all.index.name = 'household_id'
# p_all.index.name = 'person_id'
# p_all.rename(columns={'hh_id': 'household_id'}, inplace=True)

# hh_all['age_of_head'] = p_all[p_all.RELP == 0].groupby(
# 'household_id').AGEP.max()
# hh_all['race_of_head'] = p_all[p_all.RELP == 0].groupby(
# 'household_id').RAC1P.max()
# hh_all['workers'] = p_all[p_all.ESR.isin([1, 2, 4, 5])].groupby(
# 'household_id').size()
# hh_all['children'] = p_all[p_all.AGEP < 18].groupby(
# 'household_id').size()
# hh_all['tenure'] = 2
# hh_all.tenure[hh_all.TEN < 3] = 1 # tenure coded 1:own, 2:rent
# hh_all['recent_mover'] = 0
# hh_all.recent_mover[hh_all.MV < 4] = 1 # 1 if recent mover
# hh_all = hh_all.rename(columns={
# 'VEH': 'cars', 'HINCP': 'income', 'NP': 'persons',
# 'BLD': 'building_type'})

# for col in hh_all.columns:
# if col not in [
# 'persons', 'income', 'age_of_head', 'race_of_head',
# 'hispanic_head', 'workers', 'children', 'cars', 'tenure',
# 'recent_mover', 'building_type', 'serialno', 'state',
# 'county', 'tract', 'block group']:
# del hh_all[col]

# p_all.rename(columns={
# 'AGEP': 'age', 'RAC1P': 'race_id', 'NP': 'persons',
# 'SPORDER': 'member_id', 'HISP': 'hispanic', 'RELP': 'relate',
# 'SEX': 'sex', 'WKHP': 'hours', 'SCHL': 'edu', 'PERNP': 'earning',
# 'JWTR': 'primary_commute_mode'},
# inplace=True)
# p_all['student'] = 0
# p_all.loc[p_all.SCH.isin([2, 3]), 'student'] = 1
# p_all['work_at_home'] = 0
# p_all.loc[p_all.primary_commute_mode == 11, 'work_at_home'] = 1
# p_all['worker'] = 0
# p_all.loc[p_all.ESR.isin([1, 2, 4, 5]), 'worker'] = 1
# p_all['self_employed'] = 0
# p_all.loc[p_all['COW'].isin([6, 7]), 'self_employed'] = 1

# for col in p_all.columns:
# if col not in ['household_id', 'member_id',
# 'relate', 'age', 'sex', 'race_id', 'hispanic',
# 'student', 'worker', 'hours',
# 'work_at_home', 'edu', 'earning', 'self_employed']:
# del p_all[col]

# hh_all.to_csv('{0}_hh_synth_parallel_{1}.csv'.format(
# county.replace(' ', '_'), today))
# p_all.to_csv('{0}_p_synth_parallel_{1}.csv'.format(
# county.replace(' ', '_'), today))

# # concat all the county dfs
# hh_fnames = glob('*hh*.csv')

# p_df_list = []
# hh_df_list = []
# hh_index_start = 0
# p_index_start = 0

# for hh_file in hh_fnames:
# county = hh_file.split('_hh')[0]
# hh_df = pd.read_csv(hh_file, index_col='household_id', header=0)
# p_df = pd.read_csv(
# glob(county + '_p*.csv')[0], index_col='person_id', header=0)
# print(county + ': {0}'.format(str(hh_df.iloc[0].county)))
# hh_df.index += hh_index_start
# p_df.household_id += hh_index_start
# p_df.index += p_index_start
# hh_df_list.append(hh_df)
# p_df_list.append(p_df)
# hh_index_start = hh_df.index.values[-1] + 1
# p_index_start = p_df.index.values[-1] + 1

# hh_all = pd.concat(hh_df_list)
# p_all = pd.concat(p_df_list)
# print(len(hh_all.iloc[hh_all.index.duplicated(keep=False)]))
# print(len(p_all.iloc[p_all.index.duplicated(keep=False)]))
# p_all.to_csv('sfbay_persons_2018_09_27.csv')
# hh_all.to_csv('sfbay_households_2018_09_27.csv')
6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
'Development Status :: 4 - Beta',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6'
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7'
],
packages=find_packages(exclude=['*.tests']),
install_requires=[
Expand All @@ -24,6 +25,7 @@
'numpy>=1.8.0',
'pandas>=0.15.0',
'scipy>=0.13.3',
'us>=0.8'
'us>=0.8',
'tqdm>=4.23'
]
)
6 changes: 3 additions & 3 deletions synthpop/census_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def chunks(l, n):
""" Yield successive n-sized chunks from l.
"""
for i in range(0, len(l), n):
yield l[i:i+n]
yield l[i: i + n]

for census_column_batch in chunks(census_columns, 45):
census_column_batch = list(census_column_batch)
Expand Down Expand Up @@ -200,12 +200,12 @@ def try_fips_lookup(self, state, county=None):
if county is None:
try:
return getattr(us.states, state).fips
except:
except (KeyError, NameError, ValueError, AttributeError, IndexError):
pass
return state

try:
return df.loc[(state, county)]
except:
except (KeyError, NameError, ValueError, AttributeError, IndexError):
pass
return state, county
2 changes: 1 addition & 1 deletion synthpop/ipf/test/test_ipf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
from pandas.util import testing as pdt

from .. import ipf
from synthpop.ipf import ipf


def test_trivial_ipf():
Expand Down
20 changes: 15 additions & 5 deletions synthpop/ipu/ipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import warnings


def _drop_zeros(df):
Expand Down Expand Up @@ -99,7 +100,7 @@ def iter_columns(self):
The returned column contains only the non-zero elements.

"""
return list(self._everything.values())
return self._everything.values()

def get_column(self, key):
"""
Expand Down Expand Up @@ -193,7 +194,7 @@ def _update_weights(column, weights, constraint):

def household_weights(
household_freq, person_freq, household_constraints, person_constraints,
convergence=1e-4, max_iterations=20000):
convergence=1e-4, max_iterations=20000, ignore_max_iters=False):
"""
Calculate the household weights that best match household and
person level attributes.
Expand Down Expand Up @@ -259,9 +260,18 @@ def household_weights(
iterations += 1

if iterations > max_iterations:
raise RuntimeError(
'Maximum number of iterations reached during IPU: {}'.format(
max_iterations))

if ignore_max_iters:
warnings.warn(
'Maximum number of iterations reached '
'during IPU: {}'.format(max_iterations), UserWarning)
return (
pd.Series(best_weights, index=household_freq.index),
best_fit_qual, iterations)
else:
raise RuntimeError(
'Maximum number of iterations reached '
'during IPU: {}'.format(max_iterations))

return (
pd.Series(best_weights, index=household_freq.index),
Expand Down
13 changes: 10 additions & 3 deletions synthpop/ipu/test/test_ipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
import numpy.testing as npt
import pandas as pd
import pytest
from pandas.util import testing as pdt

from .. import ipu
from synthpop.ipu import ipu


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -169,10 +168,18 @@ def test_household_weights(
def test_household_weights_max_iter(
household_freqs, person_freqs, household_constraints,
person_constraints):

with pytest.warns(UserWarning):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as on ipu.py line 263.

ipu.household_weights(
household_freqs, person_freqs, household_constraints,
person_constraints, convergence=1e-7, max_iterations=10,
ignore_max_iters=True)

with pytest.raises(RuntimeError):
ipu.household_weights(
household_freqs, person_freqs, household_constraints,
person_constraints, convergence=1e-7, max_iterations=10)
person_constraints, convergence=1e-7, max_iterations=10,
ignore_max_iters=False)


def test_FrequencyAndConstraints(freq_wrap):
Expand Down
33 changes: 19 additions & 14 deletions synthpop/recipes/starter2.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ class Starter:
Returns
-------
household_marginals : DataFrame
Marginals per block group for the household data (from ACS 5-year estimates)
Marginals per block group for the household
data (from ACS 5-year estimates)
person_marginals : DataFrame
Marginals per block group for the person data (from ACS 5-year estimates)
Marginals per block group for the person
data (from ACS 5-year estimates)
household_jointdist : DataFrame
joint distributions for the households (from PUMS 2010-2000), one joint
distribution for each PUMA (one row per PUMA)
Expand All @@ -57,7 +59,8 @@ def __init__(self, key, state, county, tract=None, acsyear=2016):
income_columns = ['B19001_0%02dE' % i for i in range(1, 18)]
vehicle_columns = ['B08201_0%02dE' % i for i in range(1, 7)]
workers_columns = ['B08202_0%02dE' % i for i in range(1, 6)]
presence_of_children_columns = ['B11005_001E', 'B11005_002E', 'B11005_011E']
presence_of_children_columns = [
'B11005_001E', 'B11005_002E', 'B11005_011E']
presence_of_seniors_columns = ['B11007_002E', 'B11007_007E']
tenure_mover_columns = ['B25038_0%02dE' % i for i in range(1, 16)]
block_group_columns = (
Expand Down Expand Up @@ -137,7 +140,8 @@ def __init__(self, key, state, county, tract=None, acsyear=2016):
female_age_columns = ['B01001_0%02dE' % i for i in range(27, 50)]
all_columns = population + sex + race + male_age_columns + \
female_age_columns + hh_population + hispanic
p_acs = c.block_group_query(all_columns, state, county, tract=tract, year=acsyear)
p_acs = c.block_group_query(
all_columns, state, county, tract=tract, year=acsyear)
self.p_acs = p_acs
self.p_acs_cat = cat.categorize(p_acs, {
("person_age", "19 and under"):
Expand All @@ -162,11 +166,11 @@ def __init__(self, key, state, county, tract=None, acsyear=2016):
"B01001_043E + B01001_044E + B01001_045E + "
"B01001_046E + B01001_047E + B01001_048E + "
"B01001_049E) * B11002_001E*1.0/B01001_001E",
("race", "white"): "(B02001_002E) * B11002_001E*1.0/B01001_001E",
("race", "black"): "(B02001_003E) * B11002_001E*1.0/B01001_001E",
("race", "asian"): "(B02001_005E) * B11002_001E*1.0/B01001_001E",
("race", "other"): "(B02001_004E + B02001_006E + B02001_007E + "
"B02001_008E) * B11002_001E*1.0/B01001_001E",
("race", "white"): "(B02001_002E) * B11002_001E*1.0/B01001_001E",
("race", "black"): "(B02001_003E) * B11002_001E*1.0/B01001_001E",
("race", "asian"): "(B02001_005E) * B11002_001E*1.0/B01001_001E",
("race", "other"): "(B02001_004E + B02001_006E + B02001_007E + "
"B02001_008E) * B11002_001E*1.0/B01001_001E",
("person_sex", "male"):
"(B01001_002E) * B11002_001E*1.0/B01001_001E",
("person_sex", "female"):
Expand All @@ -177,13 +181,14 @@ def __init__(self, key, state, county, tract=None, acsyear=2016):
"(B03003_002E) * B11002_001E*1.0/B01001_001E",
}, index_cols=['state', 'county', 'tract', 'block group'])

# Put the needed PUMS variables here. These are also the PUMS variables
# that will be in the outputted synthetic population
# Put the needed PUMS variables here. These are also the PUMS
# variables that will be in the outputted synthetic population
self.h_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RT', 'NP', 'TYPE',
'R65', 'HINCP', 'VEH', 'MV', 'TEN', 'BLD', 'R18')
self.p_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RELP', 'AGEP',
'ESR', 'RAC1P', 'HISP', 'SEX', 'SPORDER',
'PERNP', 'SCHL', 'WKHP', 'JWTR', 'SCH')
self.p_pums_cols = (
'serialno', 'SPORDER', 'PUMA00', 'PUMA10', 'RELP', 'AGEP', 'ESR',
'SCHL', 'SCH', 'JWTR', 'PERNP', 'WKHP', 'RAC1P', 'HISP', 'SEX',
'COW')

def get_geography_name(self):
# this synthesis is at the block group level for most variables
Expand Down
14 changes: 11 additions & 3 deletions synthpop/recipes/tests/test_starter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pytest
from ...synthesizer import *
from ..starter import Starter

from synthpop.synthesizer import *
from synthpop.recipes.starter import Starter
from synthpop.recipes.starter2 import Starter as Starter2


@pytest.fixture
Expand All @@ -9,5 +11,11 @@ def key():


def test_starter(key):
st = Starter(key, "CA", "Napa County")
st = Starter(key, "CA", "Alpine County")
# just run it for now
synthesize_all(st, num_geogs=1)


# The synthesizer is not run here because it is too memory intensive
# for Travis.
def test_starter2(key):
    """Smoke test: constructing the starter2 recipe must not raise."""
    state, county = "CA", "Alpine County"
    Starter2(key, state, county)
Loading