
Parallel py3 #43

Open · wants to merge 41 commits into base: master from parallel_py3

Changes from 11 commits (41 commits total)
e40dc69
parallelized synthpop and updated for python 3
mxndrwgrdnr Apr 4, 2018
e2b946d
cleaned up parallel processing code
mxndrwgrdnr Apr 4, 2018
6caa811
added tqdm to travis config
mxndrwgrdnr Apr 4, 2018
014b9d4
more packages for travis config
mxndrwgrdnr Apr 4, 2018
1d2aea6
python 3 specifications for tests
mxndrwgrdnr Apr 4, 2018
1a7de6f
more python3 fixes for tests
mxndrwgrdnr Apr 4, 2018
cae7992
update to ipu test to account for the fact that max_iterations no lon…
mxndrwgrdnr Apr 4, 2018
e2b8b2a
fixed ipu test for py3
mxndrwgrdnr Apr 4, 2018
49138a2
pep8 fix
mxndrwgrdnr Apr 4, 2018
dd648f4
script to generate 9 county bay area population in parallel
mxndrwgrdnr Apr 9, 2018
af117fd
script to generate 9 county bay area population in parallel
mxndrwgrdnr Apr 9, 2018
a8a2876
fixed relative imports for tests
mxndrwgrdnr Jun 27, 2018
feaebc4
replaced pep8 with pycodestyle per pep8 UserWarning
mxndrwgrdnr Jun 27, 2018
87a8929
travis fixes
mxndrwgrdnr Jun 27, 2018
4072946
pycodestyle does not like bare 'except' clauses
mxndrwgrdnr Jun 27, 2018
b446b96
this might take too long for travis. let's see
mxndrwgrdnr Jun 27, 2018
8416521
changed test county for starter2 to something smaller bc travis is ti…
mxndrwgrdnr Jun 27, 2018
046166c
edited travis config to try and fix the issue
mxndrwgrdnr Jun 27, 2018
d97deeb
edited travis config to try and fix the issue
mxndrwgrdnr Jun 27, 2018
8ab5cc7
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
8c94652
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
3783b02
still trying to fix memory error in travis
mxndrwgrdnr Jun 27, 2018
8d1cc00
added unit test for census cache
mxndrwgrdnr Jun 27, 2018
39601fb
added test for parallel synthesizer
mxndrwgrdnr Jun 28, 2018
2f0e8b5
fixed indentation
mxndrwgrdnr Jun 28, 2018
32c3e75
Merge branch 'master' into parallel_py3
mxndrwgrdnr Jun 28, 2018
82a8f67
relaxed fit quality requirements for tests
mxndrwgrdnr Jun 28, 2018
fd70c64
Merge branch 'parallel_py3' of github.com:UDST/synthpop into parallel…
mxndrwgrdnr Jun 28, 2018
9922683
retain runtime error for max_iterations in IPU and add ignore_max_ite…
mxndrwgrdnr Jul 3, 2018
de52355
increase wait time for travis build
mxndrwgrdnr Jul 3, 2018
97c793b
porting latest changes from rome to oslo
mxndrwgrdnr Nov 6, 2018
6acc0b8
oslo back to rome
mxndrwgrdnr Nov 7, 2018
0c10761
use new parallel method in tests
mxndrwgrdnr Nov 7, 2018
6098f17
updated travis yaml to use specific version of tqdm that should hopef…
mxndrwgrdnr Nov 7, 2018
315bdaa
fixed style errors should pass tests now
mxndrwgrdnr Nov 7, 2018
c180116
fixed style errors should pass tests now
mxndrwgrdnr Nov 7, 2018
5ab4799
starter2 parallel test
cvanoli Jul 15, 2019
6b9b5e2
Add ignore_max_iterations var to synthesize_all functions
cvanoli Aug 6, 2019
0631425
update setup.py
cvanoli Nov 26, 2019
8c9e9ac
Correct the deleted acsyear missing in query function, add h_acs self
cvanoli Jan 27, 2020
d3a3a53
Merge branch 'parallel_py3' of https://github.com/UDST/synthpop into …
cvanoli Jan 27, 2020
3 changes: 2 additions & 1 deletion .travis.yml

@@ -2,6 +2,7 @@ language: python
sudo: false
python:
- '2.7'
- '3.5'
install:
- if [[ "$TRAVIS_PYTHON_VERSION" == "2.7" ]]; then wget http://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh
-O miniconda.sh; else wget http://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
@@ -13,7 +14,7 @@ install:
- conda update -q conda
- conda info -a
- |
  conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION pip numexpr numpy pandas scipy pytest
  conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION pip numexpr numpy pandas scipy pytest tqdm futures
- source activate test-environment
- pip install pytest-cov coveralls pep8
- pip install .
107 changes: 107 additions & 0 deletions scripts/sfbay_synth.py

@@ -0,0 +1,107 @@
import os
import pandas as pd
from glob import glob
import warnings

from synthpop.census_helpers import Census
from synthpop.recipes.starter2 import Starter
from synthpop.synthesizer import synthesize_all_in_parallel

warnings.filterwarnings('ignore')

counties = [
    "Napa County", "Santa Clara County", "Solano County", "San Mateo County",
    "Marin County", "San Francisco County", "Sonoma County",
    "Contra Costa County", "Alameda County"]

if __name__ == '__main__':

    for county in counties:
        c = Census(os.environ["CENSUS"])
        starter = Starter(os.environ["CENSUS"], "CA", county)

        county_dfs = synthesize_all_in_parallel(starter)

        hh_all = county_dfs[0]
        p_all = county_dfs[1]
        fits_all = county_dfs[2]

        hh_all.index.name = 'household_id'
        p_all.index.name = 'person_id'
        p_all.rename(columns={'hh_id': 'household_id'}, inplace=True)

        hh_all['age_of_head'] = p_all[p_all.RELP == 0].groupby(
            'household_id').AGEP.max()
        hh_all['race_of_head'] = p_all[p_all.RELP == 0].groupby(
            'household_id').RAC1P.max()
        hh_all['workers'] = p_all[p_all.ESR.isin([1, 2, 4, 5])].groupby(
            'household_id').size()
        hh_all['children'] = p_all[p_all.AGEP < 18].groupby(
            'household_id').size()
        hh_all['tenure'] = 2
        hh_all.tenure[hh_all.TEN < 3] = 1  # tenure coded 1:own, 2:rent
        hh_all['recent_mover'] = 0
        hh_all.recent_mover[hh_all.MV < 4] = 1  # 1 if recent mover
        hh_all = hh_all.rename(columns={
            'VEH': 'cars', 'HINCP': 'income', 'NP': 'persons',
            'BLD': 'building_type'})

        for col in hh_all.columns:
            if col not in [
                    'persons', 'income', 'age_of_head', 'race_of_head',
                    'hispanic_head', 'workers', 'children', 'cars', 'tenure',
                    'recent_mover', 'building_type', 'serialno', 'state',
                    'county', 'tract', 'block group']:
                del hh_all[col]

        p_all.rename(columns={
            'AGEP': 'age', 'RAC1P': 'race_id', 'NP': 'persons',
            'SPORDER': 'member_id', 'HISP': 'hispanic', 'RELP': 'relate',
            'SEX': 'sex', 'WKHP': 'hours', 'SCHL': 'edu', 'PERNP': 'earning'},
            inplace=True)
        p_all['student'] = 0
        p_all.student[p_all.SCH.isin([2, 3])] = 1
        p_all['work_at_home'] = 0
        p_all.work_at_home[p_all.JWTR == 11] = 1
        p_all['worker'] = 0
        p_all.worker[p_all.ESR.isin([1, 2, 4, 5])] = 1

        for col in p_all.columns:
            if col not in ['household_id', 'member_id',
                           'relate', 'age', 'sex', 'race_id', 'hispanic',
                           'student', 'worker', 'hours',
                           'work_at_home', 'edu', 'earning']:
                del p_all[col]

        hh_all.to_csv('{0}_hh_synth_parallel.csv'.format(county))
        p_all.to_csv('{0}_p_synth_parallel.csv'.format(county))

    # concat all the county dfs
    hh_fnames = glob('*hh*.csv')

    p_df_list = []
    hh_df_list = []
    hh_index_start = 0
    p_index_start = 0

    for hh_file in hh_fnames:
        county = hh_file.split('_hh')[0]
        hh_df = pd.read_csv(hh_file, index_col='household_id', header=0)
        p_df = pd.read_csv(
            glob(county + '_p*.csv')[0], index_col='person_id', header=0)
        print(county + ': {0}'.format(str(hh_df.iloc[0].county)))
        hh_df.index += hh_index_start
        p_df.household_id += hh_index_start
        p_df.index += p_index_start
        hh_df_list.append(hh_df)
        p_df_list.append(p_df)
        hh_index_start = hh_df.index.values[-1] + 1
        p_index_start = p_df.index.values[-1] + 1

    hh_all = pd.concat(hh_df_list)
    p_all = pd.concat(p_df_list)
    print(len(hh_all.iloc[hh_all.index.duplicated(keep=False)]))
    print(len(p_all.iloc[p_all.index.duplicated(keep=False)]))
    p_all.to_csv('sfbay_persons_2018_04_08.csv')
    hh_all.to_csv('sfbay_households_2018_04_08.csv')
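The offset bookkeeping in the concatenation loop above is easy to get wrong, since the person table's `household_id` foreign key must be shifted by the same amount as the household index. Here is a minimal, self-contained sketch of the same logic (the `concat_with_offsets` helper name and the toy frames are illustrative, not part of the PR):

```python
import pandas as pd


def concat_with_offsets(hh_dfs, p_dfs):
    """Concatenate per-county frames, shifting indices so IDs stay unique."""
    hh_out, p_out = [], []
    hh_offset, p_offset = 0, 0
    for hh_df, p_df in zip(hh_dfs, p_dfs):
        hh_df, p_df = hh_df.copy(), p_df.copy()
        hh_df.index += hh_offset           # shift household_id
        p_df['household_id'] += hh_offset  # keep the foreign key in sync
        p_df.index += p_offset             # shift person_id
        hh_out.append(hh_df)
        p_out.append(p_df)
        hh_offset = hh_df.index[-1] + 1    # next county starts past the last ID
        p_offset = p_df.index[-1] + 1
    return pd.concat(hh_out), pd.concat(p_out)


# two identical toy "counties" of 2 households / 3 persons each
hh1 = pd.DataFrame({'persons': [2, 3]},
                   index=pd.RangeIndex(2, name='household_id'))
p1 = pd.DataFrame({'household_id': [0, 0, 1]},
                  index=pd.RangeIndex(3, name='person_id'))
hh_all, p_all = concat_with_offsets([hh1, hh1.copy()], [p1, p1.copy()])
```

After the merge, no household or person ID is duplicated and every person still points at the right household, which is exactly what the `index.duplicated` prints in the script are checking.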

4 changes: 3 additions & 1 deletion setup.py

@@ -13,7 +14,7 @@
    url='https://github.com/udst/synthpop',
    classifiers=[
        'Development Status :: 4 - Beta',
        'Programming Language :: Python :: 2.7'
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6'
    ],
    packages=find_packages(exclude=['*.tests']),
    install_requires=[
10 changes: 5 additions & 5 deletions synthpop/categorizer.py

@@ -8,7 +8,7 @@
def categorize(df, eval_d, index_cols=None):
    cat_df = pd.DataFrame(index=df.index)

    for index, expr in eval_d.iteritems():
    for index, expr in eval_d.items():
        cat_df[index] = df.eval(expr)

    if index_cols is not None:
@@ -47,11 +47,11 @@ def category_combinations(index):
    for cat_name, cat_value in index:
        d.setdefault(cat_name, [])
        d[cat_name].append(cat_value)
    for cat_name in d.keys():
    for cat_name in list(d):
        if len(d[cat_name]) == 1:
            del d[cat_name]
    df = pd.DataFrame(list(itertools.product(*d.values())))
    df.columns = cols = d.keys()
    df = pd.DataFrame(list(itertools.product(*list(d.values()))))
    df.columns = cols = list(d.keys())
    df.index.name = "cat_id"
    df = df.reset_index().set_index(cols)
    return df
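The `for cat_name in list(d)` change is required because Python 3 dict views are live: deleting entries while iterating over `d.keys()` raises `RuntimeError: dictionary changed size during iteration`. A small sketch of the pattern (the `drop_singletons` helper is illustrative, not code from the PR):

```python
def drop_singletons(d):
    """Remove categories that only ever take one value, as in category_combinations."""
    # list(d) snapshots the keys; iterating d.keys() directly while
    # deleting would raise RuntimeError in Python 3
    for cat_name in list(d):
        if len(d[cat_name]) == 1:
            del d[cat_name]
    return d
```

The same reasoning applies to the `list(d.values())` and `list(d.keys())` changes: in Python 3 those views are lazy, so they are materialized before being handed to `itertools.product` and `df.columns`.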
@@ -62,7 +62,7 @@ def joint_distribution(sample_df, category_df, mapping_functions=None):
    # set counts to zero
    category_df["frequency"] = 0

    category_names = category_df.index.names
    category_names = list(category_df.index.names)
    if mapping_functions:
        for name in category_names:
            assert name in mapping_functions, "Every category needs to have " \
6 changes: 3 additions & 3 deletions synthpop/census_helpers.py

@@ -82,7 +82,7 @@ def _query(self, census_columns, state, county, forstr,
        def chunks(l, n):
            """ Yield successive n-sized chunks from l.
            """
            for i in xrange(0, len(l), n):
            for i in range(0, len(l), n):
                yield l[i:i+n]

        for census_column_batch in chunks(census_columns, 45):
@@ -98,7 +98,7 @@ def chunks(l, n):
        df = dfs[0]
        for mdf in dfs[1:]:
            df = pd.merge(df, mdf, on="NAME", suffixes=("", "_ignore"))
        drop_cols = filter(lambda x: "_ignore" in x, df.columns)
        drop_cols = list(filter(lambda x: "_ignore" in x, df.columns))
        df = df.drop(drop_cols, axis=1)

        return df
@@ -115,7 +115,7 @@ def block_group_and_tract_query(self, block_group_columns,
        df = self._scale_and_merge(df1, block_group_size_attr, df2,
                                   tract_size_attr, tract_columns,
                                   merge_columns, suffixes=("", "_ignore"))
        drop_cols = filter(lambda x: "_ignore" in x, df.columns)
        drop_cols = list(filter(lambda x: "_ignore" in x, df.columns))
        df = df.drop(drop_cols, axis=1)

        return df
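Both `filter()` fixes above exist because `filter()` returns a lazy iterator in Python 3 rather than a list, as does `map()`. Materializing it with `list()` keeps the Python 2 behavior before the result is passed to `DataFrame.drop`. A sketch (the helper name and toy column names are illustrative):

```python
import pandas as pd


def drop_ignore_columns(df):
    """Drop the merge-duplicate columns that were suffixed with _ignore."""
    # filter() is lazy in Python 3; wrap it in list() to materialize
    drop_cols = list(filter(lambda x: "_ignore" in x, df.columns))
    return df.drop(drop_cols, axis=1)


df = pd.DataFrame({'NAME': ['a'], 'B01001_001E': [1], 'NAME_ignore': ['a']})
out = drop_ignore_columns(df)  # keeps only NAME and B01001_001E
```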
12 changes: 8 additions & 4 deletions synthpop/ipu/ipu.py

@@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import warnings


def _drop_zeros(df):
@@ -99,7 +100,7 @@ def iter_columns(self):
        The returned column contains only the non-zero elements.

        """
        return self._everything.itervalues()
        return self._everything.values()

    def get_column(self, key):
        """
@@ -187,7 +188,7 @@ def _update_weights(column, weights, constraint):
    new_weights : ndarray

    """
    adj = constraint / (column * weights).sum()
    adj = constraint / float((column * weights).sum())
    return weights * adj


@@ -259,9 +260,12 @@ def household_weights(
        iterations += 1

        if iterations > max_iterations:
            raise RuntimeError(
            warnings.warn(
                'Maximum number of iterations reached during IPU: {}'.format(
                    max_iterations))
                    max_iterations), UserWarning)
            return (
                pd.Series(best_weights, index=household_freq.index),
                best_fit_qual, iterations)

    return (
        pd.Series(best_weights, index=household_freq.index),

Collaborator review comment (on the warnings.warn change): In a conv with @janowicz we discussed that it would be better if this ignoring of errors was made optional. Could we make an ignore parameter that, if set to True, would raise warnings instead of errors?
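Per the review comment, a later commit (9922683, "retain runtime error for max_iterations in IPU and add ignore_max_ite…") keeps the `RuntimeError` by default and adds an `ignore_max_iterations` flag that downgrades it to a warning. A hypothetical sketch of that control flow (the `check_iterations` helper is illustrative only; the actual change lives inside `household_weights`):

```python
import warnings


def check_iterations(iterations, max_iterations, ignore_max_iterations=False):
    """Raise by default when IPU hits the iteration cap; warn instead if asked."""
    if iterations > max_iterations:
        msg = ('Maximum number of iterations reached '
               'during IPU: {}'.format(max_iterations))
        if ignore_max_iterations:
            warnings.warn(msg, UserWarning)
            return False  # caller should return the best weights found so far
        raise RuntimeError(msg)
    return True  # keep iterating
```

This keeps existing callers' error-handling intact while letting long parallel runs opt into best-effort results.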
4 changes: 2 additions & 2 deletions synthpop/ipu/test/test_ipu.py

@@ -169,7 +169,7 @@ def test_household_weights(
def test_household_weights_max_iter(
        household_freqs, person_freqs, household_constraints,
        person_constraints):
    with pytest.raises(RuntimeError):
    with pytest.warns(UserWarning):
        ipu.household_weights(
            household_freqs, person_freqs, household_constraints,
            person_constraints, convergence=1e-7, max_iterations=10)

Collaborator review comment (on the pytest.warns change): Idem comment to ipu.py line 263.
@@ -179,7 +179,7 @@ def test_FrequencyAndConstraints(freq_wrap):
    assert freq_wrap.ncols == 5
    assert len(list(freq_wrap.iter_columns())) == 5

    iter_cols = freq_wrap.iter_columns()
    iter_cols = iter(freq_wrap.iter_columns())

    key, col, constraint, nz = next(iter_cols)
    assert key == ('yes', 'blue')
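The `iter()` wrapper is needed because `dict.values()` returns a view object in Python 3, which is iterable but not an iterator, so calling `next()` on it directly raises `TypeError`. A quick illustration with a toy version of the frequency mapping:

```python
d = {('yes', 'blue'): [1, 2], ('no', 'red'): [3]}

view = d.values()  # a view: iterable and re-usable, but not an iterator
it = iter(view)    # wrap it before calling next(), as the test now does
first = next(it)   # the ('yes', 'blue') entry, i.e. [1, 2]
```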
4 changes: 2 additions & 2 deletions synthpop/recipes/starter2.py

@@ -177,8 +177,8 @@ def __init__(self, key, state, county, tract=None):
        # that will be in the outputted synthetic population
        self.h_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RT', 'NP', 'TYPE',
                            'R65', 'HINCP', 'VEH', 'MV', 'TEN', 'BLD', 'R18')
        self.p_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RELP', 'AGEP',
                            'ESR', 'RAC1P', 'HISP', 'SEX')
        self.p_pums_cols = ('serialno', 'SPORDER', 'PUMA00', 'PUMA10', 'RELP',
                            'AGEP', 'ESR', 'SCHL', 'SCH', 'JWTR', 'PERNP',
                            'WKHP', 'RAC1P', 'HISP', 'SEX')

Collaborator review comment: Why are you adding these variables if they are not used in the synthesis?

Member Author: They will be.

    def get_geography_name(self):
        # this synthesis is at the block group level for most variables