diff --git a/docs/source/conf.py b/docs/source/conf.py index 4188b052..31592778 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -199,6 +199,7 @@ intersphinx_mapping = { "python": ("https://docs.python.org/3.8", None), "pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None), + "dask": ("https://docs.dask.org/en/stable/", None), "tables": ("https://www.pytables.org/", None), "numpy": ("https://numpy.org/doc/stable/", None), "networkx": ("https://networkx.org/documentation/stable/", None), diff --git a/docs/source/simulated_populations/index.rst b/docs/source/simulated_populations/index.rst index 4ebfba67..446f5df1 100644 --- a/docs/source/simulated_populations/index.rst +++ b/docs/source/simulated_populations/index.rst @@ -137,3 +137,11 @@ or United States), unzip the contents to the desired location on your computer. Once you've unzipped the simulated population data, you can pass the directory path to the :code:`source` parameter of the :ref:`dataset generation functions ` to generate large-scale datasets! + +If you're using one of the larger populations, you'll also want to take a look at the +:code:`engine` parameter. +By default, pseudopeople generates datasets using Pandas, which does not fully parallelize +across cores and requires the entire dataset to fit into RAM. +However, by passing "dask" to the :code:`engine` parameter, you can run the dataset +generation on a Dask cluster, which can spill data to disk and even distribute +the computation across multiple computers! diff --git a/setup.py b/setup.py index 3d7c76e8..fc208b24 100644 --- a/setup.py +++ b/setup.py @@ -51,10 +51,12 @@ "jupyter", ] + dask_requirements = ["dask"] + test_requirements = [ "pytest", "pytest-mock", - ] + ] + dask_requirements lint_requirements = [ "black==22.3.0", @@ -109,6 +111,7 @@ + test_requirements + interactive_requirements + lint_requirements, + "dask": dask_requirements, }, # entry_points=""" # [console_scripts] diff --git a/src/pseudopeople/interface.py b/src/pseudopeople/interface.py index 6dd6e9a6..36cc50aa 100644 --- a/src/pseudopeople/interface.py +++ b/src/pseudopeople/interface.py @@ -1,8 +1,9 @@ from pathlib import Path -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Union import numpy as np import pandas as pd +from layered_config_tree import LayeredConfigTree from loguru import logger from packaging.version import parse from tqdm import tqdm @@ -16,12 +17,15 @@ INT_TO_STRING_COLUMNS, ) from pseudopeople.exceptions import DataSourceError -from pseudopeople.loader import load_standard_dataset_file +from pseudopeople.loader import load_standard_dataset from pseudopeople.noise import noise_dataset from pseudopeople.schema_entities import COLUMNS, DATASETS, Dataset, DtypeNames from pseudopeople.utilities import ( + PANDAS_ENGINE, + DataFrame, cleanse_integer_columns, configure_logging_to_terminal, + get_engine_from_string, get_state_abbreviation, to_string_preserve_nans, ) @@ -34,7 +38,8 @@ def _generate_dataset( config: Union[Path, str, Dict], user_filters: List[tuple], verbose: bool = False, -) -> pd.DataFrame: + engine_name: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Helper for generating noised datasets. @@ -50,8 +55,10 @@ def _generate_dataset( List of parquet filters, possibly empty :param verbose: Log with verbosity if True. Default is False. + :param engine_name: + String indicating engine to use for loading data. Determines the return type. 
:return: - Noised dataset data in a pd.DataFrame + Noised dataset data in a dataframe """ configure_logging_to_terminal(verbose) configuration_tree = get_configuration(config, dataset, user_filters) @@ -62,52 +69,97 @@ def _generate_dataset( source = Path(source) validate_source_compatibility(source, dataset) - data_paths = fetch_filepaths(dataset, source) - if not data_paths: - raise DataSourceError( - f"No datasets found at directory {str(source)}. " - "Please provide the path to the unmodified root data directory." + engine = get_engine_from_string(engine_name) + + if engine == PANDAS_ENGINE: + # We process shards serially + data_file_paths = fetch_filepaths(dataset, source) + if not data_file_paths: + raise DataSourceError( + f"No datasets found at directory {str(source)}. " + "Please provide the path to the unmodified root data directory." + ) + + validate_data_path_suffix(data_file_paths) + + # Iterate sequentially + noised_dataset = [] + iterator = ( + tqdm(data_file_paths, desc="Noising data", leave=False) + if len(data_file_paths) > 1 + else data_file_paths ) - validate_data_path_suffix(data_paths) - noised_dataset = [] - iterator = ( - tqdm(data_paths, desc="Noising data", leave=False) - if len(data_paths) > 1 - else data_paths - ) - for data_path_index, data_path in enumerate(iterator): - logger.debug(f"Loading data from {data_path}.") - data = _load_data_from_path(data_path, user_filters) - if data.empty: - continue - data = _reformat_dates_for_noising(data, dataset) - data = _clean_input_data(data, dataset) - # Use a different seed for each data file/shard, otherwise the randomness will duplicate - # and the Nth row in each shard will get the same noise - data_path_seed = f"{seed}_{data_path_index}" - noised_data = noise_dataset(dataset, data, configuration_tree, data_path_seed) - noised_data = _extract_columns(dataset.columns, noised_data) - noised_dataset.append(noised_data) - - # Check if all shards for the dataset are empty - if len(noised_dataset) == 0: - raise ValueError( - "Invalid value provided for 'state' or 'year'. No data found with " - f"the user provided 'state' or 'year' filters at {data_path}." + for data_file_index, data_file_path in enumerate(iterator): + logger.debug(f"Loading data from {data_file_path}.") + data = load_standard_dataset( + data_file_path, user_filters, engine=engine, is_file=True + ) + if len(data.index) == 0: + continue + # Use a different seed for each data file/shard, otherwise the randomness will duplicate + # and the Nth row in each shard will get the same noise + data_path_seed = f"{seed}_{data_file_index}" + noised_data = _prep_and_noise_dataset( + data, dataset, configuration_tree, data_path_seed + ) + noised_dataset.append(noised_data) + + # Check if all shards for the dataset are empty + if len(noised_dataset) == 0: + raise ValueError( + "Invalid value provided for 'state' or 'year'. No data found with " + f"the user provided 'state' or 'year' filters at {source / dataset.name}." 
+ ) + noised_dataset = pd.concat(noised_dataset, ignore_index=True) + + noised_dataset = _coerce_dtypes( + noised_dataset, + dataset, + ) + else: + # Let dask deal with how to partition the shards -- we pass it the + # entire directory containing the parquet files + data_directory_path = source / dataset.name + data = load_standard_dataset( + data_directory_path, user_filters, engine=engine, is_file=False ) - noised_dataset = pd.concat(noised_dataset, ignore_index=True) - noised_dataset = _coerce_dtypes( - noised_dataset, - dataset, - ) + # Check if all shards for the dataset are empty + if len(data) == 0: + raise ValueError( + "Invalid value provided for 'state' or 'year'. No data found with " + f"the user provided 'state' or 'year' filters at {data_directory_path}." + ) + + noised_dataset = data.map_partitions( + lambda df, partition_info=None: _coerce_dtypes( + _prep_and_noise_dataset( + df, + dataset, + configuration_tree, + seed=f"{seed}_{partition_info['number'] if partition_info is not None else 1}", + ), + dataset, + ), + meta=[(c.name, c.dtype_name) for c in dataset.columns], + ) logger.debug("*** Finished ***") return noised_dataset +def _prep_and_noise_dataset( + data: pd.DataFrame, dataset: Dataset, configuration_tree: LayeredConfigTree, seed: Any +) -> pd.DataFrame: + data = _reformat_dates_for_noising(data, dataset) + data = _clean_input_data(data, dataset) + noised_data = noise_dataset(dataset, data, configuration_tree, seed) + noised_data = _extract_columns(dataset.columns, noised_data) + return noised_data + + def validate_source_compatibility(source: Path, dataset: Dataset): # TODO [MIC-4546]: Clean this up w/ metadata and update test_interface.py tests to be generic directories = [x.name for x in source.iterdir() if x.is_dir()] @@ -182,12 +234,6 @@ def _coerce_dtypes( return data -def _load_data_from_path(data_path: Path, user_filters: List[Tuple]) -> pd.DataFrame: - """Load data from a data file given a data_path and a year_filter.""" - data = load_standard_dataset_file(data_path, user_filters) - return data - - def _reformat_dates_for_noising(data: pd.DataFrame, dataset: Dataset): """Formats date columns so they can be noised as strings.""" data = data.copy() @@ -243,7 +289,8 @@ def generate_decennial_census( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople decennial census dataset which represents simulated responses to the US Census Bureau's Census of Population @@ -287,9 +334,18 @@ def generate_decennial_census( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated decennial census data. + A DataFrame of simulated decennial census data. 
:raises ConfigurationError: @@ -312,7 +368,9 @@ def generate_decennial_census( user_filters.append( (DATASETS.census.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.census, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.census, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_american_community_survey( @@ -322,7 +380,8 @@ def generate_american_community_survey( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople ACS dataset which represents simulated responses to the ACS survey. @@ -373,9 +432,18 @@ def generate_american_community_survey( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated ACS data. + A DataFrame of simulated ACS data. :raises ConfigurationError: @@ -415,7 +483,9 @@ def generate_american_community_survey( user_filters.extend( [(DATASETS.acs.state_column_name, "==", get_state_abbreviation(state))] ) - return _generate_dataset(DATASETS.acs, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.acs, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_current_population_survey( @@ -425,7 +495,8 @@ def generate_current_population_survey( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople CPS dataset which represents simulated responses to the CPS survey. @@ -477,9 +548,18 @@ def generate_current_population_survey( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated CPS data. + A DataFrame of simulated CPS data. :raises ConfigurationError: @@ -519,7 +599,9 @@ def generate_current_population_survey( user_filters.extend( [(DATASETS.cps.state_column_name, "==", get_state_abbreviation(state))] ) - return _generate_dataset(DATASETS.cps, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.cps, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_taxes_w2_and_1099( @@ -529,7 +611,8 @@ def generate_taxes_w2_and_1099( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople W2 and 1099 tax dataset which represents simulated tax form data. @@ -572,9 +655,18 @@ def generate_taxes_w2_and_1099( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. 
+ "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated W2 and 1099 tax data. + A DataFrame of simulated W2 and 1099 tax data. :raises ConfigurationError: @@ -599,7 +691,7 @@ def generate_taxes_w2_and_1099( (DATASETS.tax_w2_1099.state_column_name, "==", get_state_abbreviation(state)) ) return _generate_dataset( - DATASETS.tax_w2_1099, source, seed, config, user_filters, verbose + DATASETS.tax_w2_1099, source, seed, config, user_filters, verbose, engine_name=engine ) @@ -610,7 +702,8 @@ def generate_women_infants_and_children( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople WIC dataset which represents a simulated version of the administrative data that would be recorded by WIC. @@ -663,9 +756,19 @@ def generate_women_infants_and_children( :param verbose: Log with verbosity if `True`. Default is `False`. + + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated WIC data. + A DataFrame of simulated WIC data. :raises ConfigurationError: @@ -689,7 +792,9 @@ def generate_women_infants_and_children( user_filters.append( (DATASETS.wic.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.wic, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.wic, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_social_security( @@ -698,7 +803,8 @@ def generate_social_security( config: Union[Path, str, Dict[str, Dict]] = None, year: Optional[int] = 2020, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople SSA dataset which represents simulated Social Security Administration (SSA) data. @@ -732,9 +838,18 @@ def generate_social_security( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated SSA data. + A DataFrame of simulated SSA data. 
:raises ConfigurationError: @@ -763,7 +878,9 @@ def generate_social_security( except (pd.errors.OutOfBoundsDatetime, ValueError): raise ValueError(f"Invalid year provided: '{year}'") seed = seed * 10_000 + year - return _generate_dataset(DATASETS.ssa, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.ssa, source, seed, config, user_filters, verbose, engine_name=engine + ) def generate_taxes_1040( @@ -773,7 +890,8 @@ def generate_taxes_1040( year: Optional[int] = 2020, state: Optional[str] = None, verbose: bool = False, -) -> pd.DataFrame: + engine: Literal["pandas", "dask"] = "pandas", +) -> DataFrame: """ Generates a pseudopeople 1040 tax dataset which represents simulated tax form data. @@ -816,9 +934,18 @@ def generate_taxes_1040( Log with verbosity if `True`. Default is `False`. + :param engine: + + Engine to use for loading data. Determines the return type. + Default is "pandas" which returns a pandas DataFrame. + "dask" returns a Dask DataFrame and requires Dask to be + installed (e.g. `pip install pseudopeople[dask]`). + It runs the dataset generation on a Dask cluster, which can + parallelize and run out-of-core. + :return: - A `pandas.DataFrame` of simulated 1040 tax data. + A DataFrame of simulated 1040 tax data. :raises ConfigurationError: @@ -842,7 +969,9 @@ def generate_taxes_1040( user_filters.append( (DATASETS.tax_1040.state_column_name, "==", get_state_abbreviation(state)) ) - return _generate_dataset(DATASETS.tax_1040, source, seed, config, user_filters, verbose) + return _generate_dataset( + DATASETS.tax_1040, source, seed, config, user_filters, verbose, engine_name=engine + ) def fetch_filepaths(dataset: Dataset, source: Path) -> Union[List, List[dict]]: diff --git a/src/pseudopeople/loader.py b/src/pseudopeople/loader.py index fc786659..f2f65b14 100644 --- a/src/pseudopeople/loader.py +++ b/src/pseudopeople/loader.py @@ -7,21 +7,44 @@ from pseudopeople.constants.metadata import DatasetNames from pseudopeople.exceptions import DataSourceError +from pseudopeople.utilities import PANDAS_ENGINE, DataFrame, Engine -def load_standard_dataset_file(data_path: Path, user_filters: List[Tuple]) -> pd.DataFrame: - if data_path.suffix == ".parquet": +def load_standard_dataset( + data_path: Path, + user_filters: List[Tuple], + engine: Engine = PANDAS_ENGINE, + is_file: bool = True, +) -> DataFrame: + if is_file and data_path.suffix != ".parquet": + raise DataSourceError( + f"Source path must be a .parquet file. Provided {data_path.suffix}" + ) + + if engine == PANDAS_ENGINE: if len(user_filters) == 0: # pyarrow.parquet.read_table doesn't accept an empty list user_filters = None data = pq.read_table(data_path, filters=user_filters).to_pandas() + + # TODO: The index in our simulated population files is never meaningful. + # For some reason, the 1040 dataset is currently saved with a non-RangeIndex + # in the large data, and all datasets have a non-RangeIndex in the sample data. + # If we don't drop these here, our index can have duplicates when we load multiple + # shards at once. Having duplicates in the index breaks much of + # our noising logic. + data.reset_index(drop=True, inplace=True) else: + # Dask + import dask.dataframe as dd + + data = dd.read_parquet(str(data_path), filters=user_filters) + # See TODO above. + data = data.reset_index(drop=True) + + if not isinstance(data, engine.dataframe_class): raise DataSourceError( - f"Source path must be a .parquet file. 
Provided {data_path.suffix}" - ) - if not isinstance(data, pd.DataFrame): - raise DataSourceError( - f"File located at {data_path} must contain a pandas DataFrame. " + f"File located at {data_path} must contain a DataFrame. " "Please provide the path to the unmodified root data directory." ) diff --git a/src/pseudopeople/utilities.py b/src/pseudopeople/utilities.py index acbb3334..73c0d60b 100644 --- a/src/pseudopeople/utilities.py +++ b/src/pseudopeople/utilities.py @@ -1,6 +1,7 @@ import sys +from dataclasses import dataclass from functools import cache -from typing import Any, Optional, Union +from typing import Any, Callable, Optional, Union import numpy as np import pandas as pd @@ -229,6 +230,51 @@ def count_occurrences(string, sub): return count +#################### +# Engine utilities # +#################### + + +@dataclass +class Engine: + name: str + dataframe_class_getter: Callable + + @property + def dataframe_class(self): + return self.dataframe_class_getter() + + +PANDAS_ENGINE = Engine("pandas", lambda: pd.DataFrame) + + +def get_dask_dataframe(): + import dask.dataframe as dd + + return dd.DataFrame + + +DASK_ENGINE = Engine("dask", get_dask_dataframe) + + +def get_engine_from_string(engine: str): + if engine == "pandas": + return PANDAS_ENGINE + elif engine == "dask": + return DASK_ENGINE + else: + raise ValueError(f"Unknown engine {engine}") + + +try: + # Optional dependency + import dask.dataframe as dd + + DataFrame = Union[dd.DataFrame, pd.DataFrame] +except ImportError: + DataFrame = pd.DataFrame + + ########################## # Data utility functions # ########################## diff --git a/tests/integration/test_interface.py b/tests/integration/test_interface.py index 51f7de21..32de0000 100644 --- a/tests/integration/test_interface.py +++ b/tests/integration/test_interface.py @@ -81,8 +81,16 @@ DATASETS.tax_1040.name, ], ) +@pytest.mark.parametrize( + "engine", + [ + "pandas", + "dask", + ], +) def test_generate_dataset_from_multiple_shards( dataset_name: str, + engine: str, config, request, split_sample_data_dir, @@ -103,9 +111,13 @@ def test_generate_dataset_from_multiple_shards( seed=SEED, year=None, source=split_sample_data_dir, + engine=engine, config=config, ) + if engine == "dask": + noised_dataset = noised_dataset.compute() + # Check same order of magnitude of rows was removed -- we don't know the # full data size (we would need unnoised data for that), so we just check # for similar lengths
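
Usage sketch (not part of the diff): a minimal example of the new `engine` parameter introduced above, assuming the dask extra is installed (`pip install pseudopeople[dask]`) and that the dataset generation functions are available at the package top level; the source path and cluster settings below are placeholders, not values from this change.

    # Hypothetical usage of engine="dask"; paths and cluster sizing are assumptions.
    import pseudopeople as psp
    from dask.distributed import Client

    # Optional: start a local Dask cluster so work is spread across workers and
    # can spill to disk. Without this, Dask's default scheduler is used.
    client = Client(n_workers=4, memory_limit="8GB")

    # With engine="dask", the function returns a dask.dataframe.DataFrame
    # instead of a pandas DataFrame.
    census = psp.generate_decennial_census(
        source="/path/to/simulated_population_usa",  # unzipped large-scale data
        year=2020,
        engine="dask",
    )

    # The result is lazy; .compute() materializes it as a single in-memory
    # pandas DataFrame, as done in the integration tests in this diff.
    census_df = census.compute()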