From b37a484427b3b6851e406c7005902d570d6a6337 Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Mon, 22 Apr 2024 15:36:02 -0700 Subject: [PATCH 1/6] Add Dask engine to dataset generation functions --- docs/source/conf.py | 1 + setup.py | 5 +- src/pseudopeople/interface.py | 233 +++++++++++++++++++--------- src/pseudopeople/loader.py | 37 ++++- src/pseudopeople/utilities.py | 48 +++++- tests/integration/test_interface.py | 12 ++ 6 files changed, 256 insertions(+), 80 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 4188b052..31592778 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -199,6 +199,7 @@ intersphinx_mapping = { "python": ("https://docs.python.org/3.8", None), "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None), + "dask": ("https://docs.dask.org/en/stable/", None), "tables": ("https://www.pytables.org/", None), "numpy": ("https://numpy.org/doc/stable/", None), "networkx": ("https://networkx.org/documentation/stable/", None), diff --git a/setup.py b/setup.py index 8375ebaf..88f3d61b 100644 --- a/setup.py +++ b/setup.py @@ -51,10 +51,12 @@ "jupyter", ] + dask_requirements = ["dask"] + test_requirements = [ "pytest", "pytest-mock", - ] + ] + dask_requirements lint_requirements = [ "black==22.3.0", @@ -109,6 +111,7 @@ + test_requirements + interactive_requirements + lint_requirements, + "dask": dask_requirements, }, # entry_points=""" # [console_scripts] diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index bdd63dda..b1b081c8 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -1,11 +1,12 @@ from pathlib import Path -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Union import numpy as np import pandas as pd from loguru import logger from packaging.version import parse from tqdm import tqdm +from vivarium import ConfigTree from pseudopeople import __version__ as psp_version from pseudopeople.configuration import get_configuration @@ -16,12 +17,15 @@ INT_COLUMNS, ) from pseudopeople.exceptions import DataSourceError -from pseudopeople.loader import load_standard_dataset_file +from pseudopeople.loader import load_standard_dataset from pseudopeople.noise import noise_dataset from pseudopeople.schema_entities import COLUMNS, DATASETS, Dataset from pseudopeople.utilities import ( + PANDAS_ENGINE, + DataFrame, cleanse_integer_columns, configure_logging_to_terminal, + get_engine_from_string, get_state_abbreviation, ) @@ -33,7 +37,8 @@ def _generate_dataset( config: Union[Path, str, Dict], user_filters: List[tuple], verbose: bool = False, -) -> pd.DataFrame: + engine_name: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Helper for generating noised datasets. @@ -49,8 +54,10 @@ def _generate_dataset( List of parquet filters, possibly empty :param verbose: Log with verbosity if True. Default is False. + :param engine_name: + String indicating engine to use for loading data. Determines the return type. :return: - Noised dataset data in a pd.DataFrame + Noised dataset data in a dataframe """ configure_logging_to_terminal(verbose) configuration_tree = get_configuration(config, dataset, user_filters) @@ -61,55 +68,97 @@ def _generate_dataset( source = Path(source) validate_source_compatibility(source, dataset) - data_paths = fetch_filepaths(dataset, source) - if not data_paths: - raise DataSourceError( - f"No datasets found at directory {str(source)}. 
" - "Please provide the path to the unmodified root data directory." + engine = get_engine_from_string(engine_name) + + if engine == PANDAS_ENGINE: + # We process shards serially + data_paths = fetch_filepaths(dataset, source) + if not data_paths: + raise DataSourceError( + f"No datasets found at directory {str(source)}. " + "Please provide the path to the unmodified root data directory." + ) + + validate_data_path_suffix(data_paths) + + # Iterate sequentially + noised_dataset = [] + iterator = ( + tqdm(data_paths, desc="Noising data", leave=False) + if len(data_paths) > 1 + else data_paths ) - validate_data_path_suffix(data_paths) - noised_dataset = [] - iterator = ( - tqdm(data_paths, desc="Noising data", leave=False) - if len(data_paths) > 1 - else data_paths - ) - for data_path_index, data_path in enumerate(iterator): - logger.debug(f"Loading data from {data_path}.") - data = _load_data_from_path(data_path, user_filters) - if data.empty: - continue - data = _reformat_dates_for_noising(data, dataset) - data = _coerce_dtypes(data, dataset) - # Use a different seed for each data file/shard, otherwise the randomness will duplicate - # and the Nth row in each shard will get the same noise - data_path_seed = f"{seed}_{data_path_index}" - noised_data = noise_dataset(dataset, data, configuration_tree, data_path_seed) - noised_data = _extract_columns(dataset.columns, noised_data) - noised_dataset.append(noised_data) - - # Check if all shards for the dataset are empty - if len(noised_dataset) == 0: - raise ValueError( - "Invalid value provided for 'state' or 'year'. No data found with " - f"the user provided 'state' or 'year' filters at {data_path}." + for data_path_index, data_path in enumerate(iterator): + logger.debug(f"Loading data from {data_path}.") + data = load_standard_dataset(data_path, user_filters, engine=engine, is_file=True) + if len(data.index) == 0: + continue + # Use a different seed for each data file/shard, otherwise the randomness will duplicate + # and the Nth row in each shard will get the same noise + data_path_seed = f"{seed}_{data_path_index}" + noised_data = _prep_and_noise_dataset( + data, dataset, configuration_tree, data_path_seed + ) + noised_dataset.append(noised_data) + + # Check if all shards for the dataset are empty + if len(noised_dataset) == 0: + raise ValueError( + "Invalid value provided for 'state' or 'year'. No data found with " + f"the user provided 'state' or 'year' filters at {data_path}." + ) + noised_dataset = pd.concat(noised_dataset, ignore_index=True) + + # Known pandas bug: pd.concat does not preserve category dtypes so we coerce + # again after concat (https://github.com/pandas-dev/pandas/issues/51362) + noised_dataset = _coerce_dtypes( + noised_dataset, + dataset, + cleanse_int_cols=True, + ) + else: + # Let dask deal with how to partition the shards -- the data path is the + # entire directory containing the parquet files + data_path = source / dataset.name + data = load_standard_dataset(data_path, user_filters, engine=engine, is_file=False) + + # Check if all shards for the dataset are empty + if len(data) == 0: + raise ValueError( + "Invalid value provided for 'state' or 'year'. No data found with " + f"the user provided 'state' or 'year' filters at {data_path}." 
+ ) + + noised_dataset = data.map_partitions( + lambda df, partition_info=None: _coerce_dtypes( + _prep_and_noise_dataset( + df, + dataset, + configuration_tree, + seed=f"{seed}_{partition_info['number'] if partition_info is not None else 1}", + ), + dataset, + cleanse_int_cols=True, + ), + meta=[(c.name, c.dtype_name) for c in dataset.columns], ) - noised_dataset = pd.concat(noised_dataset, ignore_index=True) - - # Known pandas bug: pd.concat does not preserve category dtypes so we coerce - # again after concat (https://github.com/pandas-dev/pandas/issues/51362) - noised_dataset = _coerce_dtypes( - noised_dataset, - dataset, - cleanse_int_cols=True, - ) logger.debug("*** Finished ***") return noised_dataset +def _prep_and_noise_dataset( + data: pd.DataFrame, dataset: Dataset, configuration_tree: ConfigTree, seed: Any +) -> pd.DataFrame: + data = _reformat_dates_for_noising(data, dataset) + data = _coerce_dtypes(data, dataset) + noised_data = noise_dataset(dataset, data, configuration_tree, seed) + noised_data = _extract_columns(dataset.columns, noised_data) + return noised_data + + def validate_source_compatibility(source: Path, dataset: Dataset): # TODO [MIC-4546]: Clean this up w/ metadata and update test_interface.py tests to be generic directories = [x.name for x in source.iterdir() if x.is_dir()] @@ -168,12 +217,6 @@ def _coerce_dtypes( return data -def _load_data_from_path(data_path: Path, user_filters: List[Tuple]) -> pd.DataFrame: - """Load data from a data file given a data_path and a year_filter.""" - data = load_standard_dataset_file(data_path, user_filters) - return data - - def _reformat_dates_for_noising(data: pd.DataFrame, dataset: Dataset): """Formats date columns so they can be noised as strings.""" data = data.copy() @@ -229,7 +272,8 @@ def generate_decennial_census( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople decennial census dataset which represents simulated responses to the US Census Bureau's Census of Population @@ -273,9 +317,13 @@ def generate_decennial_census( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated decennial census data. + A DataFrame of simulated decennial census data. :raises ConfigurationError: @@ -298,7 +346,9 @@ def generate_decennial_census( user_filters.append( (DATASETS.census.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.census, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.census, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_american_community_survey( @@ -308,7 +358,8 @@ def generate_american_community_survey( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople ACS dataset which represents simulated responses to the ACS survey. @@ -359,9 +410,13 @@ def generate_american_community_survey( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated ACS data. + A DataFrame of simulated ACS data. 
:raises ConfigurationError: @@ -401,7 +456,9 @@ def generate_american_community_survey( user_filters.extend( [(DATASETS.acs.state_column_name, "==", get_state_abbreviation(state))] ) - return _generate_dataset(DATASETS.acs, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.acs, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_current_population_survey( @@ -411,7 +468,8 @@ def generate_current_population_survey( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople CPS dataset which represents simulated responses to the CPS survey. @@ -463,9 +521,13 @@ def generate_current_population_survey( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated CPS data. + A DataFrame of simulated CPS data. :raises ConfigurationError: @@ -505,7 +567,9 @@ def generate_current_population_survey( user_filters.extend( [(DATASETS.cps.state_column_name, "==", get_state_abbreviation(state))] ) - return _generate_dataset(DATASETS.cps, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.cps, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_taxes_w2_and_1099( @@ -515,7 +579,8 @@ def generate_taxes_w2_and_1099( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople W2 and 1099 tax dataset which represents simulated tax form data. @@ -558,9 +623,13 @@ def generate_taxes_w2_and_1099( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated W2 and 1099 tax data. + A DataFrame of simulated W2 and 1099 tax data. :raises ConfigurationError: @@ -585,7 +654,7 @@ def generate_taxes_w2_and_1099( (DATASETS.tax_w2_1099.state_column_name, "==", get_state_abbreviation(state)) ) return _generate_dataset( - DATASETS.tax_w2_1099, source, seed, config, user_filters, verbose + DATASETS.tax_w2_1099, source, seed, config, user_filters, verbose, engine_name=engine ) @@ -596,7 +665,8 @@ def generate_women_infants_and_children( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople WIC dataset which represents a simulated version of the administrative data that would be recorded by WIC. @@ -649,9 +719,14 @@ def generate_women_infants_and_children( :param verbose: Log with verbosity if `True`. Default is `False`. + + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated WIC data. + A DataFrame of simulated WIC data. 
:raises ConfigurationError: @@ -675,7 +750,9 @@ def generate_women_infants_and_children( user_filters.append( (DATASETS.wic.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.wic, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.wic, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_social_security( @@ -684,7 +761,8 @@ def generate_social_security( config: Union[Path, str, Dict[str, Dict]] = None, year: Optional[int] = 2020, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople SSA dataset which represents simulated Social Security Administration (SSA) data. @@ -718,9 +796,13 @@ def generate_social_security( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated SSA data. + A DataFrame of simulated SSA data. :raises ConfigurationError: @@ -749,7 +831,9 @@ def generate_social_security( except (pd.errors.OutOfBoundsDatetime, ValueError): raise ValueError(f"Invalid year provided: '{year}'") seed = seed * 10_000 + year - return _generate_dataset(DATASETS.ssa, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.ssa, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_taxes_1040( @@ -759,7 +843,8 @@ def generate_taxes_1040( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople 1040 tax dataset which represents simulated tax form data. @@ -802,9 +887,13 @@ def generate_taxes_1040( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + :return: - A `pandas.DataFrame` of simulated 1040 tax data. + A DataFrame of simulated 1040 tax data. :raises ConfigurationError: @@ -828,7 +917,9 @@ def generate_taxes_1040( user_filters.append( (DATASETS.tax_1040.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.tax_1040, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.tax_1040, source, seed, config, user_filters, verbose, engine_name=engine + ) def fetch_filepaths(dataset: Dataset, source: Path) -> Union[List, List[dict]]: diff --git a/src/pseudopeople/loader.py b/src/pseudopeople/loader.py index fc786659..f2f65b14 100644 --- a/src/pseudopeople/loader.py +++ b/src/pseudopeople/loader.py @@ -7,21 +7,44 @@ from pseudopeople.constants.metadata import DatasetNames from pseudopeople.exceptions import DataSourceError +from pseudopeople.utilities import PANDAS_ENGINE, DataFrame, Engine -def load_standard_dataset_file(data_path: Path, user_filters: List[Tuple]) -> pd.DataFrame: - if data_path.suffix == ".parquet": +def load_standard_dataset( + data_path: Path, + user_filters: List[Tuple], + engine: Engine = PANDAS_ENGINE, + is_file: bool = True, +) -> DataFrame: + if is_file and data_path.suffix != ".parquet": + raise DataSourceError( + f"Source path must be a .parquet file. 
Provided {data_path.suffix}" + ) + + if engine == PANDAS_ENGINE: if len(user_filters) == 0: # pyarrow.parquet.read_table doesn't accept an empty list user_filters = None data = pq.read_table(data_path, filters=user_filters).to_pandas() + + # TODO: The index in our simulated population files is never meaningful. + # For some reason, the 1040 dataset is currently saved with a non-RangeIndex + # in the large data, and all datasets have a non-RangeIndex in the sample data. + # If we don't drop these here, our index can have duplicates when we load multiple + # shards at once. Having duplicates in the index breaks much of + # our noising logic. + data.reset_index(drop=True, inplace=True) else: + # Dask + import dask.dataframe as dd + + data = dd.read_parquet(str(data_path), filters=user_filters) + # See TODO above. + data = data.reset_index(drop=True) + + if not isinstance(data, engine.dataframe_class): raise DataSourceError( - f"Source path must be a .parquet file. Provided {data_path.suffix}" - ) - if not isinstance(data, pd.DataFrame): - raise DataSourceError( - f"File located at {data_path} must contain a pandas DataFrame. " + f"File located at {data_path} must contain a DataFrame. " "Please provide the path to the unmodified root data directory." ) diff --git a/src/pseudopeople/utilities.py b/src/pseudopeople/utilities.py index 0e6a71b6..ecd4a011 100644 --- a/src/pseudopeople/utilities.py +++ b/src/pseudopeople/utilities.py @@ -1,6 +1,7 @@ import sys +from dataclasses import dataclass from functools import cache -from typing import Any, Optional, Union +from typing import Any, Callable, Optional, Union import numpy as np import pandas as pd @@ -224,6 +225,51 @@ def count_occurrences(string, sub): return count +#################### +# Engine utilities # +#################### + + +@dataclass +class Engine: + name: str + dataframe_class_getter: Callable + + @property + def dataframe_class(self): + return self.dataframe_class_getter() + + +PANDAS_ENGINE = Engine("pandas", lambda: pd.DataFrame) + + +def get_dask_dataframe(): + import dask.dataframe as dd + + return dd.DataFrame + + +DASK_ENGINE = Engine("dask", get_dask_dataframe) + + +def get_engine_from_string(engine: str): + if engine == "pandas": + return PANDAS_ENGINE + elif engine == "dask": + return DASK_ENGINE + else: + raise ValueError(f"Unknown engine {engine}") + + +try: + # Optional dependency + import dask.dataframe as dd + + DataFrame = Union[dd.DataFrame, pd.DataFrame] +except ImportError: + DataFrame = pd.DataFrame + + ########################## # Data utility functions # ########################## diff --git a/tests/integration/test_interface.py b/tests/integration/test_interface.py index e8a1c2d5..533125fa 100644 --- a/tests/integration/test_interface.py +++ b/tests/integration/test_interface.py @@ -79,8 +79,16 @@ DATASETS.tax_1040.name, ], ) +@pytest.mark.parametrize( + "engine", + [ + "pandas", + "dask", + ], +) def test_generate_dataset_from_multiple_shards( dataset_name: str, + engine: str, config, request, split_sample_data_dir, @@ -101,9 +109,13 @@ def test_generate_dataset_from_multiple_shards( seed=SEED, year=None, source=split_sample_data_dir, + engine=engine, config=config, ) + if engine == "dask": + noised_dataset = noised_dataset.compute() + # Check same order of magnitude of rows was removed -- we don't know the # full data size (we would need unnoised data for that), so we just check # for similar lengths From c47bd4202eae7ee94c4f6c3ca85d7e2196672268 Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Tue, 23 
Apr 2024 18:13:34 -0700 Subject: [PATCH 2/6] Switch to layered_config_tree --- src/pseudopeople/interface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index b1b081c8..6d67e196 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -3,10 +3,10 @@ import numpy as np import pandas as pd +from layered_config_tree import LayeredConfigTree from loguru import logger from packaging.version import parse from tqdm import tqdm -from vivarium import ConfigTree from pseudopeople import __version__ as psp_version from pseudopeople.configuration import get_configuration @@ -150,7 +150,7 @@ def _generate_dataset( def _prep_and_noise_dataset( - data: pd.DataFrame, dataset: Dataset, configuration_tree: ConfigTree, seed: Any + data: pd.DataFrame, dataset: Dataset, configuration_tree: LayeredConfigTree, seed: Any ) -> pd.DataFrame: data = _reformat_dates_for_noising(data, dataset) data = _coerce_dtypes(data, dataset) From 9172062a70ec7fb49cdd329748e94f2fafd20df8 Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Thu, 25 Apr 2024 09:37:28 -0700 Subject: [PATCH 3/6] Clarify file-directory distinction --- src/pseudopeople/interface.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index 6d67e196..49c07dea 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -72,31 +72,33 @@ def _generate_dataset( if engine == PANDAS_ENGINE: # We process shards serially - data_paths = fetch_filepaths(dataset, source) - if not data_paths: + data_file_paths = fetch_filepaths(dataset, source) + if not data_file_paths: raise DataSourceError( f"No datasets found at directory {str(source)}. " "Please provide the path to the unmodified root data directory." ) - validate_data_path_suffix(data_paths) + validate_data_path_suffix(data_file_paths) # Iterate sequentially noised_dataset = [] iterator = ( - tqdm(data_paths, desc="Noising data", leave=False) - if len(data_paths) > 1 - else data_paths + tqdm(data_file_paths, desc="Noising data", leave=False) + if len(data_file_paths) > 1 + else data_file_paths ) - for data_path_index, data_path in enumerate(iterator): - logger.debug(f"Loading data from {data_path}.") - data = load_standard_dataset(data_path, user_filters, engine=engine, is_file=True) + for data_file_index, data_file_path in enumerate(iterator): + logger.debug(f"Loading data from {data_file_path}.") + data = load_standard_dataset( + data_file_path, user_filters, engine=engine, is_file=True + ) if len(data.index) == 0: continue # Use a different seed for each data file/shard, otherwise the randomness will duplicate # and the Nth row in each shard will get the same noise - data_path_seed = f"{seed}_{data_path_index}" + data_path_seed = f"{seed}_{data_file_index}" noised_data = _prep_and_noise_dataset( data, dataset, configuration_tree, data_path_seed ) @@ -106,7 +108,7 @@ def _generate_dataset( if len(noised_dataset) == 0: raise ValueError( "Invalid value provided for 'state' or 'year'. No data found with " - f"the user provided 'state' or 'year' filters at {data_path}." + f"the user provided 'state' or 'year' filters at {source / dataset.name}." 
) noised_dataset = pd.concat(noised_dataset, ignore_index=True) @@ -118,16 +120,18 @@ def _generate_dataset( cleanse_int_cols=True, ) else: - # Let dask deal with how to partition the shards -- the data path is the + # Let dask deal with how to partition the shards -- we pass it the # entire directory containing the parquet files - data_path = source / dataset.name - data = load_standard_dataset(data_path, user_filters, engine=engine, is_file=False) + data_directory_path = source / dataset.name + data = load_standard_dataset( + data_directory_path, user_filters, engine=engine, is_file=False + ) # Check if all shards for the dataset are empty if len(data) == 0: raise ValueError( "Invalid value provided for 'state' or 'year'. No data found with " - f"the user provided 'state' or 'year' filters at {data_path}." + f"the user provided 'state' or 'year' filters at {data_directory_path}." ) noised_dataset = data.map_partitions( From 8c925a4b6dc5921df6441865a1e7771f466e75ad Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Thu, 25 Apr 2024 09:37:44 -0700 Subject: [PATCH 4/6] Include default in docstrings --- src/pseudopeople/interface.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index 49c07dea..5ea4dfbe 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -324,6 +324,7 @@ def generate_decennial_census( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -417,6 +418,7 @@ def generate_american_community_survey( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -528,6 +530,7 @@ def generate_current_population_survey( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -630,6 +633,7 @@ def generate_taxes_w2_and_1099( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -727,6 +731,7 @@ def generate_women_infants_and_children( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -803,6 +808,7 @@ def generate_social_security( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: @@ -894,6 +900,7 @@ def generate_taxes_1040( :param engine: Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. :return: From ae96e84dd853379c7ee9fbafc1273178e7983574 Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Thu, 25 Apr 2024 13:25:48 -0700 Subject: [PATCH 5/6] Add more docs --- docs/source/simulated_populations/index.rst | 8 ++++++ src/pseudopeople/interface.py | 28 +++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/docs/source/simulated_populations/index.rst b/docs/source/simulated_populations/index.rst index 4ebfba67..446f5df1 100644 --- a/docs/source/simulated_populations/index.rst +++ b/docs/source/simulated_populations/index.rst @@ -137,3 +137,11 @@ or United States), unzip the contents to the desired location on your computer. 
Once you've unzipped the simulated population data, you can pass the directory path to the :code:`source` parameter of the :ref:`dataset generation functions ` to generate large-scale datasets! + +If you're using one of the larger populations, you'll also want to take a look at the +:code:`engine` parameter. +By default, pseudopeople generates datasets using Pandas, which does not fully parallelize +across cores and requires the entire dataset to fit into RAM. +However, by passing "dask" to the :code:`engine` parameter, you can run the dataset +generation on a Dask cluster, which can spill data to disk and even distribute +the computation across multiple computers! diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index 2c7ca35f..3724364a 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -339,6 +339,10 @@ def generate_decennial_census( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -433,6 +437,10 @@ def generate_american_community_survey( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -545,6 +553,10 @@ def generate_current_population_survey( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -648,6 +660,10 @@ def generate_taxes_w2_and_1099( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -746,6 +762,10 @@ def generate_women_infants_and_children( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -823,6 +843,10 @@ def generate_social_security( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. :return: @@ -915,6 +939,10 @@ def generate_taxes_1040( Engine to use for loading data. Determines the return type. Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. 
:return: From e7ac21c5245a03ecb9c0a8f1fc9cc0f4432ee62d Mon Sep 17 00:00:00 2001 From: Zeb Burke-Conte Date: Thu, 25 Apr 2024 14:09:40 -0700 Subject: [PATCH 6/6] Remove cleanse_int_cols argument --- src/pseudopeople/interface.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index 3724364a..36cc50aa 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -141,7 +141,6 @@ def _generate_dataset( seed=f"{seed}_{partition_info['number'] if partition_info is not None else 1}", ), dataset, - cleanse_int_cols=True, ), meta=[(c.name, c.dtype_name) for c in dataset.columns], )
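
A minimal usage sketch of the new `engine` parameter introduced by this patch series. The data path below is a placeholder, and the explicit cluster setup is an assumption for illustration: it requires the optional `distributed` package in addition to `pip install pseudopeople[dask]`, and can be skipped entirely, in which case Dask falls back to its default scheduler.

    import pseudopeople as psp

    # Optional: run on an explicit local Dask cluster so the work is
    # parallelized across worker processes and can spill to disk.
    # Assumes the `distributed` package is installed; otherwise skip
    # these two lines and use Dask's default scheduler.
    from dask.distributed import Client, LocalCluster
    client = Client(LocalCluster(n_workers=4))

    # "/path/to/simulated_population" is a placeholder for an unzipped
    # simulated population directory, passed via `source` as usual.
    census = psp.generate_decennial_census(
        source="/path/to/simulated_population",
        engine="dask",
    )

    # With engine="dask" the return value is a dask.dataframe.DataFrame;
    # call .compute() to materialize it as a pandas DataFrame, as the
    # integration test above does.
    census = census.compute()

The same `engine="dask"` argument applies to the other dataset generation functions (ACS, CPS, W2/1099, WIC, SSA, 1040), since they all forward it to `_generate_dataset` via `engine_name`.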